Source code for granary.nostr

"""Nostr.

* https://nostr.com/
* https://github.com/nostr-protocol/nostr
* https://github.com/nostr-protocol/nips

NIPS implemented:

* 01: base protocol, events, profile metadata
* 02: contacts/followings
* 05: domain identifiers
* 09: deletes
* 10: replies, mentions
* 12: hashtags, locations
* 14: subject tag in notes
* 18: reposts, including 10 for e/p tags
* 19: bech32-encoded ids
* 21: nostr: URI scheme
* 23: articles
* 24: extra fields
* 25: likes, emoji reactions
* 27: text notes
* 39: external identities
* 48: proxy tags
* 50: search
* 92/94: image, video, audio attachments

TODO:

* 11: relay info (like nodeinfo)
* 12: tag queries
* 16, 33: ephemeral/replaceable events
* 17: DMs
* 27: user mentions, note/event mentions
*     the difficulty is that the Nostr tags don't include human-readable
*     text. clients are supposed to get that from their local database.
* 32: tag activities
* 46: "Nostr Connect," signing proxy that holds user's keys
* 65: user relays. what would this be in AS1? anything?
* 73: external content ids
"""
from datetime import datetime, timezone
from hashlib import sha256
import itertools
import logging
import mimetypes
import re
import secrets

import bech32
from oauth_dropins.webutil import util
from oauth_dropins.webutil.util import HTTP_TIMEOUT, json_dumps, json_loads
import secp256k1
from websockets.exceptions import ConnectionClosedOK
from websockets.sync.client import connect

from . import as1
from .source import creation_result, FRIENDS, html_to_text, INCLUDE_LINK, OMIT_LINK, Source

logger = logging.getLogger(__name__)

# NIP-19
BECH32_PREFIXES = (
  'naddr',
  'nevent',
  'note',
  'nprofile',
  'npub',
  'nrelay',
  'nsec',
)
BECH32_RE = re.compile(f'^({"|".join(BECH32_PREFIXES)})[a-z0-9]{{50,70}}$')

# Event kinds
# https://github.com/nostr-protocol/nips#event-kinds
KIND_PROFILE = 0          # NIP-01: user profile metadata
KIND_NOTE = 1             # NIP-01: text note
KIND_CONTACTS = 3         # NIP-02: contact list / followings
KIND_DELETE = 5           # NIP-09: event deletion
KIND_REPOST = 6           # NIP-18: repost
KIND_REACTION = 7         # NIP-25: reactions (likes, dislikes, emojis)
KIND_GENERIC_REPOST = 16  # NIP-18: generic repost
KIND_AUTH = 22242         # NIP-42: client authentication
KIND_ARTICLE = 30023      # NIP-23: long-form content
KIND_RELAYS = 10002       # NIP-65: user relays

# NIP-39
# https://github.com/nostr-protocol/nips/blob/master/39.md#claim-types
# maps NIP-39 platform to base URL
PLATFORMS = {
  'github': 'https://github.com/',
  'telegram': 'https://t.me/',
  'twitter': 'https://twitter.com/',
  'mastodon': 'https://',
}

