Source code for granary.nostr

"""Nostr.

* https://nostr.com/
* https://github.com/nostr-protocol/nostr
* https://github.com/nostr-protocol/nips

NIPS implemented:

* 01: base protocol, events, profile metadata
* 02: contacts/followings
* 05: domain identifiers
* 09: deletes
* 10: text notes, replies, mentions
* 12: hashtags, locations
* 14: subject tag in notes
* 18: reposts, including 10 for e/p tags
* 19: bech32-encoded ids
* 21: nostr: URI scheme
* 23: articles
* 24: extra fields
* 25: likes, emoji reactions
* 27: user mentions
* 39: external identities
* 48: proxy tags
* 50: search
* 92/94: image, video, audio attachments

TODO:

* 12: tag queries
* 16, 33: ephemeral/replaceable events
* 17: DMs
* 27: user mentions, note/event mentions (to_as1 direction)
*     the difficulty is that the Nostr tags don't include human-readable
*     text. clients are supposed to get that from their local database.
* 32: tag activities
* 46: "Nostr Connect," signing proxy that holds user's keys
* 73: external content ids
"""
import copy
from datetime import datetime, timezone
from hashlib import sha256
import itertools
import logging
import mimetypes
import re
import secrets

import bech32
from bs4 import BeautifulSoup
from oauth_dropins.webutil import util
from oauth_dropins.webutil.util import HTTP_TIMEOUT, json_dumps, json_loads
import secp256k1
from websockets.exceptions import ConnectionClosedOK
from websockets.sync.client import connect

from . import as1
from .source import (
  creation_result,
  FRIENDS,
  HTML_ENTITY_RE,
  html_to_text,
  INCLUDE_LINK,
  OMIT_LINK,
  Source,
)

logger = logging.getLogger(__name__)

# NIP-19
BECH32_PREFIXES = (
  'naddr',
  'nevent',
  'note',
  'nprofile',
  'npub',
  'nrelay',
  'nsec',
)
# bech32-encoded ids with these prefix are always TLV
BECH32_TLV_PREFIXES = (
  'naddr',
  'nevent',
  'nprofile',
  'nrelay',
)
BECH32_RE = re.compile(f'(?P<prefix>{"|".join(BECH32_PREFIXES)})[a-z0-9]{{50,}}')
URI_RE = re.compile(r'\bnostr:' + BECH32_RE.pattern + r'\b')
ID_RE = re.compile(r'[0-9a-f]{64}')

# Event kinds
# https://github.com/nostr-protocol/nips#event-kinds
KIND_PROFILE = 0          # NIP-01: user profile metadata
KIND_NOTE = 1             # NIP-01: text note
KIND_CONTACTS = 3         # NIP-02: contact list / followings
KIND_DELETE = 5           # NIP-09: event deletion
KIND_REPOST = 6           # NIP-18: repost
KIND_REACTION = 7         # NIP-25: reactions (likes, dislikes, emojis)
KIND_GENERIC_REPOST = 16  # NIP-18: generic repost
KIND_AUTH = 22242         # NIP-42: client authentication
KIND_ARTICLE = 30023      # NIP-23: long-form content
KIND_RELAYS = 10002       # NIP-65: user relays

# NIP-39
# https://github.com/nostr-protocol/nips/blob/master/39.md#claim-types
# maps NIP-39 platform to base URL
PLATFORMS = {
  'github': 'https://github.com/',
  'telegram': 'https://t.me/',
  'twitter': 'https://twitter.com/',
  'mastodon': 'https://',
}

