"""Farcaster.
https://farcaster.xyz/
https://github.com/farcasterxyz/protocol/blob/main/docs/SPECIFICATION.md
https://snapchain.farcaster.xyz/
TODO:
* from_as1 actor support. probably return a list of Message?
* rel-alternate links via "Social Attestations." so complicated :(
https://github.com/farcasterxyz/protocol/discussions/199
* mapping FIDs <=> DNS domains. unclear whether/how much this is adopted
https://github.com/farcasterxyz/protocol/discussions/106
* user location, from https://github.com/farcasterxyz/protocol/discussions/196
(it's a geo:... URL string, https://tools.ietf.org/html/rfc5870, in
USER_DATA_TYPE_LOCATION)
"""
from blake3 import blake3
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives.asymmetric.ed25519 import (
Ed25519PrivateKey,
Ed25519PublicKey,
)
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
import copy
from datetime import datetime, timezone
from itertools import zip_longest
import logging
import mimetypes
import re
from urllib.parse import urlparse
import grpc
from oauth_dropins.webutil import util
from . import as1, source
from .generated.farcaster import rpc_pb2_grpc
from .generated.farcaster.request_response_pb2 import (
FidRequest,
MessagesResponse,
ReactionsByFidRequest,
)
from .generated.farcaster.message_pb2 import (
CastId,
FARCASTER_NETWORK_MAINNET,
HASH_SCHEME_BLAKE3,
Message,
MessageData,
MESSAGE_TYPE_CAST_ADD,
MESSAGE_TYPE_CAST_REMOVE,
MESSAGE_TYPE_REACTION_ADD,
MESSAGE_TYPE_REACTION_REMOVE,
MESSAGE_TYPE_USER_DATA_ADD,
REACTION_TYPE_LIKE,
MESSAGE_TYPE_LINK_ADD,
MESSAGE_TYPE_LINK_REMOVE,
REACTION_TYPE_RECAST,
SIGNATURE_SCHEME_ED25519,
USER_DATA_TYPE_BANNER,
USER_DATA_TYPE_BIO,
USER_DATA_TYPE_DISPLAY,
USER_DATA_TYPE_PFP,
USER_DATA_TYPE_URL,
USER_DATA_TYPE_USERNAME,
)
# Maps USER_DATA_TYPE_* constant to AS1 actor field name.
USER_DATA_TYPE_TO_AS1 = {
USER_DATA_TYPE_DISPLAY: 'displayName',
USER_DATA_TYPE_USERNAME: 'username',
USER_DATA_TYPE_BIO: 'summary',
USER_DATA_TYPE_PFP: 'image',
USER_DATA_TYPE_BANNER: 'image',
USER_DATA_TYPE_URL: 'url',
}
DEFAULT_SNAPCHAIN_HOST = 'crackle.farcaster.xyz'
DEFAULT_SNAPCHAIN_PORT = 3383
# farcaster:// URIs: https://github.com/farcasterxyz/protocol/discussions/123
# we only support:
# * farcaster://[fid]
# * farcaster://[fid]/0x[hash]
FARCASTER_URI_RE = re.compile(r'farcaster://(?P<fid>[0-9]+)(/0x(?P<hash>[0-9a-f]+))?')
# https://github.com/farcasterxyz/protocol/blob/main/docs/SPECIFICATION.md#hashing
BLAKE3_HASH_LENGTH_BYTES = 20
# https://github.com/farcasterxyz/protocol/blob/main/docs/SPECIFICATION.md#name-server
HANDLE_RE = re.compile(r'[a-z0-9][a-z0-9-]{0,15}')
logger = logging.getLogger(__name__)
[docs]
def uri(fid, hash=None):
"""Generates and returns a ``farcaster://`` URI.
https://github.com/farcasterxyz/protocol/discussions/123
Args:
fid (int)
hash (bytes)
Returns:
str
"""
assert util.is_int(fid)
uri = f'farcaster://{fid}'
if hash:
assert isinstance(hash, bytes)
uri += f'/0x{hash.hex()}'
return uri
[docs]
def deserialize(msg):
"""Deserializes and returns the ``MessageData`` for a given ``Message``.
Prefers ``data_bytes`` over ``data`` per the Farcaster spec:
https://github.com/farcasterxyz/protocol/blob/main/docs/SPECIFICATION.md#hashing
https://github.com/farcasterxyz/protocol/discussions/87
Args:
msg (message_pb2.Message)
Returns:
MessageData
Raises:
ValueError: if neither ``data`` nor ``data_bytes`` is set
"""
if msg.HasField('data_bytes'):
data = MessageData()
data.ParseFromString(msg.data_bytes)
return data
elif msg.HasField('data'):
return msg.data
else:
raise ValueError(f'No data or data_bytes: {msg}')
[docs]
def verify(msg):
"""Verifies a ``MessageData``'s hash and signature.
Args:
msg (message_pb2.Message)
Raises:
ValueError: if ``hash`` or ``signature`` are invalid or missing
"""
if not msg.hash or not msg.signature or not msg.signer:
raise ValueError(f'Missing hash or signature or signer: {msg}')
if msg.signature_scheme != SIGNATURE_SCHEME_ED25519:
raise ValueError(f'Unknown signature scheme: {msg.signature_scheme}')
if msg.HasField('data_bytes'):
data_bytes = msg.data_bytes
elif msg.HasField('data'):
data_bytes = msg.data.SerializeToString()
else:
raise ValueError(f'No data or data_bytes: {msg}')
hash = blake3(data_bytes).digest()[:BLAKE3_HASH_LENGTH_BYTES]
if hash != msg.hash:
raise ValueError(f'Hash mismatch: expected {hash.hex()}, got {msg.hash.hex()}')
try:
Ed25519PublicKey.from_public_bytes(msg.signer).verify(msg.signature, msg.hash)
except InvalidSignature as e:
raise ValueError(f'Signature verification failed: {e}') from e
[docs]
def serialize_and_hash(msg):
"""Serializes ``MessageData`` into ``data_bytes`` and computes ``hash``.
Args:
msg (message_pb2.Message)
"""
msg.data_bytes = msg.data.SerializeToString()
msg.hash = blake3(msg.data_bytes).digest()[:BLAKE3_HASH_LENGTH_BYTES]
msg.hash_scheme = HASH_SCHEME_BLAKE3
[docs]
def sign(msg, privkey):
"""Signs a ``Message`` and populates its ``signer`` and ``signature``.
Args:
msg (message_pb2.Message)
privkey (Ed25519PrivateKey): private key to sign with
"""
msg.signature = privkey.sign(msg.hash)
msg.signer = privkey.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
msg.signature_scheme = SIGNATURE_SCHEME_ED25519
[docs]
def to_as1(msg):
"""Converts a Farcaster protobuf to an ActivityStreams 1 object or actor.
Ids are farcaster:// URIs: https://github.com/farcasterxyz/protocol/discussions/123
Args:
msg (message_pb2.Message or MessagesResponse):
Farcaster Message protobuf or MessagesResponse (user data messages)
Returns:
dict: AS1 activity, object, or actor
"""
# actor, from GetUserDataByFid response
if isinstance(msg, MessagesResponse):
actor = {}
fid = None
for m in msg.messages:
if (data := deserialize(m)) and data.type == MESSAGE_TYPE_USER_DATA_ADD:
if not fid:
fid = data.fid
assert data.fid == fid
body = data.user_data_body
if field := USER_DATA_TYPE_TO_AS1.get(body.type):
if field == 'image':
img = ({'objectType': 'featured', 'url': body.value}
if body.type == USER_DATA_TYPE_BANNER
else body.value)
actor.setdefault('image', []).append(img)
elif field == 'url':
actor['url'] = body.value
if fid:
actor['urls'] = [body.value, Farcaster.user_url(fid)]
else:
actor[field] = body.value
return util.trim_nulls({
'url': Farcaster.user_url(fid) if fid else None, # default
**actor,
'objectType': 'person',
'id': uri(fid) if fid else None,
})
# all other object types
if not msg or not isinstance(msg, Message):
return {}
if not (data := deserialize(msg)):
return {}
obj = {} # AS1 return value
actor_fid = data.fid
msg_type = data.type
published = None
if data.timestamp:
published = datetime.fromtimestamp(data.timestamp, tz=timezone.utc).isoformat()
# post
if msg_type == MESSAGE_TYPE_CAST_ADD:
cast = data.cast_add_body
obj.update({
'objectType': 'note',
'content': cast.text,
'content_is_html': False,
'tags': [],
'image': [],
'attachments': [],
'published': published,
})
if actor_fid:
obj['author'] = uri(actor_fid)
if msg.hash:
obj.update({
'id': uri(actor_fid, msg.hash),
'url': f'https://farcaster.xyz/~/conversations/0x{msg.hash.hex()}',
})
if cast.mentions:
for mention_fid, pos in zip_longest(cast.mentions, cast.mentions_positions):
obj['tags'].append({
'objectType': 'mention',
'url': uri(mention_fid),
'startIndex': pos,
})
for embed in cast.embeds:
if embed.HasField('url'):
mimetype, _ = mimetypes.guess_type(embed.url)
if mimetype and mimetype.startswith('image/'):
obj['image'].append(embed.url)
elif mimetype and mimetype.startswith('video/'):
obj['attachments'].append({
'objectType': 'video',
'stream': {'url': embed.url},
})
else:
obj['attachments'].append({
'objectType': 'link',
'url': embed.url,
})
elif embed.HasField('cast_id'):
obj['attachments'].append({
'objectType': 'note',
'id': uri(embed.cast_id.fid, embed.cast_id.hash),
'author': uri(embed.cast_id.fid),
})
if cast.HasField('parent_cast_id'):
obj['inReplyTo'] = {
'id': uri(cast.parent_cast_id.fid, cast.parent_cast_id.hash),
'author': uri(cast.parent_cast_id.fid),
}
elif cast.HasField('parent_url'):
obj['inReplyTo'] = cast.parent_url
# delete
elif msg_type == MESSAGE_TYPE_CAST_REMOVE:
obj = {
'objectType': 'activity',
'verb': 'delete',
'actor': uri(actor_fid),
'object': uri(actor_fid, data.cast_remove_body.target_hash),
'published': published,
}
# like, repost
elif msg_type in (MESSAGE_TYPE_REACTION_ADD, MESSAGE_TYPE_REACTION_REMOVE):
reaction = data.reaction_body
verb = 'like' if reaction.type == REACTION_TYPE_LIKE else 'share'
if reaction.HasField('target_cast_id'):
target_obj = {
'id': uri(reaction.target_cast_id.fid, reaction.target_cast_id.hash),
'author': uri(reaction.target_cast_id.fid),
}
elif reaction.HasField('target_url'):
target_obj = reaction.target_url
obj = {
'objectType': 'activity',
'verb': verb,
'actor': uri(actor_fid),
'object': target_obj,
}
if msg_type == MESSAGE_TYPE_REACTION_REMOVE:
obj = {
'objectType': 'activity',
'verb': 'undo',
'actor': uri(actor_fid),
'object': obj,
}
obj['published'] = published
if msg.hash:
obj['id'] = uri(actor_fid, msg.hash)
# follow, block, unfollow, unblock
elif msg_type in (MESSAGE_TYPE_LINK_ADD, MESSAGE_TYPE_LINK_REMOVE):
obj = {
'objectType': 'activity',
'verb': data.link_body.type,
'actor': uri(actor_fid),
'object': uri(data.link_body.target_fid),
}
if msg_type == MESSAGE_TYPE_LINK_REMOVE:
obj = {
'objectType': 'activity',
'verb': 'undo',
'actor': uri(actor_fid),
'object': obj,
}
obj['published'] = published
if timestamp := data.link_body.displayTimestamp:
obj['published'] = datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
return util.trim_nulls(obj)
[docs]
def from_as1(obj):
"""Converts an ActivityStreams 1 activity or object to a Farcaster Message.
Args:
obj (dict): AS1 activity or object
Returns:
message_pb2.Message: Farcaster Message protobuf
"""
obj = copy.deepcopy(obj)
msg = Message()
data = msg.data
type = as1.object_type(obj)
if type in ('post', 'update'):
type = as1.object_type(as1.get_object(obj))
obj = as1.get_object(obj)
inner_obj = as1.get_object(obj)
inner_id = inner_obj.get('id')
inner_id_match = FARCASTER_URI_RE.fullmatch(inner_id or '')
inner_type = as1.object_type(inner_obj)
author = as1.get_owner(obj)
if (match := FARCASTER_URI_RE.fullmatch(author)) and not match['hash']:
data.fid = int(match['fid'])
data.network = FARCASTER_NETWORK_MAINNET
published = (util.parse_iso8601(obj['published']) if obj.get('published')
else util.now(tz=timezone.utc))
data.timestamp = int(published.timestamp())
as1.convert_html_content_to_text(obj)
as1.expand_tags(obj)
# posts
if type in ('note', 'article', 'comment'):
data.type = MESSAGE_TYPE_CAST_ADD
cast = data.cast_add_body
content = obj.get('content', '')
cast.text = content
# mentions
mentions = []
mention_positions = []
for tag in as1.get_objects(obj, 'tags'):
if tag.get('objectType') == 'mention':
match = FARCASTER_URI_RE.fullmatch(tag.get('url', ''))
if match and not match['hash']:
cast.mentions.append(int(match['fid']))
if 'startIndex' in tag:
cast.mentions_positions.append(tag['startIndex'])
# images
for img in as1.get_objects(obj, 'image'):
if url := as1.get_url(img) or img.get('id'):
cast.embeds.add().url = url
for att in as1.get_objects(obj, 'attachments'):
att_type = as1.object_type(att)
if att_type == 'note':
author = att.get('author', '')
if (match := FARCASTER_URI_RE.fullmatch(att.get('id', ''))) and match['hash']:
embed = cast.embeds.add()
embed.cast_id.fid = int(match['fid'])
embed.cast_id.hash = bytes.fromhex(match['hash'])
elif att_type in ('video', 'audio'):
if url := as1.get_url(util.get_first(att, 'stream')):
cast.embeds.add().url = url
elif att_type == 'link':
if url := as1.get_url(att):
cast.embeds.add().url = url
# reply
if ((in_reply_to := as1.get_object(obj, 'inReplyTo'))
and (id := in_reply_to.get('id'))):
if (match := FARCASTER_URI_RE.fullmatch(id)) and match['hash']:
cast.parent_cast_id.fid = int(match['fid'])
cast.parent_cast_id.hash = bytes.fromhex(match['hash'])
elif util.is_web(id):
cast.parent_url = id
# likes/reposts
elif type in ('like', 'share'):
data.type = MESSAGE_TYPE_REACTION_ADD
reaction = data.reaction_body
reaction.type = REACTION_TYPE_LIKE if type == 'like' else REACTION_TYPE_RECAST
if inner_id_match and inner_id_match['hash']:
reaction.target_cast_id.fid = int(inner_id_match['fid'])
reaction.target_cast_id.hash = bytes.fromhex(inner_id_match['hash'])
elif util.is_web(inner_id):
reaction.target_url = inner_id
# follow, block
elif type in ('follow', 'block'):
data.type = MESSAGE_TYPE_LINK_ADD
data.link_body.type = type
if inner_id_match and not inner_id_match['hash']:
data.link_body.target_fid = int(inner_id_match['fid'])
data.link_body.displayTimestamp = data.timestamp
# delete post
elif type == 'delete':
data.type = MESSAGE_TYPE_CAST_REMOVE
if inner_id_match and inner_id_match['hash']:
data.cast_remove_body.target_hash = bytes.fromhex(inner_id_match['hash'])
# undo like/repost
elif type == 'undo' and inner_type in ('like', 'share'):
data.type = MESSAGE_TYPE_REACTION_REMOVE
data.reaction_body.MergeFrom(from_as1(inner_obj).data.reaction_body)
# unfollow, unblock
elif type == 'undo' and inner_type in ('follow', 'block'):
data.type = MESSAGE_TYPE_LINK_REMOVE
data.link_body.type = inner_type
target_id = as1.get_object(inner_obj).get('id', '')
if (match := FARCASTER_URI_RE.fullmatch(target_id)) and not match['hash']:
data.link_body.target_fid = int(match['fid'])
data.link_body.displayTimestamp = data.timestamp
serialize_and_hash(msg)
return msg
[docs]
class Farcaster(source.Source):
"""Farcaster source class. See file docstring and :class:`Source` for details.
Attributes:
_hub (rpc_pb2_grpc.HubServiceStub): gRPC client
"""
DOMAIN = 'farcaster.xyz'
BASE_URL = 'https://farcaster.xyz/'
NAME = 'Farcaster'
def __init__(self, host=DEFAULT_SNAPCHAIN_HOST, port=DEFAULT_SNAPCHAIN_PORT):
"""Constructor.
Args:
host (str): snapchain node host, eg ``snapchain.farcaster.xyz``
port (int): snapchain node port, default 3383
"""
assert host
assert port
addr = f'{host}:{port}'
logger.info(f'Connecting to Farcaster Snapchain node {addr}')
channel = grpc.secure_channel(addr, grpc.ssl_channel_credentials())
self._hub = rpc_pb2_grpc.HubServiceStub(channel)
[docs]
@classmethod
def user_url(cls, fid):
"""Returns the Farcaster URL for a user with the given FID.
https://docs.farcaster.xyz/reference/farcaster/intent-urls#resource-urls
Args:
fid (int): Farcaster user ID
Returns:
str: URL
"""
return f'https://farcaster.xyz/~/profiles/{fid}'
[docs]
def get_actor(self, fid):
"""Fetches and returns a Farcaster user as an AS1 actor dict.
Args:
fid (int): Farcaster user ID
Returns:
dict: AS1 actor
"""
resp = self._hub.GetUserDataByFid(FidRequest(fid=fid))
return to_as1(resp)
[docs]
def get_activities_response(self, user_id=None, group_id=None, app_id=None,
activity_id=None, fetch_replies=False,
fetch_likes=False, fetch_shares=False,
include_shares=True, fetch_events=False,
fetch_mentions=False, search_query=None,
start_index=0, count=0, **kwargs):
"""Fetches casts and reactions and returns them as AS1 activities.
See :meth:`Source.get_activities_response` for details. ``group_id``,
``fetch_replies``, ``fetch_events``, and ``search_query`` are not yet supported.
``user_id`` is a Farcaster FID (integer or string).
"""
if user_id and not util.is_int(user_id):
raise ValueError(f'user_id must be a Farcaster FID (integer), got {user_id!r}')
elif activity_id and not user_id:
raise ValueError('activity_id requires user_id')
fid = int(user_id) if user_id else None
page_kwargs = {'reverse': True}
if count:
page_kwargs['page_size'] = count
activities = []
def add_activities(resp):
for msg in resp.messages:
obj = to_as1(msg)
if obj.get('objectType') != 'activity':
obj = {
'objectType': 'activity',
'verb': 'post',
'object': obj,
'actor': obj.get('author'),
}
activities.append(self.postprocess_activity(obj))
if fid:
if activity_id:
cast_hash = bytes.fromhex(activity_id.removeprefix('farcaster:cast:'))
add_activities(MessagesResponse(
messages=[self._hub.GetCast(CastId(fid=fid, hash=cast_hash))]))
else:
add_activities(self._hub.GetCastsByFid(FidRequest(fid=fid, **page_kwargs)))
if fetch_mentions:
add_activities(self._hub.GetCastsByMention(FidRequest(fid=fid, **page_kwargs)))
if fetch_likes:
add_activities(self._hub.GetReactionsByFid(ReactionsByFidRequest(
fid=fid, reaction_type=REACTION_TYPE_LIKE, **page_kwargs)))
if fetch_shares:
add_activities(self._hub.GetReactionsByFid(
ReactionsByFidRequest(
fid=fid, reaction_type=REACTION_TYPE_RECAST, **page_kwargs)))
return self.make_activities_base_response(
activities, activity_id=activity_id, start_index=start_index)