[docs] def id_for(event): """Generates an id for a Nostr event. Args: event (dict): Nostr event Returns: str: 32-character hex-encoded sha256 hash of the event, serialized according to NIP-01 """ event.setdefault('tags', []) event.setdefault('created_at', int(util.now(tz=timezone.utc).timestamp())) missing = set(('content', 'created_at', 'kind', 'pubkey', 'tags')) - event.keys() assert not missing, f'missing {missing}' # don't escape Unicode chars! # https://github.com/nostr-protocol/nips/issues/354 return sha256(json_dumps([ 0, event['pubkey'], event['created_at'], event['kind'], event['tags'], event['content'], ], ensure_ascii=False).encode()).hexdigest()
[docs] def uri_for(event): """Generates a NIP-19 nostr: URI for a Nostr event. Args: event (dict): Nostr event Returns: str: NIP-19 nostr: URI, based on the event's id and kind """ id = event.get('id') kind = event.get('kind') assert id and kind is not None, event prefix = ('note' if kind == KIND_NOTE else 'nprofile' if kind == KIND_PROFILE else 'nevent') return id_to_uri(prefix, id)
def is_bech32(id): if not id: return False id = id.removeprefix('nostr:') for prefix in BECH32_PREFIXES: if id.startswith(prefix): return True
[docs] def bech32_prefix_for(event): """Returns the bech32 prefix for a given event, based on its kind. Defined by NIP-19: https://nips.nostr.com/19 Args: event (dict): Nostr event Returns: str: bech32 prefix """ return { KIND_NOTE: 'note', # NIP-10 KIND_PROFILE: 'nprofile', # NIP-01 }.get(event['kind'], 'nevent')
[docs] def uri_to_id(uri): """Converts a nostr: URI with bech32-encoded id to a hex sha256 hash id. Based on NIP-21. Args: uri (str) Returns: str: hex """ if not uri: return uri return bech32_decode(uri.removeprefix('nostr:'))
[docs] def id_to_uri(prefix, id): """Converts a hex sha256 hash id to a nostr: URI with bech32-encoded id. Based on NIP-21. Args: prefix (str) id (str): hex Returns: str: bech32-encoded """ return 'nostr:' + bech32_encode(prefix, id)
[docs] def bech32_decode(val): """Converts a bech32-encoded string to its corresponding hex string. Based on NIP-19. Args: val (str): bech32 Returns: str: hex """ if not val or not is_bech32(val): return val prefix, data = bech32.bech32_decode(val) return bytes(bech32.convertbits(data, 5, 8, pad=False)).hex()
[docs] def bech32_encode(prefix, hex): """Converts a hex string to a bech32-encoded string. Based on NIP-19. Args: prefix (str) hex (str) Returns: str: bech32 """ if not hex: return hex data = bech32.convertbits(bytes.fromhex(hex), 8, 5) return bech32.bech32_encode(prefix, data)
[docs] def nip05_to_npub(nip05): """Resolves a NIP-05 identifier or domain to a bech32-encoded npub public key. https://nips.nostr.com/5 Args: nip05 (str): NIP-05 identifier, e.g. "alice@example.com" or "_@example.com" Returns: str: bech32-encoded npub public key Raises: ValueError: if nip05 is invalid format or user not found requests.HTTPError: if HTTP request fails """ parts = nip05.split('@') if len(parts) == 1: domain = parts[0] user = '_' elif len(parts) == 2: user, domain = parts else: raise ValueError(f'Invalid NIP-05 identifier: {nip05}') if not user or not domain: raise ValueError(f'Invalid NIP-05 identifier: {nip05}') url = f'https://{domain}/.well-known/nostr.json?name={user}' resp = util.requests_get(url, timeout=HTTP_TIMEOUT) resp.raise_for_status() data = resp.json() if not (pubkey := data.get('names', {}).get(user)): raise ValueError(f'User {user} not found at {domain}') # convert hex pubkey to npub return id_to_uri('npub', pubkey).removeprefix('nostr:')
[docs] def id_and_sign(event, privkey): """Populates a Nostr event's id and signature, in place. Args: event (dict) privkey (str): bech32-encoded nsec private key Returns: dict: event, populated with ``id`` and ``sig`` fields """ assert privkey.startswith('nsec') or privkey.startswith('nostr:nsec'), privkey privkey = uri_to_id(privkey) assert len(privkey) == 64, privkey assert not event.get('id') and not event.get('sig'), event event['id'] = id_for(event) key = secp256k1.PrivateKey(privkey=privkey, raw=False) event['sig'] = key.schnorr_sign(bytes.fromhex(event['id']), None, raw=True).hex() return event
[docs] def verify(event): """Verifies a Nostr event's signature using the key in its ``pubkey`` field. Args: event (dict) Returns: bool: True if the signature is valid, False otherwise, eg if the signature is invalid, or if the ``id`` or ``sig`` or ``pubkey`` fields are missing, or if ``id`` is not the event's correct hash """ if (not (sig := event.get('sig')) or not (id := event.get('id')) or not (pubkey := event.get('pubkey'))): return False if id != id_for(event): return False # secp256k1-py generates and expects 33-byte public keys, not 32. the difference # seems to be a prefix byte that's always either 0x02 or 0x03. not sure why, but it # doesn't seem to matter, we can just arbitrarily tack 0x02 onto a 32-byte key and # it still generates and verifies signatures fine. # https://github.com/snarfed/bridgy-fed/issues/446#issuecomment-2925960330 if len(pubkey) != 64: return False try: key = secp256k1.PublicKey(bytes.fromhex('02' + pubkey), raw=True) return key.schnorr_verify(bytes.fromhex(id), bytes.fromhex(sig), None, raw=True) except (TypeError, ValueError): return False
[docs] def pubkey_from_privkey(privkey): """Returns the hex-encoded public key for a hex-encoded private key. Removes the leading 0x02 or 0x03 byte prefix that secp256k1-py includes. Background: https://github.com/snarfed/bridgy-fed/issues/446#issuecomment-2925960330 Note that :func:`verify` does the inverse; it adds a 0x02 prefix internally, which secp256k1-py needs to load the public key. Args: privkey (str): hex secp256k1 private key Returns: str: corresponding hex secp256k1 public key """ privkey = secp256k1.PrivateKey(bytes.fromhex(privkey), raw=True) pubkey = privkey.pubkey.serialize().hex()[2:] assert len(pubkey) == 64 return pubkey
[docs] def from_as1(obj, privkey=None, remote_relay='', from_protocol=None): """Converts an ActivityStreams 1 activity or object to a Nostr event. Args: obj (dict): AS1 activity or object privkey (str): optional bech32-encoded private key to sign the event with. Also used to set the output event's ``pubkey`` field if ``obj`` doesn't have an ``nsec`` id remote_relays (sequence of str): optional sequence of remote relays where the "target" of this object - followee, in-reply-to, repost-of, etc - can be fetched. from_protocol (str): optional source protocol for NIP-48 proxy tag. Supported values: 'activitypub', 'atproto', 'web' Returns: dict: Nostr event """ type = as1.object_type(obj) id = obj.get('id') inner_obj = as1.get_object(obj) inner_hex_id = uri_to_id(inner_obj.get('id')) pubkey = uri_to_id(as1.get_owner(obj)) if privkey: privkey = privkey.removeprefix('nostr:') if not pubkey: pubkey = pubkey_from_privkey(uri_to_id(privkey)) content = (html_to_text(obj.get('content') or obj.get('summary')) or obj.get('displayName') or '') event = { 'pubkey': pubkey, 'content': content, 'tags': [], } published = obj.get('published') if published: event['created_at'] = int(util.parse_iso8601(published).timestamp()) # NIP-48 proxy tag if from_protocol and id and not id.startswith('nostr:'): event['tags'].append(['proxy', id, from_protocol]) # types if type in as1.ACTOR_TYPES: content = { 'name': obj.get('displayName'), 'about': obj.get('summary'), 'website': obj.get('url') or util.get_first(obj, 'urls'), } if username := obj.get('username'): if '@' in username: content['nip05'] = username elif re.fullmatch(util.DOMAIN_RE, username): content['nip05'] = f'_@{username}' for img in as1.get_objects(obj, 'image'): if url := img.get('url') or img.get('id'): field = 'banner' if img.get('objectType') == 'featured' else 'picture' content.setdefault(field, url) event.update({ 'kind': KIND_PROFILE, # don't escape Unicode chars! # https://github.com/nostr-protocol/nips/issues/354 'content': json_dumps(util.trim_nulls(content), sort_keys=True, ensure_ascii=False), }) if id: event['pubkey'] = uri_to_id(id) for url in as1.object_urls(obj): for platform, base_url in PLATFORMS.items(): # we don't known which URLs might be Mastodon, so don't try to guess if platform != 'mastodon' and url.startswith(base_url): event['tags'].append( ['i', f'{platform}:{url.removeprefix(base_url)}', '-']) elif type in ('post', 'update'): return from_as1(inner_obj, privkey=privkey, remote_relay=remote_relay, from_protocol=from_protocol) elif type in ('article', 'note'): event.update({ 'kind': KIND_NOTE if type == 'note' else KIND_ARTICLE, }) in_reply_to = as1.get_object(obj, 'inReplyTo') if in_reply_to: id = uri_to_id(in_reply_to.get('id')) # https://nips.nostr.com/10 # Kind 1 events with e tags are replies to other kind 1 events. Kind 1 replies MUST NOT be used to reply to other kinds, use NIP-22 instead. # ["e", <event-id>, <relay-url>, <marker>, <pubkey>] # Where: # <event-id> is the id of the event being referenced. # <relay-url> is the URL of a recommended relay associated with the reference. Clients SHOULD add a valid <relay-url> field, but may instead leave it as "". # <marker> is optional and if present is one of "reply", "root". # <pubkey> is optional, SHOULD be the pubkey of the author of the referenced event event['tags'].append(['e', id, remote_relay, 'reply']) author = as1.get_object(in_reply_to, 'author').get('id') if author: event['tags'].append(['p', uri_to_id(orig_event.get('pubkey'))]) if type == 'article' and published: event['tags'].append(['published_at', str(event['created_at'])]) if title := obj.get('title'): event['tags'].extend([ ['title', title], ['subject', title], # NIP-14 subject tag ]) if summary := obj.get('summary'): event['tags'].append(['summary', summary]) for tag in util.get_list(obj, 'tags'): name = tag.get('displayName') if name and tag.get('objectType') == 'hashtag': event['tags'].append(['t', name]) if location := as1.get_object(obj, 'location').get('displayName'): event['tags'].append(['location', location]) # imeta tags for images, video, audio video_audio = [as1.get_object(att, 'stream') for att in as1.get_objects(obj, 'attachments')] for img in as1.get_objects(obj, 'image') + video_audio: if url := img.get('url') or img.get('id'): tag = ['imeta', f'url {url}'] if alt := img.get('displayName'): tag.append(f'alt {alt}') if mime := img.get('mimeType') or mimetypes.guess_type(url, strict=False)[0]: tag.append(f'm {mime}') event['tags'].append(tag) # add to text content if necessary if url not in event['content']: event['content'] += ' ' + url elif type == 'share': event.update({ 'kind': KIND_REPOST, }) if inner_obj: orig_event = from_as1(inner_obj, from_protocol=from_protocol) event['content'] = json_dumps(orig_event, sort_keys=True, ensure_ascii=False) event['tags'] = [ # https://nips.nostr.com/18 # "The repost event MUST include an e tag with the id of the note that is being reposted. That tag MUST include a relay URL as its third entry to indicate where it can be fetched." ['e', orig_event.get('id'), remote_relay, 'mention'], ['p', orig_event.get('pubkey')], ] elif type in ('like', 'dislike', 'react'): event.update({ 'kind': KIND_REACTION, 'content': '+' if type == 'like' else '-' if type == 'dislike' else obj.get('content'), 'tags': [['e', inner_hex_id]], }) elif type == 'delete': event.update({ 'kind': KIND_DELETE, # TODO: include kind of the object being deleted, in a `k` tag. we'd have # to fetch it first. :/ 'tags': [['e', inner_hex_id]], }) elif type == 'follow': event.update({ 'kind': KIND_CONTACTS, # https://nips.nostr.com/2 # Each tag entry should contain the key for the profile, a relay URL where events from that key can be found (can be set to an empty string if not needed), and a local name (or "petname") for that profile (can also be set to an empty string or not provided), i.e., ["p", <32-bytes hex key>, <main relay URL>, <petname>]. 'tags': [['p', uri_to_id(o['id']), remote_relay, o.get('displayName') or ''] for o in as1.get_objects(obj) if o.get('id')] }) else: raise NotImplementedError(f'Unsupported activity/object type: {type}') event = util.trim_nulls(event, ignore=['tags', 'content']) if privkey: id_and_sign(event, privkey) elif pubkey: event['id'] = id_for(event) return event
[docs] def to_as1(event): """Converts a Nostr event to an ActivityStreams 2 activity or object. Args: event (dict): Nostr event Returns: dict: AS1 activity or object """ if not event: return {} obj = {} if id := event.get('id'): obj['id'] = id_to_uri('nevent', id) kind = event['kind'] tags = event.get('tags', []) content = event.get('content') if kind == KIND_PROFILE: # profile content = json_loads(content) if content else {} nip05_domain = (content['nip05'].removeprefix('_@') if isinstance(content.get('nip05'), str) else '') obj.update({ 'objectType': 'person', 'id': id_to_uri('npub', event['pubkey']), 'displayName': content.get('display_name') or content.get('name'), 'summary': content.get('about'), 'username': nip05_domain, 'urls': [], }) obj['image'] = [] if picture := content.get('picture'): obj['image'].append(picture) if banner := content.get('banner'): obj['image'].append({'url': banner, 'objectType': 'featured'}) if website := content.get('website'): obj['url'] = website obj['urls'].append(website) for tag in tags: if tag[0] == 'i': platform, identity = tag[1].split(':') base_url = PLATFORMS.get(platform) if base_url: obj['urls'].append(base_url + identity) elif kind in (KIND_NOTE, KIND_ARTICLE): # note, article obj.update({ 'objectType': 'note' if kind == KIND_NOTE else 'article', # TODO: render Markdown to HTML? 'content': event.get('content'), 'image': [], 'attachments': [], 'tags': [], }) if id: obj['id'] = id_to_uri('note', id) pubkey = event.get('pubkey') if pubkey: obj['author'] = id_to_uri('npub', pubkey) for tag in tags: type = tag[0] if type == 'e' and tag[-1] == 'reply': obj['inReplyTo'] = id_to_uri('nevent', tag[1]) elif type == 't' and len(tag) >= 2: obj['tags'].extend({'objectType': 'hashtag', 'displayName': t} for t in tag[1:]) elif type in ('title', 'summary'): obj[type] = tag[1] elif type == 'subject': # NIP-14 subject tag obj.setdefault('title', tag[1]) elif type == 'location': obj['location'] = {'displayName': tag[1]} elif type == 'imeta': metas = dict(val.split(maxsplit=1) for val in tag[1:]) if url := metas.get('url'): mime = metas.get('m') or mimetypes.guess_type(url, strict=False)[0] type = mime.split('/')[0] if type == 'image': obj['image'].append({ 'objectType': 'image', 'url': url, 'mimeType': mime, 'displayName': metas.get('alt'), }) elif type in ('video', 'audio'): obj['attachments'].append({ 'objectType': type, 'displayName': metas.get('alt'), 'stream': { 'url': url, 'mimeType': mime, }, }) else: continue # remove from text content obj['content'] = obj['content'].replace(url, '').rstrip() elif kind in (KIND_REPOST, KIND_GENERIC_REPOST): # repost obj.update({ 'objectType': 'activity', 'verb': 'share', }) for tag in tags: if tag[0] == 'e' and tag[-1] == 'mention': obj['object'] = id_to_uri('note', tag[1]) if content and content.startswith('{'): obj['object'] = to_as1(json_loads(content)) elif kind == KIND_REACTION: # like/reaction obj.update({ 'objectType': 'activity', }) if content == '+': obj['verb'] = 'like' elif content == '-': obj['verb'] = 'dislike' else: obj['verb'] = 'react' obj['content'] = content for tag in tags: if tag[0] == 'e': obj['object'] = id_to_uri('nevent', tag[1]) elif kind == KIND_DELETE: # delete obj.update({ 'objectType': 'activity', 'verb': 'delete', 'object': [], 'content': content, }) for tag in tags: # TODO: support NIP-33 'a' tags if tag[0] == 'e': obj['object'].append(id_to_uri('nevent', tag[1])) elif kind == KIND_CONTACTS: # follow obj.update({ 'objectType': 'activity', 'verb': 'follow', 'object': [], 'content': content, }) for tag in tags: if tag[0] == 'p': name = tag[3] if len(tag) >= 4 else None id = id_to_uri('npub', tag[1]) obj['object'].append({'id': id, 'displayName': name} if name else id) # common fields created_at = event.get('created_at') if created_at: obj['published'] = datetime.fromtimestamp(created_at, tz=timezone.utc).isoformat() if isinstance(obj.get('object'), list) and len(obj['object']) == 1: obj['object'] = obj['object'][0] if obj.get('objectType') == 'activity' and (pubkey := event.get('pubkey')): obj['actor'] = id_to_uri('npub', pubkey) return util.trim_nulls(Source.postprocess_object(obj))
[docs] class Nostr(Source): """Nostr source class. See file docstring and :class:`Source` for details. Attributes: relays (sequence of str): relay hostnames """ DOMAIN = None BASE_URL = None NAME = 'Nostr' def __init__(self, relays=(), privkey=None): """Constructor. Args: relays (sequence of str) privkey (str): optional bech32-encoded private key of the current user. Required by :meth:`create` in order to sign events. """ self.relays = relays if privkey: assert is_bech32(privkey), privkey self.privkey = privkey self.hex_pubkey = pubkey_from_privkey(uri_to_id(privkey)) if privkey else None @classmethod def user_url(cls, nprofile): nprofile = nprofile.removeprefix('nostr:') assert nprofile.startswith('nprofile') return f'https://coracle.social/people/{nprofile}'
[docs] def get_actor(self, user_id=None): """Fetches and returns a Nostr user profile. Args: user_id (str): NIP-21 ``nostr:npub...`` Returns: dict: AS1 actor object """ if not user_id or not user_id.removeprefix('nostr:').startswith('npub'): raise ValueError(f'Expected nostr:npub..., got {user_id}') id = uri_to_id(user_id) # query for activities logger.debug(f'connecting to {self.relays[0]}') with connect(self.relays[0], open_timeout=HTTP_TIMEOUT, close_timeout=HTTP_TIMEOUT, ) as websocket: events = self.query(websocket, { 'authors': [id], 'kinds': [KIND_PROFILE], }) if events: # will we ever get multiple here? if so, assume the last is the most recent? return to_as1(events[-1])
[docs] def get_activities_response(self, user_id=None, group_id=None, app_id=None, activity_id=None, fetch_replies=False, fetch_likes=False, fetch_shares=False, include_shares=True, fetch_events=False, fetch_mentions=False, search_query=None, start_index=None, count=None, cache=None, **kwargs): """Fetches events and converts them to AS1 activities. See :meth:`Source.get_activities_response` for more information. """ assert not start_index assert not cache assert not fetch_mentions assert not fetch_events # build query filter filter = { 'limit': count or 20, } if activity_id: if is_bech32(activity_id): activity_id = uri_to_id(activity_id) filter['ids'] = [activity_id] if user_id: if is_bech32(user_id): user_id = uri_to_id(user_id) filter['authors'] = [user_id] if search_query: filter['search'] = search_query events = [] # query for activities logger.debug(f'connecting to {self.relays[0]}') with connect(self.relays[0], open_timeout=HTTP_TIMEOUT, close_timeout=HTTP_TIMEOUT, ) as websocket: events = self.query(websocket, filter) event_ids = [e['id'] for e in events] # maps raw Nostr id to activity activities = {uri_to_id(a['id']): a for a in [to_as1(e) for e in events]} assert len(activities) == len(events) # query for replies/shares if event_ids and (fetch_replies or fetch_shares): for event in self.query(websocket, {'#e': event_ids}): obj = to_as1(event) if in_reply_to := obj.get('inReplyTo'): activity = activities.get(uri_to_id(in_reply_to)) if activity: replies = activity.setdefault('replies', { 'items': [], 'totalItems': 0, }) replies['items'].append(obj) replies['totalItems'] += 1 elif obj.get('verb') == 'share': activity = activities.get(uri_to_id(as1.get_object(obj).get('id'))) if activity: activity.setdefault('tags', []).append(obj) return self.make_activities_base_response(util.trim_nulls(activities.values()))
[docs] def query(self, websocket, filter): """Runs a Nostr ``REQ`` query on an open websocket. Sends the query, collects the responses, and closes the ``REQ`` subscription. If ``limit`` is not set on the filter, it defaults to 20. Args: websocket (websockets.sync.client.ClientConnection) filter (dict): NIP-01 ``REQ`` filter limit (int) Returns: list of dict: Nostr events """ limit = filter.setdefault('limit', 20) subscription = secrets.token_urlsafe(16) req = ['REQ', subscription, filter] try: logger.debug(f'{websocket.remote_address} <= {req}') websocket.send(json_dumps(req)) except ConnectionClosedOK as err: logger.warning(err) return [] events = [] try: while True: msg = websocket.recv(timeout=HTTP_TIMEOUT) logger.debug(f'{websocket.remote_address} => {msg}') resp = json_loads(msg) if resp[:3] == ['OK', subscription, False]: return events elif resp[:2] == ['EVENT', subscription]: event = resp[2] if verify(event): events.append(event) else: logger.warning(f'Invalid signature for event {event.get("id")}') elif resp[0] == 'AUTH' and len(resp) >= 2: auth = ['AUTH', id_and_sign({ 'kind': KIND_AUTH, 'pubkey': self.hex_pubkey, 'content': '', 'tags': [ ['relay', f'wss://{websocket.remote_address[0]}/'], ['challenge', resp[1]], ], }, self.privkey)] logger.debug(f'{websocket.remote_address} <= {auth}') websocket.send(json_dumps(auth)) elif len(events) >= limit or resp[:2] == ['EOSE', subscription]: break close = ['CLOSE', subscription] logger.debug(f'{websocket.remote_address} <= {close}') websocket.send(json_dumps(close)) except ConnectionClosedOK as err: logger.warning(err) return events
[docs] def create(self, obj, include_link=OMIT_LINK, ignore_formatting=False): """Creates a new object: a post, comment, like, repost, etc. See :meth:`Source.create` docstring for details. """ assert self.privkey type = as1.object_type(obj) url = obj.get('url') is_reply = type == 'comment' or obj.get('inReplyTo') base_obj = self.base_object(obj) base_url = base_obj.get('url') prefer_content = type == 'note' or (base_url and is_reply) event = from_as1(obj, privkey=self.privkey) content = self._content_for_create( obj, ignore_formatting=ignore_formatting, prefer_name=not prefer_content) or '' if include_link == INCLUDE_LINK and url: content += '\n' + url event['content'] = content event.setdefault('pubkey', self.hex_pubkey) event['id'] = id_for(event) missing = (set(('content', 'created_at', 'kind', 'id', 'pubkey', 'tags')) - event.keys()) assert not missing, f'missing {missing}' logger.debug(f'connecting to {self.relays[0]}') with connect(self.relays[0], open_timeout=HTTP_TIMEOUT, close_timeout=HTTP_TIMEOUT, ) as websocket: create = ['EVENT', event] logger.debug(f'{websocket.remote_address} <= {create}') try: websocket.send(json_dumps(create)) msg = websocket.recv(timeout=HTTP_TIMEOUT) except ConnectionClosedOK as cc: logger.warning(cc) return resp = json_loads(msg) if resp[:3] == ['OK', event['id'], True]: return creation_result(event) return creation_result(error_plain=resp[-1], abort=True)
[docs] def delete(self, id): """Deletes a post. Args: id (str): bech32-encoded id of the event to delete """ return self.create({ 'objectType': 'activity', 'verb': 'delete', 'object': id, 'published': util.now(tz=timezone.utc).isoformat(), })
@classmethod def _postprocess_base_object(cls, obj): # don't mess with ids return obj