Source code for granary.farcaster

"""Farcaster.

https://farcaster.xyz/
https://github.com/farcasterxyz/protocol/blob/main/docs/SPECIFICATION.md
https://snapchain.farcaster.xyz/

TODO:
* from_as1 actor support. probably return a list of Message?
* rel-alternate links via "Social Attestations." so complicated :(
  https://github.com/farcasterxyz/protocol/discussions/199
* mapping FIDs <=> DNS domains. unclear whether/how much this is adopted
  https://github.com/farcasterxyz/protocol/discussions/106
* user location, from https://github.com/farcasterxyz/protocol/discussions/196
  (it's a geo:... URL string, https://tools.ietf.org/html/rfc5870, in
  USER_DATA_TYPE_LOCATION)
"""
from blake3 import blake3
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
  Ed25519PrivateKey,
  Ed25519PublicKey,
)
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
import copy
from datetime import datetime, timezone
from itertools import zip_longest
import logging
import mimetypes
import re
from urllib.parse import urlparse

import grpc
from oauth_dropins.webutil import util

from . import as1, source
from .generated.farcaster import rpc_pb2_grpc
from .generated.farcaster.request_response_pb2 import (
  FidRequest,
  MessagesResponse,
  ReactionsByFidRequest,
)
from .generated.farcaster.message_pb2 import (
  CastId,
  FARCASTER_NETWORK_MAINNET,
  HASH_SCHEME_BLAKE3,
  Message,
  MessageData,
  MESSAGE_TYPE_CAST_ADD,
  MESSAGE_TYPE_CAST_REMOVE,
  MESSAGE_TYPE_REACTION_ADD,
  MESSAGE_TYPE_REACTION_REMOVE,
  MESSAGE_TYPE_USER_DATA_ADD,
  REACTION_TYPE_LIKE,
  MESSAGE_TYPE_LINK_ADD,
  MESSAGE_TYPE_LINK_REMOVE,
  REACTION_TYPE_RECAST,
  SIGNATURE_SCHEME_ED25519,
  USER_DATA_TYPE_BANNER,
  USER_DATA_TYPE_BIO,
  USER_DATA_TYPE_DISPLAY,
  USER_DATA_TYPE_PFP,
  USER_DATA_TYPE_URL,
  USER_DATA_TYPE_USERNAME,
)

# Maps USER_DATA_TYPE_* constant to AS1 actor field name.
USER_DATA_TYPE_TO_AS1 = {
  USER_DATA_TYPE_DISPLAY:  'displayName',
  USER_DATA_TYPE_USERNAME: 'username',
  USER_DATA_TYPE_BIO:      'summary',
  USER_DATA_TYPE_PFP:      'image',
  USER_DATA_TYPE_BANNER:   'image',
  USER_DATA_TYPE_URL:      'url',
}

DEFAULT_SNAPCHAIN_HOST = 'crackle.farcaster.xyz'
DEFAULT_SNAPCHAIN_PORT = 3383

# farcaster:// URIs: https://github.com/farcasterxyz/protocol/discussions/123
# we only support:
# * farcaster://[fid]
# * farcaster://[fid]/0x[hash]
FARCASTER_URI_RE = re.compile(r'farcaster://(?P<fid>[0-9]+)(/0x(?P<hash>[0-9a-f]+))?')

# https://github.com/farcasterxyz/protocol/blob/main/docs/SPECIFICATION.md#hashing
BLAKE3_HASH_LENGTH_BYTES = 20

# https://github.com/farcasterxyz/protocol/blob/main/docs/SPECIFICATION.md#name-server
HANDLE_RE = re.compile(r'[a-z0-9][a-z0-9-]{0,15}')

logger = logging.getLogger(__name__)