[docs] def id_for(event): """Generates an id for a Nostr event. Args: event (dict): Nostr event Returns: str: 32-character hex-encoded sha256 hash of the event, serialized according to NIP-01 """ event.setdefault('tags', []) event.setdefault('created_at', int(util.now(tz=timezone.utc).timestamp())) missing = set(('content', 'created_at', 'kind', 'pubkey', 'tags')) - event.keys() assert not missing, f'missing {missing}' # don't escape Unicode chars! # https://github.com/nostr-protocol/nips/issues/354 return sha256(json_dumps([ 0, event['pubkey'], event['created_at'], event['kind'], event['tags'], event['content'], ], ensure_ascii=False).encode()).hexdigest()
[docs] def uri_for(event): """Generates a NIP-19 nostr: URI for a Nostr event. Args: event (dict): Nostr event Returns: str: NIP-19 nostr: URI, based on the event's id and kind """ id = event.get('id') kind = event.get('kind') assert id and kind is not None, event prefix = ('note' if kind == KIND_NOTE else 'nprofile' if kind == KIND_PROFILE else 'nevent') return id_to_uri(prefix, id)
def is_bech32(id): if not id: return False id = id.removeprefix('nostr:') for prefix in BECH32_PREFIXES: if id.startswith(prefix): return True
[docs] def bech32_prefix_for(event): """Returns the bech32 prefix for a given event, based on its kind. Defined by NIP-19: https://nips.nostr.com/19 Args: event (dict): Nostr event Returns: str: bech32 prefix """ return { KIND_NOTE: 'note', # NIP-10 KIND_PROFILE: 'nprofile', # NIP-01 }.get(event['kind'], 'nevent')
[docs] def uri_to_id(uri): """Converts a nostr: URI with bech32-encoded id to a hex sha256 hash id. Based on NIP-21. Args: uri (str) Returns: str: hex """ if not uri: return uri return bech32_decode(uri.removeprefix('nostr:'))
[docs] def id_to_uri(prefix, id): """Converts a hex sha256 hash id to a nostr: URI with bech32-encoded id. Based on NIP-21. Args: prefix (str) id (str): hex Returns: str: bech32-encoded """ return 'nostr:' + bech32_encode(prefix, id.removeprefix('nostr:'))
[docs] def bech32_decode(val): """Converts a bech32-encoded string to its corresponding hex string. Based on NIP-19. Args: val (str): bech32 Returns: str: hex, or val unchanged if it can't be decoded """ if not val or not is_bech32(val): return val prefix, bits = bech32.bech32_decode(val) if not bits: return val converted = bech32.convertbits(bits, 5, 8, pad=False) if not converted: return val data = bytes(converted) if prefix in BECH32_TLV_PREFIXES: # TLV! find the type 0 value, it's (usually) the id while data: type, length = data[:2] assert type in (0, 1, 2, 3), type if type == 0: assert length == 32, length data = data[2:34] break data = data[length + 2:] if not data: return None return data.hex()
[docs] def bech32_encode(prefix, hex): """Converts a hex id to a bech32-encoded string. Based on NIP-19. Args: prefix (str) hex (str) Returns: str: bech32, or hex unchanged if it can't be encoded """ if not hex or len(hex) != 64: return hex if prefix in BECH32_TLV_PREFIXES: assert prefix in ('nprofile', 'nevent') # first byte 0 for id/pubkey, second byte 32 for length hex = '0020' + hex try: bytes_data = bytes.fromhex(hex) except ValueError: return hex data = bech32.convertbits(bytes_data, 8, 5) if not data: return hex return bech32.bech32_encode(prefix, data)
[docs] def nip05_to_npub(nip05): """Resolves a NIP-05 identifier or domain to a bech32-encoded npub public key. https://nips.nostr.com/5 Args: nip05 (str): NIP-05 identifier, e.g. "alice@example.com" or "_@example.com" Returns: str: bech32-encoded npub public key Raises: ValueError: if nip05 is invalid format or user not found requests.HTTPError: if HTTP request fails """ parts = nip05.split('@') if len(parts) == 1: domain = parts[0] user = '_' elif len(parts) == 2: user, domain = parts else: raise ValueError(f'Invalid NIP-05 identifier: {nip05}') if not user or not domain: raise ValueError(f'Invalid NIP-05 identifier: {nip05}') url = f'https://{domain}/.well-known/nostr.json?name={user}' resp = util.requests_get(url, timeout=HTTP_TIMEOUT) resp.raise_for_status() data = resp.json() if not (pubkey := data.get('names', {}).get(user)): raise ValueError(f'User {user} not found at {domain}') # convert hex pubkey to npub return id_to_uri('npub', pubkey).removeprefix('nostr:')
[docs] def id_and_sign(event, privkey): """Populates a Nostr event's id and signature, in place. Args: event (dict) privkey (str): bech32-encoded nsec private key Returns: dict: event, populated with ``id`` and ``sig`` fields """ assert privkey.startswith('nsec') or privkey.startswith('nostr:nsec'), privkey privkey = uri_to_id(privkey) assert len(privkey) == 64, privkey assert not event.get('id') and not event.get('sig'), event event['id'] = id_for(event) key = secp256k1.PrivateKey(privkey=privkey, raw=False) event['sig'] = key.schnorr_sign(bytes.fromhex(event['id']), None, raw=True).hex() return event
[docs] def verify(event): """Verifies a Nostr event's signature using the key in its ``pubkey`` field. Args: event (dict) Returns: bool: True if the signature is valid, False otherwise, eg if the signature is invalid, or if the ``id`` or ``sig`` or ``pubkey`` fields are missing, or if ``id`` is not the event's correct hash """ if (not (sig := event.get('sig')) or not (id := event.get('id')) or not (pubkey := event.get('pubkey'))): return False if id != id_for(event): return False # secp256k1-py generates and expects 33-byte public keys, not 32. the difference # seems to be a prefix byte that's always either 0x02 or 0x03. not sure why, but it # doesn't seem to matter, we can just arbitrarily tack 0x02 onto a 32-byte key and # it still generates and verifies signatures fine. # https://github.com/snarfed/bridgy-fed/issues/446#issuecomment-2925960330 if len(pubkey) != 64: return False try: key = secp256k1.PublicKey(bytes.fromhex('02' + pubkey), raw=True) return key.schnorr_verify(bytes.fromhex(id), bytes.fromhex(sig), None, raw=True) except (TypeError, ValueError): return False
[docs] def pubkey_from_privkey(privkey): """Returns the hex-encoded public key for a hex-encoded private key. Removes the leading 0x02 or 0x03 byte prefix that secp256k1-py includes. Background: https://github.com/snarfed/bridgy-fed/issues/446#issuecomment-2925960330 Note that :func:`verify` does the inverse; it adds a 0x02 prefix internally, which secp256k1-py needs to load the public key. Args: privkey (str): hex secp256k1 private key Returns: str: corresponding hex secp256k1 public key """ privkey = secp256k1.PrivateKey(bytes.fromhex(privkey), raw=True) pubkey = privkey.pubkey.serialize().hex()[2:] assert len(pubkey) == 64 return pubkey
[docs] def from_as1(obj, privkey=None, remote_relay='', proxy_tag=None, multiple=False): """Converts an ActivityStreams 1 activity or object to a Nostr event. Args: obj (dict): AS1 activity or object privkey (str): optional bech32-encoded private key to sign the event with. Also used to set the output event's ``pubkey`` field if ``obj`` doesn't have an ``nsec`` id remote_relays (sequence of str): optional sequence of remote relays where the "target" of this object - followee, in-reply-to, repost-of, etc - can be fetched. proxy_tag (sequence of [str ID URL, str protocol]): optional NIP-48 proxy tag to add to the output event, without the initial ``proxy`` element. multiple (bool): unused, for compatibility with other from_as1 functions Returns: dict: Nostr event """ obj = copy.deepcopy(obj) type = as1.object_type(obj) id = obj.get('id') inner_obj = as1.get_object(obj) inner_hex_id = uri_to_id(inner_obj.get('id')) pubkey = uri_to_id(as1.get_owner(obj)) if privkey: privkey = privkey.removeprefix('nostr:') pubkey = pubkey_from_privkey(uri_to_id(privkey)) # generate and expand tags from content as1.convert_html_content_to_text(obj) as1.expand_tags(obj) # base event content = obj.get('content') or '' event = { 'pubkey': pubkey, 'content': content, 'tags': [], # ideally we'd use obj['published'], but some relays check created_at and require # it to be now, not backdated 'created_at': int(util.now(tz=timezone.utc).timestamp()), } # NIP-48 proxy tag if proxy_tag: assert len(proxy_tag) == 2, proxy_tag event['tags'].append(['proxy'] + list(proxy_tag)) # types if type in as1.ACTOR_TYPES: content = { 'name': obj.get('displayName'), 'about': obj.get('summary'), 'website': obj.get('url') or util.get_first(obj, 'urls'), } if username := obj.get('username'): if '@' in username: content['nip05'] = username elif re.fullmatch(util.DOMAIN_RE, username): content['nip05'] = f'_@{username}' for img in as1.get_objects(obj, 'image'): if url := img.get('url') or img.get('id'): field = 'banner' if img.get('objectType') == 'featured' else 'picture' content.setdefault(field, url) event.update({ 'kind': KIND_PROFILE, # don't escape Unicode chars! # https://github.com/nostr-protocol/nips/issues/354 'content': json_dumps(util.trim_nulls(content), sort_keys=True, ensure_ascii=False), }) event.setdefault('pubkey', uri_to_id(id)) for url in as1.object_urls(obj): for platform, base_url in PLATFORMS.items(): # we don't known which URLs might be Mastodon, so don't try to guess if platform != 'mastodon' and url.startswith(base_url): event['tags'].append( ['i', f'{platform}:{url.removeprefix(base_url)}', '-']) elif type in ('post', 'update'): return from_as1(inner_obj, privkey=privkey, remote_relay=remote_relay, proxy_tag=proxy_tag) elif type in ('article', 'comment', 'note'): if type == 'article': event['kind'] = KIND_ARTICLE event['tags'].append(['d', id]) else: event['kind'] = KIND_NOTE in_reply_to = as1.get_object(obj, 'inReplyTo') if in_reply_to: id = uri_to_id(in_reply_to.get('id')) # https://nips.nostr.com/10 # Kind 1 events with e tags are replies to other kind 1 events. Kind 1 replies MUST NOT be used to reply to other kinds, use NIP-22 instead. # ["e", <event-id>, <relay-url>, <marker>, <pubkey>] # Where: # <event-id> is the id of the event being referenced. # <relay-url> is the URL of a recommended relay associated with the reference. Clients SHOULD add a valid <relay-url> field, but may instead leave it as "". # <marker> is optional and if present is one of "reply", "root". # <pubkey> is optional, SHOULD be the pubkey of the author of the referenced event e = ['e', id, remote_relay] event['tags'].append(e) author = as1.get_object(in_reply_to, 'author').get('id') if author: if author_key := uri_to_id(author): e.extend(['', author_key]) event['tags'].append(['p', author_key]) if type == 'article': if published := obj.get('published'): published_at = str(int(util.parse_iso8601(published).timestamp())) event['tags'].append(['published_at', published_at]) if title := obj.get('title'): event['tags'].extend([ ['title', title], ['subject', title], # NIP-14 subject tag ]) if summary := obj.get('summary'): event['tags'].append(['summary', summary]) # tags: hashtags are NIP-12 't' tags, mentions are NIP-27 URIs in content mentions = [] for tag in util.get_list(obj, 'tags'): tag_type = tag.get('objectType') if tag_type == 'hashtag' and (name := tag.get('displayName')): event['tags'].append(['t', name]) elif tag_type == 'mention' and (url := tag.get('url')): if url.startswith('nostr:') or is_bech32(url) or ID_RE.fullmatch(url): util.add(event['tags'], ['p', uri_to_id(url)]) if 'startIndex' in tag and 'length' in tag: mentions.append(tag) # replace mentions in content in reverse order to preserve indices for tag in sorted(mentions, key=lambda m: m['startIndex'], reverse=True): start = tag['startIndex'] end = start + tag['length'] content = event['content'] npub_uri = id_to_uri('npub', uri_to_id(tag['url'])) event['content'] = content[:start] + npub_uri + content[end:] if location := as1.get_object(obj, 'location').get('displayName'): event['tags'].append(['location', location]) # imeta tags for images, video, audio # https://nips.nostr.com/92#media-attachments video_audio = [as1.get_object(att, 'stream') for att in as1.get_objects(obj, 'attachments')] for img in as1.get_objects(obj, 'image') + video_audio: if url := img.get('url') or img.get('id'): # requires at least one element besides url # if we ever start fetching the URL, we could include dim # other possibilities: https://nips.nostr.com/94#event-format tag = ['imeta', f'url {url}', f'alt {img.get("displayName") or ""}'] if mime := img.get('mimeType') or mimetypes.guess_type(url, strict=False)[0]: tag.append(f'm {mime}') event['tags'].append(tag) # add to text content if necessary if url not in event['content']: event['content'] += ' ' + url elif type == 'share': event.update({ 'kind': KIND_REPOST, 'content': '', }) if inner_obj: # https://nips.nostr.com/18 # "The repost event MUST include an e tag with the id of the note that is being reposted. That tag MUST include a relay URL as its third entry to indicate where it can be fetched." e_tag = ['e', inner_hex_id, remote_relay, ''] event['tags'].append(e_tag) if set(inner_obj.keys()) > {'id'}: orig_event = from_as1(inner_obj) event['content'] = json_dumps(orig_event, sort_keys=True, ensure_ascii=False) orig_author_pubkey = orig_event.get('pubkey') event['tags'].append(['p', orig_author_pubkey]) e_tag.append(orig_author_pubkey) elif type in ('like', 'dislike', 'react'): event.update({ 'kind': KIND_REACTION, 'content': '+' if type == 'like' else '-' if type == 'dislike' else obj.get('content'), }) event['tags'].append(['e', inner_hex_id]) elif type in ('delete', 'undo'): event.update({ 'kind': KIND_DELETE, # TODO: include kind of the object being deleted, in a `k` tag. we'd have # to fetch it first. :/ }) event['tags'].append(['e', inner_hex_id]) elif type == 'follow': event.update({ 'kind': KIND_CONTACTS, # https://nips.nostr.com/2 # # Each tag entry should contain the key for the profile, a relay URL where # events from that key can be found (can be set to an empty string if not # needed), and a local name (or "petname") for that profile (can also be set to # an empty string or not provided), eg: # ["p", <32-bytes hex key>, <main relay URL>, <petname>] }) event['tags'].extend( [['p', uri_to_id(o['id']), remote_relay, o.get('displayName') or ''] for o in as1.get_objects(obj) if o.get('id')]) else: raise NotImplementedError(f'Unsupported activity/object type: {type} {id}') # tags should all be strings, no nulls # https://nips.nostr.com/1#events-and-signatures # https://github.com/nostr-protocol/nips/issues/354#issuecomment-1465169789 for tag in event.get('tags', []): assert None not in tag, event event = util.trim_nulls(event, ignore=['tags', 'content']) if privkey: id_and_sign(event, privkey) elif pubkey: event['id'] = id_for(event) return event
[docs] def to_as1(event, id_format='hex', nostr_uri_ids=True): """Converts a Nostr event to an ActivityStreams 2 activity or object. Args: event (dict): Nostr event id_format (str, either 'hex' or 'bech32'): which format to use in id fields. Defaults to `hex`. nostr_uri_ids (bool): whether to prefix ids with `nostr:`. This is NIP-21 for `bech32` ids, non-standard for `hex` ids. Defaults to True. Returns: dict: AS1 activity or object """ assert id_format in ('hex', 'bech32') def make_id(id, prefix): """ Args: id (str): hex id prefix (str): bech32 prefix """ assert ID_RE.fullmatch(id) if id_format == 'bech32': id = bech32_encode(prefix, id) if nostr_uri_ids: id = 'nostr:' + id return id if not event: return {} obj = {} id_bech32 = None if id := event.get('id'): prefix = bech32_prefix_for(event) obj['id'] = make_id(id, prefix) id_bech32 = bech32_encode(prefix, id) kind = event['kind'] tags = event.get('tags', []) content = event.get('content') pubkey = event.get('pubkey') if kind == KIND_PROFILE: # profile content = json_loads(content) if content else {} nip05 = (content['nip05'].removeprefix('_@') if isinstance(content.get('nip05'), str) else '') obj.update({ 'objectType': 'person', 'id': make_id(pubkey, 'npub'), 'displayName': content.get('display_name') or content.get('name'), 'summary': content.get('about'), 'username': nip05, 'urls': [], }) obj['image'] = [] if picture := content.get('picture'): obj['image'].append(picture) if banner := content.get('banner'): obj['image'].append({'url': banner, 'objectType': 'featured'}) if website := content.get('website'): obj['url'] = website obj['urls'].append(website) for tag in tags: if tag[0] == 'i': platform, identity = tag[1].split(':') base_url = PLATFORMS.get(platform) if base_url: obj['urls'].append(base_url + identity) obj['urls'].append(Nostr.object_url(nip05 or id_bech32)) elif kind in (KIND_NOTE, KIND_ARTICLE): # note, article obj.update({ 'objectType': 'note' if kind == KIND_NOTE else 'article', 'author': make_id(pubkey, 'npub'), 'content': event.get('content'), 'content_is_html': False, 'image': [], 'attachments': [], 'tags': [], 'url': Nostr.object_url(id_bech32), }) if id: obj['id'] = make_id(id, 'note') for tag in tags: type = tag[0] if type == 'd' and len(tag) >= 2 and tag[1] and not is_bech32(tag[1]): obj['id'] = tag[1] if type == 'e' and len(tag) >= 2: obj['inReplyTo'] = make_id(tag[1], 'nevent') elif type == 't' and len(tag) >= 2: obj['tags'].extend({'objectType': 'hashtag', 'displayName': t} for t in tag[1:]) elif type in ('title', 'summary'): obj[type] = tag[1] elif type == 'subject': # NIP-14 subject tag obj.setdefault('title', tag[1]) elif type == 'location': obj['location'] = {'displayName': tag[1]} elif type == 'imeta': metas = {} for val in tag[1:]: parts = val.split(maxsplit=1) if len(parts) == 2: metas[parts[0]] = parts[1] if url := metas.get('url'): mime = metas.get('m') or mimetypes.guess_type(url, strict=False)[0] or '' type = mime.split('/')[0] if type == 'image': obj['image'].append({ 'objectType': 'image', 'url': url, 'mimeType': mime, 'displayName': metas.get('alt'), }) elif type in ('video', 'audio'): obj['attachments'].append({ 'objectType': type, 'displayName': metas.get('alt'), 'stream': { 'url': url, 'mimeType': mime, }, }) else: continue # remove from text content obj['content'] = obj['content'].replace(url, '').rstrip() # convert NIP-27 user mentions in content to AS1 tags, and replace # URIs in content with mentioned users' handles # # can't easily do this in granary.nostr.to_as1 because we need to fetch # the Nostr user for match in URI_RE.finditer(obj['content'] or ''): if match['prefix'] in ('npub', 'nprofile'): uri = match.group(0) obj['tags'].append({ 'objectType': 'mention', 'url': make_id(uri_to_id(uri), 'npub'), 'displayName': uri, 'startIndex': match.start(), 'length': match.end() - match.start(), }) elif kind in (KIND_REPOST, KIND_GENERIC_REPOST): # repost obj.update({ 'objectType': 'activity', 'verb': 'share', 'url': Nostr.object_url(id_bech32), }) for tag in tags: if tag[0] == 'e': orig_post_id = make_id(tag[1], 'note') if len(tag) >= 5 and tag[4]: obj['object'] = { 'id': orig_post_id, 'author': make_id(tag[4], 'npub') } else: obj['object'] = orig_post_id if content and content.startswith('{'): obj['object'] = to_as1(json_loads(content)) elif kind == KIND_REACTION: # like/reaction obj.update({ 'objectType': 'activity', }) if content == '+': obj['verb'] = 'like' elif content == '-': obj['verb'] = 'dislike' else: obj['verb'] = 'react' obj['content'] = content for tag in tags: if tag[0] == 'e': obj['object'] = make_id(tag[1], 'nevent') elif kind == KIND_DELETE: # delete obj.update({ 'objectType': 'activity', 'verb': 'delete', 'object': [], 'content': content, }) for tag in tags: # TODO: support NIP-33 'a' tags if tag[0] == 'e': obj['object'].append(make_id(tag[1], 'nevent')) elif kind == KIND_CONTACTS: # follow obj.update({ 'objectType': 'activity', 'verb': 'follow', 'object': [], 'content': content, }) for tag in tags: if tag[0] == 'p': name = tag[3] if len(tag) >= 4 else None id = make_id(tag[1], 'npub') obj['object'].append({'id': id, 'displayName': name} if name else id) elif kind == KIND_RELAYS: # not really converting this to anything meaningful, just including author # so we know who it's for obj['author'] = make_id(pubkey, 'npub') # common fields created_at = event.get('created_at') if created_at: obj['published'] = datetime.fromtimestamp(created_at, tz=timezone.utc).isoformat() if isinstance(obj.get('object'), list) and len(obj['object']) == 1: obj['object'] = obj['object'][0] if obj.get('objectType') == 'activity' and (pubkey := event.get('pubkey')): obj['actor'] = make_id(pubkey, 'npub') return util.trim_nulls(Source.postprocess_object(obj))
[docs] class Nostr(Source): """Nostr source class. See file docstring and :class:`Source` for details. Attributes: relays (sequence of str): relay hostnames """ DOMAIN = None BASE_URL = None NAME = 'Nostr' def __init__(self, relays=(), privkey=None): """Constructor. Args: relays (sequence of str) privkey (str): optional bech32-encoded private key of the current user. Required by :meth:`create` in order to sign events. """ self.relays = relays if privkey: assert is_bech32(privkey), privkey self.privkey = privkey self.hex_pubkey = pubkey_from_privkey(uri_to_id(privkey)) if privkey else None
[docs] @classmethod def object_url(cls, id_or_nip05): """Returns the njump.me URL for a given event id, npub, or NIP-05. Args: id_or_nip05 (str) Returns: str: njump.me URL """ if id_or_nip05: return f'https://njump.me/{id_or_nip05}'
user_url = post_url = object_url
[docs] def get_actor(self, user_id=None): """Fetches and returns a Nostr user profile. Args: user_id (str): NIP-21 ``nostr:npub...`` Returns: dict: AS1 actor object """ if not user_id or not user_id.removeprefix('nostr:').startswith('npub'): raise ValueError(f'Expected nostr:npub..., got {user_id}') id = uri_to_id(user_id) # query for activities logger.debug(f'connecting to {self.relays[0]}') with connect(self.relays[0], open_timeout=HTTP_TIMEOUT, close_timeout=HTTP_TIMEOUT, ) as websocket: events = self.query(websocket, { 'authors': [id], 'kinds': [KIND_PROFILE], }) if events: # will we ever get multiple here? if so, assume the last is the most recent? return to_as1(events[-1])
[docs] def get_activities_response(self, user_id=None, group_id=None, app_id=None, activity_id=None, fetch_replies=False, fetch_likes=False, fetch_shares=False, include_shares=True, fetch_events=False, fetch_mentions=False, search_query=None, start_index=None, count=None, cache=None, **kwargs): """Fetches events and converts them to AS1 activities. See :meth:`Source.get_activities_response` for more information. """ assert not start_index assert not cache assert not fetch_mentions assert not fetch_events # build query filter filter = { 'limit': count or 20, } if activity_id: if is_bech32(activity_id): activity_id = uri_to_id(activity_id) filter['ids'] = [activity_id] if user_id: if is_bech32(user_id): user_id = uri_to_id(user_id) filter['authors'] = [user_id] if search_query: filter['search'] = search_query events = [] # query for activities logger.debug(f'connecting to {self.relays[0]}') with connect(self.relays[0], open_timeout=HTTP_TIMEOUT, close_timeout=HTTP_TIMEOUT, ) as websocket: events = self.query(websocket, filter) event_ids = [e['id'] for e in events] # maps raw Nostr id to activity activities = {uri_to_id(a['id']): a for a in [to_as1(e) for e in events]} assert len(activities) == len(events) # query for replies/shares if event_ids and (fetch_replies or fetch_shares): for event in self.query(websocket, {'#e': event_ids}): obj = to_as1(event) if in_reply_to := obj.get('inReplyTo'): activity = activities.get(uri_to_id(in_reply_to)) if activity: replies = activity.setdefault('replies', { 'items': [], 'totalItems': 0, }) replies['items'].append(obj) replies['totalItems'] += 1 elif obj.get('verb') == 'share': activity = activities.get(uri_to_id(as1.get_object(obj).get('id'))) if activity: activity.setdefault('tags', []).append(obj) return self.make_activities_base_response(util.trim_nulls(activities.values()))
[docs] def query(self, websocket, filter): """Runs a Nostr ``REQ`` query on an open websocket. Sends the query, collects the responses, and closes the ``REQ`` subscription. If ``limit`` is not set on the filter, it defaults to 20. Args: websocket (websockets.sync.client.ClientConnection) filter (dict): NIP-01 ``REQ`` filter limit (int) Returns: list of dict: Nostr events """ limit = filter.setdefault('limit', 20) subscription = secrets.token_urlsafe(16) req = ['REQ', subscription, filter] try: logger.debug(f'{websocket.remote_address} <= {req}') websocket.send(json_dumps(req)) except ConnectionClosedOK as err: logger.warning(err) return [] events = [] try: while True: msg = websocket.recv(timeout=HTTP_TIMEOUT) logger.debug(f'{websocket.remote_address} => {msg}') resp = json_loads(msg) if resp[:3] == ['OK', subscription, False]: break elif resp[:2] == ['EVENT', subscription]: event = resp[2] if verify(event): events.append(event) else: logger.warning(f'Invalid signature for event {event.get("id")}') elif resp[0] == 'AUTH' and len(resp) >= 2: auth = ['AUTH', id_and_sign({ 'kind': KIND_AUTH, 'pubkey': self.hex_pubkey, 'content': '', 'tags': [ ['relay', f'wss://{websocket.remote_address[0]}/'], ['challenge', resp[1]], ], }, self.privkey)] logger.debug(f'{websocket.remote_address} <= {auth}') websocket.send(json_dumps(auth)) elif len(events) >= limit or resp[:2] == ['EOSE', subscription]: break close = ['CLOSE', subscription] logger.debug(f'{websocket.remote_address} <= {close}') websocket.send(json_dumps(close)) except ConnectionClosedOK as err: logger.warning(err) return events
[docs] def create(self, obj, include_link=OMIT_LINK, ignore_formatting=False): """Creates a new object: a post, comment, like, repost, etc. See :meth:`Source.create` docstring for details. """ assert self.privkey type = as1.object_type(obj) url = obj.get('url') is_reply = type == 'comment' or obj.get('inReplyTo') base_obj = self.base_object(obj) base_url = base_obj.get('url') prefer_content = type == 'note' or (base_url and is_reply) event = from_as1(obj, privkey=self.privkey) content = self._content_for_create( obj, ignore_formatting=ignore_formatting, prefer_name=not prefer_content) or '' if include_link == INCLUDE_LINK and url: content += '\n' + url event.setdefault('pubkey', self.hex_pubkey) event['id'] = id_for(event) event['content'] = content missing = (set(('content', 'created_at', 'kind', 'id', 'pubkey', 'tags')) - event.keys()) assert not missing, f'missing {missing}' logger.debug(f'connecting to {self.relays[0]}') with connect(self.relays[0], open_timeout=HTTP_TIMEOUT, close_timeout=HTTP_TIMEOUT, ) as websocket: create = ['EVENT', event] logger.debug(f'{websocket.remote_address} <= {create}') try: websocket.send(json_dumps(create)) msg = websocket.recv(timeout=HTTP_TIMEOUT) except ConnectionClosedOK as cc: logger.warning(cc) return resp = json_loads(msg) logger.debug(f'{websocket.remote_address} => {resp}') if resp[:3] == ['OK', event['id'], True]: return creation_result(event) logger.warning('relay rejected event!') return creation_result(error_plain=resp[-1], abort=True)
[docs] def delete(self, id): """Deletes a post. Args: id (str): bech32-encoded id of the event to delete """ return self.create({ 'objectType': 'activity', 'verb': 'delete', 'object': id, 'published': util.now(tz=timezone.utc).isoformat(), })
@classmethod def _postprocess_base_object(cls, obj): # don't mess with ids return obj