[docs] def uri(fid, hash=None): """Generates and returns a ``farcaster://`` URI. https://github.com/farcasterxyz/protocol/discussions/123 Args: fid (int) hash (bytes) Returns: str """ assert util.is_int(fid) uri = f'farcaster://{fid}' if hash: assert isinstance(hash, bytes) uri += f'/0x{hash.hex()}' return uri
[docs] def deserialize(msg): """Deserializes and returns the ``MessageData`` for a given ``Message``. Prefers ``data_bytes`` over ``data`` per the Farcaster spec: https://github.com/farcasterxyz/protocol/blob/main/docs/SPECIFICATION.md#hashing https://github.com/farcasterxyz/protocol/discussions/87 Args: msg (message_pb2.Message) Returns: MessageData Raises: ValueError: if neither ``data`` nor ``data_bytes`` is set """ if msg.HasField('data_bytes'): data = MessageData() data.ParseFromString(msg.data_bytes) return data elif msg.HasField('data'): return msg.data else: raise ValueError(f'No data or data_bytes: {msg}')
[docs] def verify(msg): """Verifies a ``MessageData``'s hash and signature. Args: msg (message_pb2.Message) Raises: ValueError: if ``hash`` or ``signature`` are invalid or missing """ if not msg.hash or not msg.signature or not msg.signer: raise ValueError(f'Missing hash or signature or signer: {msg}') if msg.signature_scheme != SIGNATURE_SCHEME_ED25519: raise ValueError(f'Unknown signature scheme: {msg.signature_scheme}') if msg.HasField('data_bytes'): data_bytes = msg.data_bytes elif msg.HasField('data'): data_bytes = msg.data.SerializeToString() else: raise ValueError(f'No data or data_bytes: {msg}') hash = blake3(data_bytes).digest()[:BLAKE3_HASH_LENGTH_BYTES] if hash != msg.hash: raise ValueError(f'Hash mismatch: expected {hash.hex()}, got {msg.hash.hex()}') try: Ed25519PublicKey.from_public_bytes(msg.signer).verify(msg.signature, msg.hash) except InvalidSignature as e: raise ValueError(f'Signature verification failed: {e}') from e
[docs] def serialize_and_hash(msg): """Serializes ``MessageData`` into ``data_bytes`` and computes ``hash``. Args: msg (message_pb2.Message) """ msg.data_bytes = msg.data.SerializeToString() msg.hash = blake3(msg.data_bytes).digest()[:BLAKE3_HASH_LENGTH_BYTES] msg.hash_scheme = HASH_SCHEME_BLAKE3
[docs] def sign(msg, privkey): """Signs a ``Message`` and populates its ``signer`` and ``signature``. Args: msg (message_pb2.Message) privkey (Ed25519PrivateKey): private key to sign with """ msg.signature = privkey.sign(msg.hash) msg.signer = privkey.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) msg.signature_scheme = SIGNATURE_SCHEME_ED25519
[docs] def to_as1(msg): """Converts a Farcaster protobuf to an ActivityStreams 1 object or actor. Ids are farcaster:// URIs: https://github.com/farcasterxyz/protocol/discussions/123 Args: msg (message_pb2.Message or MessagesResponse): Farcaster Message protobuf or MessagesResponse (user data messages) Returns: dict: AS1 activity, object, or actor """ # actor, from GetUserDataByFid response if isinstance(msg, MessagesResponse): actor = {} fid = None for m in msg.messages: if (data := deserialize(m)) and data.type == MESSAGE_TYPE_USER_DATA_ADD: if not fid: fid = data.fid assert data.fid == fid body = data.user_data_body if field := USER_DATA_TYPE_TO_AS1.get(body.type): if field == 'image': img = ({'objectType': 'featured', 'url': body.value} if body.type == USER_DATA_TYPE_BANNER else body.value) actor.setdefault('image', []).append(img) elif field == 'url': actor['url'] = body.value if fid: actor['urls'] = [body.value, Farcaster.user_url(fid)] else: actor[field] = body.value return util.trim_nulls({ 'url': Farcaster.user_url(fid) if fid else None, # default **actor, 'objectType': 'person', 'id': uri(fid) if fid else None, }) # all other object types if not msg or not isinstance(msg, Message): return {} if not (data := deserialize(msg)): return {} obj = {} # AS1 return value actor_fid = data.fid msg_type = data.type published = None if data.timestamp: published = datetime.fromtimestamp(data.timestamp, tz=timezone.utc).isoformat() # post if msg_type == MESSAGE_TYPE_CAST_ADD: cast = data.cast_add_body obj.update({ 'objectType': 'note', 'content': cast.text, 'content_is_html': False, 'tags': [], 'image': [], 'attachments': [], 'published': published, }) if actor_fid: obj['author'] = uri(actor_fid) if msg.hash: obj.update({ 'id': uri(actor_fid, msg.hash), 'url': f'https://farcaster.xyz/~/conversations/0x{msg.hash.hex()}', }) if cast.mentions: for mention_fid, pos in zip_longest(cast.mentions, cast.mentions_positions): obj['tags'].append({ 'objectType': 'mention', 'url': uri(mention_fid), 'startIndex': pos, }) for embed in cast.embeds: if embed.HasField('url'): mimetype, _ = mimetypes.guess_type(embed.url) if mimetype and mimetype.startswith('image/'): obj['image'].append(embed.url) elif mimetype and mimetype.startswith('video/'): obj['attachments'].append({ 'objectType': 'video', 'stream': {'url': embed.url}, }) else: obj['attachments'].append({ 'objectType': 'link', 'url': embed.url, }) elif embed.HasField('cast_id'): obj['attachments'].append({ 'objectType': 'note', 'id': uri(embed.cast_id.fid, embed.cast_id.hash), 'author': uri(embed.cast_id.fid), }) if cast.HasField('parent_cast_id'): obj['inReplyTo'] = { 'id': uri(cast.parent_cast_id.fid, cast.parent_cast_id.hash), 'author': uri(cast.parent_cast_id.fid), } elif cast.HasField('parent_url'): obj['inReplyTo'] = cast.parent_url # delete elif msg_type == MESSAGE_TYPE_CAST_REMOVE: obj = { 'objectType': 'activity', 'verb': 'delete', 'actor': uri(actor_fid), 'object': uri(actor_fid, data.cast_remove_body.target_hash), 'published': published, } # like, repost elif msg_type in (MESSAGE_TYPE_REACTION_ADD, MESSAGE_TYPE_REACTION_REMOVE): reaction = data.reaction_body verb = 'like' if reaction.type == REACTION_TYPE_LIKE else 'share' if reaction.HasField('target_cast_id'): target_obj = { 'id': uri(reaction.target_cast_id.fid, reaction.target_cast_id.hash), 'author': uri(reaction.target_cast_id.fid), } elif reaction.HasField('target_url'): target_obj = reaction.target_url obj = { 'objectType': 'activity', 'verb': verb, 'actor': uri(actor_fid), 'object': target_obj, } if msg_type == MESSAGE_TYPE_REACTION_REMOVE: obj = { 'objectType': 'activity', 'verb': 'undo', 'actor': uri(actor_fid), 'object': obj, } obj['published'] = published if msg.hash: obj['id'] = uri(actor_fid, msg.hash) # follow, block, unfollow, unblock elif msg_type in (MESSAGE_TYPE_LINK_ADD, MESSAGE_TYPE_LINK_REMOVE): obj = { 'objectType': 'activity', 'verb': data.link_body.type, 'actor': uri(actor_fid), 'object': uri(data.link_body.target_fid), } if msg_type == MESSAGE_TYPE_LINK_REMOVE: obj = { 'objectType': 'activity', 'verb': 'undo', 'actor': uri(actor_fid), 'object': obj, } obj['published'] = published if timestamp := data.link_body.displayTimestamp: obj['published'] = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat() return util.trim_nulls(obj)
[docs] def from_as1(obj): """Converts an ActivityStreams 1 activity or object to a Farcaster Message. Args: obj (dict): AS1 activity or object Returns: message_pb2.Message: Farcaster Message protobuf """ obj = copy.deepcopy(obj) msg = Message() data = msg.data type = as1.object_type(obj) if type in ('post', 'update'): type = as1.object_type(as1.get_object(obj)) obj = as1.get_object(obj) inner_obj = as1.get_object(obj) inner_id = inner_obj.get('id') inner_id_match = FARCASTER_URI_RE.fullmatch(inner_id or '') inner_type = as1.object_type(inner_obj) author = as1.get_owner(obj) if (match := FARCASTER_URI_RE.fullmatch(author)) and not match['hash']: data.fid = int(match['fid']) data.network = FARCASTER_NETWORK_MAINNET published = (util.parse_iso8601(obj['published']) if obj.get('published') else util.now(tz=timezone.utc)) data.timestamp = int(published.timestamp()) as1.convert_html_content_to_text(obj) as1.expand_tags(obj) # posts if type in ('note', 'article', 'comment'): data.type = MESSAGE_TYPE_CAST_ADD cast = data.cast_add_body content = obj.get('content', '') cast.text = content # mentions mentions = [] mention_positions = [] for tag in as1.get_objects(obj, 'tags'): if tag.get('objectType') == 'mention': match = FARCASTER_URI_RE.fullmatch(tag.get('url', '')) if match and not match['hash']: cast.mentions.append(int(match['fid'])) if 'startIndex' in tag: cast.mentions_positions.append(tag['startIndex']) # images for img in as1.get_objects(obj, 'image'): if url := as1.get_url(img) or img.get('id'): cast.embeds.add().url = url for att in as1.get_objects(obj, 'attachments'): att_type = as1.object_type(att) if att_type == 'note': author = att.get('author', '') if (match := FARCASTER_URI_RE.fullmatch(att.get('id', ''))) and match['hash']: embed = cast.embeds.add() embed.cast_id.fid = int(match['fid']) embed.cast_id.hash = bytes.fromhex(match['hash']) elif att_type in ('video', 'audio'): if url := as1.get_url(util.get_first(att, 'stream')): cast.embeds.add().url = url elif att_type == 'link': if url := as1.get_url(att): cast.embeds.add().url = url # reply if ((in_reply_to := as1.get_object(obj, 'inReplyTo')) and (id := in_reply_to.get('id'))): if (match := FARCASTER_URI_RE.fullmatch(id)) and match['hash']: cast.parent_cast_id.fid = int(match['fid']) cast.parent_cast_id.hash = bytes.fromhex(match['hash']) elif util.is_web(id): cast.parent_url = id # likes/reposts elif type in ('like', 'share'): data.type = MESSAGE_TYPE_REACTION_ADD reaction = data.reaction_body reaction.type = REACTION_TYPE_LIKE if type == 'like' else REACTION_TYPE_RECAST if inner_id_match and inner_id_match['hash']: reaction.target_cast_id.fid = int(inner_id_match['fid']) reaction.target_cast_id.hash = bytes.fromhex(inner_id_match['hash']) elif util.is_web(inner_id): reaction.target_url = inner_id # follow, block elif type in ('follow', 'block'): data.type = MESSAGE_TYPE_LINK_ADD data.link_body.type = type if inner_id_match and not inner_id_match['hash']: data.link_body.target_fid = int(inner_id_match['fid']) data.link_body.displayTimestamp = data.timestamp # delete post elif type == 'delete': data.type = MESSAGE_TYPE_CAST_REMOVE if inner_id_match and inner_id_match['hash']: data.cast_remove_body.target_hash = bytes.fromhex(inner_id_match['hash']) # undo like/repost elif type == 'undo' and inner_type in ('like', 'share'): data.type = MESSAGE_TYPE_REACTION_REMOVE data.reaction_body.MergeFrom(from_as1(inner_obj).data.reaction_body) # unfollow, unblock elif type == 'undo' and inner_type in ('follow', 'block'): data.type = MESSAGE_TYPE_LINK_REMOVE data.link_body.type = inner_type target_id = as1.get_object(inner_obj).get('id', '') if (match := FARCASTER_URI_RE.fullmatch(target_id)) and not match['hash']: data.link_body.target_fid = int(match['fid']) data.link_body.displayTimestamp = data.timestamp serialize_and_hash(msg) return msg
[docs] class Farcaster(source.Source): """Farcaster source class. See file docstring and :class:`Source` for details. Attributes: _hub (rpc_pb2_grpc.HubServiceStub): gRPC client """ DOMAIN = 'farcaster.xyz' BASE_URL = 'https://farcaster.xyz/' NAME = 'Farcaster' def __init__(self, host=DEFAULT_SNAPCHAIN_HOST, port=DEFAULT_SNAPCHAIN_PORT): """Constructor. Args: host (str): snapchain node host, eg ``snapchain.farcaster.xyz`` port (int): snapchain node port, default 3383 """ assert host assert port addr = f'{host}:{port}' logger.info(f'Connecting to Farcaster Snapchain node {addr}') channel = grpc.secure_channel(addr, grpc.ssl_channel_credentials()) self._hub = rpc_pb2_grpc.HubServiceStub(channel)
[docs] @classmethod def user_url(cls, fid): """Returns the Farcaster URL for a user with the given FID. https://docs.farcaster.xyz/reference/farcaster/intent-urls#resource-urls Args: fid (int): Farcaster user ID Returns: str: URL """ return f'https://farcaster.xyz/~/profiles/{fid}'
[docs] def get_actor(self, fid): """Fetches and returns a Farcaster user as an AS1 actor dict. Args: fid (int): Farcaster user ID Returns: dict: AS1 actor """ resp = self._hub.GetUserDataByFid(FidRequest(fid=fid)) return to_as1(resp)
[docs] def get_activities_response(self, user_id=None, group_id=None, app_id=None, activity_id=None, fetch_replies=False, fetch_likes=False, fetch_shares=False, include_shares=True, fetch_events=False, fetch_mentions=False, search_query=None, start_index=0, count=0, **kwargs): """Fetches casts and reactions and returns them as AS1 activities. See :meth:`Source.get_activities_response` for details. ``group_id``, ``fetch_replies``, ``fetch_events``, and ``search_query`` are not yet supported. ``user_id`` is a Farcaster FID (integer or string). """ if user_id and not util.is_int(user_id): raise ValueError(f'user_id must be a Farcaster FID (integer), got {user_id!r}') elif activity_id and not user_id: raise ValueError('activity_id requires user_id') fid = int(user_id) if user_id else None page_kwargs = {'reverse': True} if count: page_kwargs['page_size'] = count activities = [] def add_activities(resp): for msg in resp.messages: obj = to_as1(msg) if obj.get('objectType') != 'activity': obj = { 'objectType': 'activity', 'verb': 'post', 'object': obj, 'actor': obj.get('author'), } activities.append(self.postprocess_activity(obj)) if fid: if activity_id: cast_hash = bytes.fromhex(activity_id.removeprefix('farcaster:cast:')) add_activities(MessagesResponse( messages=[self._hub.GetCast(CastId(fid=fid, hash=cast_hash))])) else: add_activities(self._hub.GetCastsByFid(FidRequest(fid=fid, **page_kwargs))) if fetch_mentions: add_activities(self._hub.GetCastsByMention(FidRequest(fid=fid, **page_kwargs))) if fetch_likes: add_activities(self._hub.GetReactionsByFid(ReactionsByFidRequest( fid=fid, reaction_type=REACTION_TYPE_LIKE, **page_kwargs))) if fetch_shares: add_activities(self._hub.GetReactionsByFid( ReactionsByFidRequest( fid=fid, reaction_type=REACTION_TYPE_RECAST, **page_kwargs))) return self.make_activities_base_response( activities, activity_id=activity_id, start_index=start_index)