Source code for granary.bluesky

"""Bluesky source class.

import copy
from datetime import datetime, timezone
import json
import logging
import re
from pathlib import Path
import urllib.parse

import requests

from lexrpc import Client
from lexrpc.base import NSID_RE
from oauth_dropins.webutil import util
from oauth_dropins.webutil.util import trim_nulls

from . import as1
from .source import FRIENDS, html_to_text, Source, OMIT_LINK, creation_result

logger = logging.getLogger(__name__)

# via
HANDLE_PATTERN = re.compile(r'^' + HANDLE_REGEX)
DID_WEB_PATTERN = re.compile(r'^did:web:' + HANDLE_REGEX)

# at:// URI regexp
# also see arroba.util.parse_at_uri
_CHARS = 'a-zA-Z0-9-.:'
# TODO: add query and fragment? they're currently unused in the protocol
AT_URI_PATTERN = re.compile(rf"""
    $""", re.VERBOSE)

# Maps AT Protocol NSID collections to path elements in URLs.
# Used in at_uri_to_web_url.
# eg for mapping a URI like:
#   at://did:plc:z72i7hd/app.bsky.feed.generator/mutuals
# to a frontend URL like:
  'app.bsky.feed.generator': 'feed',
  '': 'post',
  'app.bsky.graph.list': 'lists',
  name: coll for coll, name in COLLECTION_TO_BSKY_APP_TYPE.items()

# TODO: load from app.bsky.embed.images lexicon

# maps AS1 objectType/verb to possible output Bluesky lexicon types.
# used in from_as1
POST_TYPES = tuple(as1.POST_TYPES) + ('bookmark',)
  as1.ACTOR_TYPES: (
  'share': (

BSKY_APP_URL_RE = re.compile(r"""
  """, re.VERBOSE)


# label on profiles set to only show them to logged in users
NO_AUTHENTICATED_LABEL = '!no-unauthenticated'

[docs] def url_to_did_web(url): """Converts a URL to a ``did:web``. In AT Proto, only hostname-based web DIDs are supported. Paths are not supported, and will be discarded. Examples: * ```` => ```` * ```` => ```` * ```` => ```` Args: url (str) Returns: str: """ parsed = urllib.parse.urlparse(url) if not parsed.hostname: raise ValueError(f'Invalid URL: {url}') if parsed.netloc != parsed.hostname: logger.warning(f"URL {url} contained a port, which will not be included in the DID.") if parsed.path and parsed.path != "/": logger.warning(f"URL {url} contained a path, which will not be included in the DID.") return f'did:web:{parsed.hostname}'
[docs] def did_web_to_url(did): """Converts a did:web to a URL. In AT Proto, only hostname-based web DIDs are supported. Paths are not supported, and will throw an invalid error. Examples: * ```` => ```` * ```` => INVALID * ```` => INVALID Args: did (str) Returns: str: """ if not did or not DID_WEB_PATTERN.match(did): raise ValueError(f'Invalid did:web: {did}') host = did.removeprefix('did:web:') host = urllib.parse.unquote(host) return f'https://{host}/'
[docs] def at_uri_to_web_url(uri, handle=None): """Converts an ``at://`` URI to a ```` URL. Args: uri (str): ``at://`` URI handle: (str): optional user handle. If not provided, defaults to the DID in uri. Returns: str: ```` URL, or None Raises: ValueError: if uri is not a string or doesn't start with ``at://`` """ if not uri: return None if not uri.startswith('at://'): raise ValueError(f'Expected at:// URI, got {uri}') parsed = urllib.parse.urlparse(uri) did = parsed.netloc if not parsed.path: return f'{Bluesky.user_url(handle or did)}' collection, tid = parsed.path.strip('/').split('/') type = COLLECTION_TO_BSKY_APP_TYPE.get(collection) if not type: return None return f'{Bluesky.user_url(handle or did)}/{type}/{tid}'
[docs] def web_url_to_at_uri(url, handle=None, did=None): """Converts a ```` URL to an ``at://`` URI. Currently supports profile, post, and feed URLs with DIDs and handles, eg: * ```` * ```` * ```` If both ``handle`` and ``did`` are provided, and ``handle`` matches the URL, the handle in the resulting URI will be replaced with ``did``. Args: url (str): ```` URL handle (str): Bluesky handle, or None did (str): Valid DID, or None Returns: str: ``at://`` URI, or None Raises: ValueError: if ``url`` can't be parsed as a ```` profile or post URL """ if not url: return None match = BSKY_APP_URL_RE.match(url) if not match: raise ValueError(f"{url} doesn't look like a profile or post URL") id ='id') assert id # If a did and handle have been provided explicitly, # replace the existing handle with the did. if did and handle and id == handle: id = did rkey ='tid') type ='type') if type: collection = BSKY_APP_TYPE_TO_COLLECTION[type] assert rkey else: collection = '' rkey = 'self' return f'at://{id}/{collection}/{rkey}'
[docs] def from_as1_to_strong_ref(obj, client=None, value=False): """Converts an AS1 object to an ATProto ``com.atproto.repo.strongRef``. Uses AS1 ``id`` or ``url`, which should be an ``at://`` URI. Args: obj (dict): AS1 object or activity client (lexrpc.Client): optional; if provided, this will be used to make API calls to PDSes to fetch and populate the ``cid`` field and resolve handle to DID. value (bool): whether to include the record's ``value`` field in the returned object Returns: dict: ATProto ``com.atproto.repo.strongRef`` record """ at_uri = Bluesky.post_id((obj.get('id') or as1.get_url(obj)) if isinstance(obj, dict) else obj ) or '' match = AT_URI_PATTERN.match(at_uri) if not match or not client: return { 'uri': at_uri, 'cid': '', } repo, collection, rkey = match.groups() if not repo.startswith('did:'): handle = repo repo =['did'] # only replace first instance of handle in case it's also in collection or rkey at_uri = at_uri.replace(handle, repo, 1) record = repo=repo, collection=collection, rkey=rkey) if not value: record.pop('value', None) return record
[docs] def from_as1_datetime(val): """Converts an AS1 RFC 3339 datetime string to ATProto ISO 8601. Bluesky requires full date and time with time zone, recommends UTC with Z suffix, fractional seconds. Returns now (ie the current time) if the input datetime can't be parsed. Args: val (str): RFC 3339 datetime Returns: str: ATProto compatible ISO 8601 datetime """ try: # dt = datetime.fromisoformat(val.strip()) dt = util.parse_iso8601(val.strip()) except (AttributeError, TypeError, ValueError): logging.debug(f"Couldn't parse {val} as ISO 8601; defaulting to current time") dt = if dt.tzinfo: dt = util.as_utc(dt) # else it's naive, assume it's UTC assert dt.utcoffset() is None, dt.utcoffset() return dt.isoformat(sep='T', timespec='milliseconds') + 'Z'
[docs] def base_object(obj): """Returns the "base" Bluesky object that an object operates on. If the object is a reply, repost, or like of a Bluesky post, this returns that post object. The id in the returned object is the AT protocol URI, while the URL is the web URL. Args: obj (dict): ActivityStreams object Returns: dict: minimal ActivityStreams object. Usually has at least ``id``; may also have ``url``, ``author``, etc. """ for field in ('inReplyTo', 'object', 'target'): for base in util.get_list(obj, field): url = util.get_url(base) if not url: return {} if url.startswith(''): return { 'id': web_url_to_at_uri(url), 'url': url, } if url.startswith('at://'): return { 'id': url, 'url': at_uri_to_web_url(url), } return {}
[docs] def from_as1(obj, out_type=None, blobs=None, client=None): """Converts an AS1 object to a Bluesky object. Converts to ``record`` types by default, eg ```` or ````. Use ``out_type`` to convert to a different type, eg ```` or ``app.bsky.feed.defs#feedViewPost``. The ``objectType`` field is required. Args: obj (dict): AS1 object or activity out_type (str): desired output lexicon ``$type`` blobs (dict): optional mapping from str URL to ``blob`` dict to use in the returned object. If not provided, or if this doesn't have an ``image`` or similar URL in the input object, its output blob will be omitted. client (Bluesky or lexrpc.Client): optional; if provided, this will be used to make API calls to PDSes to fetch and populate CIDs for records referenced by replies, likes, reposts, etc. Returns: dict: ``app.bsky.*`` object Raises: ValueError: if the ``objectType`` or ``verb`` fields are missing or unsupported """ if isinstance(client, Bluesky): client = client._client activity = obj inner_obj = as1.get_object(activity) verb = activity.get('verb') or 'post' if inner_obj and verb == 'post': obj = inner_obj type = as1.object_type(obj) if not type: raise ValueError(f"Missing objectType or verb") actor = as1.get_object(activity, 'actor') if blobs is None: blobs = {} # validate out_type if out_type: for in_types, out_types in FROM_AS1_TYPES.items(): if type in in_types and out_type in out_types: break else: raise ValueError(f"{type} {verb} doesn't support out_type {out_type}") # extract @-mention links in HTML text obj = copy.deepcopy(obj) Source.postprocess_object(obj, mentions=True) # TODO: once we're on Python 3.10, switch this to a match statement! if type in as1.ACTOR_TYPES: # avatar and banner. banner is featured image, if available avatar = util.get_url(obj, 'image') banner = None for img in util.get_list(obj, 'image'): url = img.get('url') if img.get('objectType') == 'featured' and url: banner = url break ret = { 'displayName': obj.get('displayName'), 'description': html_to_text(obj.get('summary')), 'avatar': blobs.get(avatar), 'banner': blobs.get(banner), } if not out_type or out_type == '': return trim_nulls({**ret, '$type': ''}) url = as1.get_url(obj) id = obj.get('id') if not url and id: parsed = util.parse_tag_uri(id) if parsed: # This is only really formatted as a URL to keep url_to_did_web happy. url = f'https://{parsed[0]}' did_web = '' if id and id.startswith('did:web:'): did_web = id else: try: did_web = url_to_did_web(url) except ValueError as e:"Couldn't generate did:web: {e}") # handles must be hostnames # username = obj.get('username') parsed = urllib.parse.urlparse(url) domain = parsed.netloc did_web_bare = did_web.removeprefix('did:web:') handle = (username if username and HANDLE_PATTERN.match(username) else did_web_bare if ':' not in did_web_bare else domain if domain else '') ret.update({ '$type': out_type, # TODO: more specific than domain, many users will be on shared domains 'did': id if id and id.startswith('did:') else did_web, 'handle': handle, 'avatar': avatar, 'banner': banner, }) # WARNING: this includes description, which isn't technically in this # #profileViewBasic. hopefully clients should just ignore it! # return trim_nulls(ret, ignore=('did', 'handle')) elif type == 'share': if not out_type or out_type == 'app.bsky.feed.repost': return { '$type': 'app.bsky.feed.repost', 'subject': from_as1_to_strong_ref(inner_obj, client=client), 'createdAt': from_as1_datetime(obj.get('published')), } elif out_type == 'app.bsky.feed.defs#reasonRepost': return { '$type': 'app.bsky.feed.defs#reasonRepost', 'by': from_as1(actor, out_type=''), 'indexedAt': from_as1_datetime(None), } elif out_type == 'app.bsky.feed.defs#feedViewPost': return { '$type': 'app.bsky.feed.defs#feedViewPost', 'post': from_as1(inner_obj, out_type='app.bsky.feed.defs#postView'), 'reason': from_as1(obj, out_type='app.bsky.feed.defs#reasonRepost'), } elif type == 'like': return { '$type': '', 'subject': from_as1_to_strong_ref(inner_obj, client=client), 'createdAt': from_as1_datetime(obj.get('published')), } elif type == 'follow': if not inner_obj: raise ValueError('follow activity requires actor and object') return { '$type': 'app.bsky.graph.follow', 'subject': inner_obj.get('id'), # DID 'createdAt': from_as1_datetime(obj.get('published')), } elif verb == 'post' and type in POST_TYPES: # convert text from HTML and truncate src = Bluesky('unused') content = obj.get('content') text = obj.get('summary') or content or obj.get('name') or '' text = src.truncate(html_to_text(text), None, OMIT_LINK) facets = [] # convert index-based tags to facets for tag in util.get_list(obj, 'tags'): type = tag.get('objectType') url = tag.get('url') if not url and type != 'hashtag': continue facet = { '$type': 'app.bsky.richtext.facet', } try: start = int(tag['startIndex']) if start and obj.get('content_is_html'): raise NotImplementedError('HTML content is not supported with index tags') end = start + int(tag['length']) facet['index'] = { # convert indices from Unicode chars to UTF-8 encoded bytes # 'byteStart': len(content[:start].encode()), 'byteEnd': len(content[:end].encode()), } except (KeyError, ValueError, IndexError, TypeError): pass name = tag.get('displayName', '').strip().lstrip('@#') if type == 'hashtag': facet['features'] = [{ '$type': 'app.bsky.richtext.facet#tag', 'tag': name, }] elif type == 'mention': # extract and if necessary resolve DID if url.startswith('did:'): did = url else: if match := AT_URI_PATTERN.match(url): did ='repo') elif match := BSKY_APP_URL_RE.match(url): did ='id') if not did.startswith('did:') and client: did =['did'] if not did: continue facet['features'] = [{ '$type': 'app.bsky.richtext.facet#mention', 'did': did, }] else: facet['features'] = [{ '$type': 'app.bsky.richtext.facet#link', 'uri': url, }] if 'index' not in facet: # use displayName to guess index at first location found in text. note # that #/@ character for mentions is included in index end. # # can't use \b for word boundaries here because that only includes # alphanumerics, and Bluesky hashtags can include emoji prefix = '#' if type == 'hashtag' else '@' if type == 'mention' else '' match ='[\s^]({prefix}{name})[\s$]', text) if match: facet['index'] = { 'byteStart': len(content[:match.start(1)].encode()), 'byteEnd': len(content[:match.end(1)].encode()), } else: continue # skip or trim this facet if it's off the end of content that got truncated index = facet.get('index') text_len = len(text.encode()) if index.get('byteStart', 0) >= text_len: continue if index.get('byteEnd', 0) > text_len: index['byteEnd'] = text_len if facet not in facets: facets.append(facet) # images images_embed = images_record_embed = None images = as1.get_objects(obj, 'image') if images: images_embed = { '$type': 'app.bsky.embed.images#view', 'images': [{ '$type': 'app.bsky.embed.images#viewImage', 'thumb': img.get('url'), 'fullsize': img.get('url'), 'alt': img.get('displayName') or '', } for img in images[:4]], } images_record_embed = { '$type': 'app.bsky.embed.images', 'images': [{ '$type': 'app.bsky.embed.images#image', 'image': blobs.get(util.get_url(img)) or {}, 'alt': img.get('displayName') or '', } for img in images[:4]], } # article/note attachments record_embed = record_record_embed = external_embed = external_record_embed = None for att in util.get_list(obj, 'attachments'): if not att.get('objectType') in ('article', 'link', 'note'): continue id = att.get('id') or '' url = att.get('url') or '' if (id.startswith('at://') or id.startswith(Bluesky.BASE_URL) or url.startswith('at://') or url.startswith(Bluesky.BASE_URL)): # quoted Bluesky post embed = from_as1(att).get('post') or {} embed['value'] = embed.pop('record', None) record_embed = { '$type': f'app.bsky.embed.record#view', 'record': { **embed, '$type': f'app.bsky.embed.record#viewRecord', # override these so that trim_nulls below will remove them 'likeCount': None, 'replyCount': None, 'repostCount': None, }, } record_record_embed = { '$type': f'app.bsky.embed.record', 'record': from_as1_to_strong_ref(att, client=client), } else: # external link external_record_embed = { '$type': f'app.bsky.embed.external', 'external': { '$type': f'app.bsky.embed.external#external', 'uri': url or id, 'title': att.get('displayName'), 'description': att.get('summary') or att.get('content') or '', } } external_embed = { '$type': f'app.bsky.embed.external#view', 'external': { **external_record_embed['external'], '$type': f'app.bsky.embed.external#viewExternal', 'thumb': util.get_first(att, 'image'), }, } if record_embed and (images_embed or external_embed): embed = { '$type': 'app.bsky.embed.recordWithMedia#view', 'record': record_embed, 'media': images_embed or external_embed, } record_embed = { '$type': 'app.bsky.embed.recordWithMedia', 'record': record_record_embed, 'media' : images_record_embed or external_record_embed, } else: embed = record_embed or images_embed or external_embed record_embed = record_record_embed or images_record_embed or external_record_embed # in reply to reply = None in_reply_to = base_object(obj) if in_reply_to: parent_ref = from_as1_to_strong_ref(in_reply_to, client=client, value=True) root_ref = (parent_ref.pop('value', {}).get('reply', {}).get('root') or parent_ref) reply = { '$type': '', 'root': root_ref, 'parent': parent_ref, } ret = trim_nulls({ '$type': '', 'text': text, 'createdAt': from_as1_datetime(obj.get('published')), 'embed': record_embed, 'facets': facets, 'reply': reply, }, ignore=('alt', 'createdAt', 'cid', 'description', 'text', 'title', 'uri')) if not out_type or out_type == '': return ret # author author = as1.get_object(obj, 'author') author.setdefault('objectType', 'person') author = from_as1(author, out_type='') ret = trim_nulls({ '$type': 'app.bsky.feed.defs#postView', 'uri': obj.get('id') or obj.get('url') or '', 'cid': '', 'record': ret, 'author': author, 'embed': embed, 'replyCount': 0, 'repostCount': 0, 'likeCount': 0, 'indexedAt': from_as1_datetime(None), }, ignore=('author', 'createdAt', 'cid', 'description', 'indexedAt', 'record', 'text', 'title', 'uri')) if out_type == 'app.bsky.feed.defs#postView': return ret elif out_type == 'app.bsky.feed.defs#feedViewPost': return { '$type': out_type, 'post': ret, } assert False, "shouldn't happen" raise ValueError(f'AS1 object has unknown objectType {type} verb {verb}')
[docs] def to_as1(obj, type=None, uri=None, repo_did=None, repo_handle=None, pds=DEFAULT_PDS): """Converts a Bluesky object to an AS1 object. Args: obj (dict): ``app.bsky.*`` object type (str): optional ``$type`` to parse with, only used if ``obj['$type']`` is unset uri (str): optional ``at://`` URI of this object. Used to populate the ``id`` and ``url`` fields for some object types, eg posts. repo_did (str): optional DID of the repo this object is from. Required to generate image URLs. repo_handle (str): optional handle of the user whose repo this object is from pds (str): base URL of the PDS that currently serves this object's repo. Required to generate image URLs. Defaults to ````. Returns: dict: AS1 object, or None if the record doesn't correspond to an AS1 object, eg "not found" records Raises: ValueError: if the ``$type`` field is missing or unsupported """ if not obj: return {} type = obj.get('$type') or type if not type: raise ValueError('Bluesky object missing $type field') uri_authority = None if uri: if not uri.startswith('at://'): raise ValueError('Expected at:// uri, got {uri}') if parsed := AT_URI_PATTERN.match(uri): uri_authority = # for nested to_as1 calls, if necessary kwargs = {'repo_did': repo_did, 'repo_handle': repo_handle, 'pds': pds} # TODO: once we're on Python 3.10, switch this to a match statement! if type in ('', '', '', '', ): images = [{'url': obj.get('avatar')}] banner = obj.get('banner') if banner: images.append({'url': obj.get('banner'), 'objectType': 'featured'}) handle = obj.get('handle') did = obj.get('did') if type == '': if not handle: handle = repo_handle if not did: did = repo_did urls = [] if handle: urls.append(Bluesky.user_url(handle)) if not util.domain_or_parent_in(handle, [DEFAULT_PDS_DOMAIN]): urls.append(f'https://{handle}/') elif did and did.startswith('did:web:'): urls.extend([did_web_to_url(did), Bluesky.user_url(did)]) ret = { 'objectType': 'person', 'id': did, 'url': urls, 'displayName': obj.get('displayName'), 'username': obj.get('handle') or repo_handle, 'summary': obj.get('description'), 'image': images, } # avatar and banner are blobs in; convert to URLs if type == '': repo_did = repo_did or did if repo_did and pds: for img in ret['image']: img['url'] = blob_to_url(blob=img['url'], repo_did=repo_did, pds=pds) else: ret['image'] = [] # convert public view opt-out to unlisted AS1 audience targeting # # labels = (obj.get('labels', {}).get('values', []) if type == '' else obj.get('labels', [])) for label in labels: if label.get('val') == NO_AUTHENTICATED_LABEL and not label.get('neg'): ret['to'] = [{ 'objectType': 'group', 'alias': '@unlisted', }] elif type == '': text = obj.get('text', '') # convert facets to tags tags = [] for facet in obj.get('facets', []): tag = {} for feat in facet.get('features', []): if feat.get('$type') == 'app.bsky.richtext.facet#link': tag.update({ 'objectType': 'article', 'url': feat.get('uri'), }) elif feat.get('$type') == 'app.bsky.richtext.facet#mention': tag.update({ 'objectType': 'mention', 'url': Bluesky.user_url(feat.get('did')), }) elif feat.get('$type') == 'app.bsky.richtext.facet#tag': tag.update({ 'objectType': 'hashtag', 'displayName': feat.get('tag') }) index = facet.get('index', {}) # convert indices from UTF-8 encoded bytes to Unicode chars (code points) # byte_start = index.get('byteStart') byte_end = index.get('byteEnd') try: if byte_start is not None: tag['startIndex'] = len(text.encode()[:byte_start].decode()) if byte_end is not None: tag.setdefault('displayName', text.encode()[byte_start:byte_end].decode()) tag['length'] = len(tag['displayName']) except UnicodeDecodeError as e: logger.warning(f"Couldn't apply facet {facet} to unicode text: {text}") tags.append(tag) in_reply_to = obj.get('reply', {}).get('parent', {}).get('uri') ret = { 'objectType': 'comment' if in_reply_to else 'note', 'id': uri, 'url': at_uri_to_web_url(uri), 'content': text, 'inReplyTo': [{ 'id': in_reply_to, 'url': at_uri_to_web_url(in_reply_to), }], 'published': obj.get('createdAt', ''), 'tags': tags, 'author': repo_did, } # embeds embed = obj.get('embed') or {} embed_type = embed.get('$type') if embed_type == 'app.bsky.embed.images': ret['image'] = to_as1(embed, **kwargs) elif embed_type in ('app.bsky.embed.external', 'app.bsky.embed.record'): ret['attachments'] = [to_as1(embed, **kwargs)] elif embed_type == 'app.bsky.embed.recordWithMedia': # TODO # ret['attachments'] = [to_as1(embed.get('record', {}).get('record'), # type='com.atproto.repo.strongRef', **kwargs)] ret['attachments'] = [] media = embed.get('media') media_type = media.get('$type') if media_type == 'app.bsky.embed.external': ret['attachments'].append(to_as1(media, **kwargs)) elif media_type == 'app.bsky.embed.images': ret['image'] = to_as1(media, **kwargs) else: assert False, f'Unknown embed media type: {media_type}' elif type in ('app.bsky.feed.defs#postView', 'app.bsky.embed.record#viewRecord'): ret = to_as1(obj.get('record') or obj.get('value'), **kwargs) author = obj.get('author') or {} uri = obj.get('uri') ret.update({ 'id': uri, 'url': (at_uri_to_web_url(uri, handle=author.get('handle')) if uri.startswith('at://') else None), 'author': to_as1(author, type='', **kwargs), }) # convert embeds to attachments for embed in util.get_list(obj, 'embeds') + util.get_list(obj, 'embed'): embed_type = embed.get('$type') if embed_type == 'app.bsky.embed.images#view': ret.setdefault('image', []).extend(to_as1(embed, **kwargs)) elif embed_type in ('app.bsky.embed.external#view', 'app.bsky.embed.record#view'): ret['attachments'] = [to_as1(embed, **kwargs)] elif embed_type == 'app.bsky.embed.recordWithMedia#view': ret['attachments'] = [to_as1(embed.get('record', {}).get('record'), **kwargs)] media = embed.get('media') media_type = media.get('$type') if media_type == 'app.bsky.embed.external#view': ret.setdefault('attachments', []).append(to_as1(media, **kwargs)) elif media_type == 'app.bsky.embed.images#view': ret.setdefault('image', []).extend(to_as1(media, **kwargs)) else: assert False, f'Unknown embed media type: {media_type}' elif type == 'app.bsky.embed.images': if repo_did and pds: ret = [] for img in obj.get('images', []): image = img.get('image') if image: url = blob_to_url(blob=image, repo_did=repo_did, pds=pds) ret.append({'url': url, 'displayName': img['alt']} if img.get('alt') else url) else: ret = [] elif type == 'app.bsky.embed.images#view': ret = [{ 'url': img.get('fullsize'), 'displayName': img.get('alt'), } for img in obj.get('images', [])] elif type in ('app.bsky.embed.external', 'app.bsky.embed.external#view'): ret = to_as1(obj.get('external'), type='app.bsky.embed.external#viewExternal', **kwargs) elif type in ('app.bsky.embed.external#external', 'app.bsky.embed.external#viewExternal'): ret = { 'objectType': 'link', 'url': obj.get('uri'), 'displayName': obj.get('title'), 'summary': obj.get('description'), 'image': obj.get('thumb'), } elif type == 'app.bsky.embed.record': return None # TODO # return (to_as1(record, **kwargs, type='com.atproto.repo.strongRef') # if record else None) elif type == 'app.bsky.embed.record#view': record = obj.get('record') return to_as1(record, **kwargs) if record else None elif type in ('app.bsky.embed.record#viewNotFound', 'app.bsky.embed.record#viewBlocked', 'app.bsky.feed.defs#notFoundPost', ): return None elif type == 'app.bsky.feed.defs#feedViewPost': ret = to_as1(obj.get('post'), type='app.bsky.feed.defs#postView', **kwargs) reason = obj.get('reason') if reason and reason.get('$type') == 'app.bsky.feed.defs#reasonRepost': uri = obj.get('post', {}).get('viewer', {}).get('repost') ret = { 'objectType': 'activity', 'verb': 'share', 'id': uri, 'url': at_uri_to_web_url(uri), 'object': ret, 'actor': to_as1(reason.get('by'), type='', **kwargs), } elif type == '': subject = obj.get('subject', {}).get('uri') ret = { 'objectType': 'activity', 'verb': 'like', 'id': uri, 'object': subject, 'actor': repo_did, } if subject and uri_authority: if web_url := at_uri_to_web_url(subject): # synthetic fragment ret['url'] = f'{web_url}#liked_by_{uri_authority}' elif type == 'app.bsky.graph.follow': ret = { 'objectType': 'activity', 'verb': 'follow', 'id': uri, 'url': at_uri_to_web_url(uri), 'object': obj.get('subject'), 'actor': repo_did, } elif type == 'app.bsky.feed.repost': subject = obj.get('subject', {}).get('uri') ret = { 'objectType': 'activity', 'verb': 'share', 'id': uri, 'object': subject, 'actor': repo_did, 'published': obj.get('createdAt'), } if subject and uri_authority: if web_url := at_uri_to_web_url(subject): # synthetic fragment ret['url'] = f'{web_url}#reposted_by_{uri_authority}' elif type == 'app.bsky.feed.defs#threadViewPost': return to_as1(obj.get('post'), type='app.bsky.feed.defs#postView', **kwargs) elif type in ('app.bsky.feed.defs#generatorView', 'app.bsky.graph.defs#listView'): uri = obj.get('uri') ret = { 'objectType': 'service', 'id': uri, 'url': at_uri_to_web_url(uri), 'displayName': (obj.get('displayName') # generator or obj.get('name')), # list 'summary': obj.get('description'), 'image': obj.get('avatar'), 'author': to_as1(obj.get('creator'), type='', **kwargs), } elif type == 'app.bsky.feed.defs#blockedPost': uri = obj.get('uri') or uri return { 'objectType': 'note', 'id': uri, 'url': at_uri_to_web_url(uri), 'blocked': True, 'author': obj.get('blockedAuthor', {}).get('did'), } return to_as1(obj.get('post'), type='app.bsky.feed.defs#postView', **kwargs) else: raise ValueError(f'Bluesky object has unknown $type: {type}') ret = trim_nulls(ret) # ugly hack if isinstance(ret, dict) and ret.get('author') == {'objectType': 'person'}: del ret['author'] return ret
[docs] def blob_to_url(*, blob, repo_did, pds=DEFAULT_PDS): """Generates a URL for a blob. Supports both new and old style blobs: The resulting URL is a ``com.atproto.sync.getBlob`` XRPC call to the PDS. For blobs on the official ```` PDS, we could consider using their CDN instead: ``[DID]/[CID]@jpeg`` They also run a resizing image proxy on ```` with URLs like ``[SIG]/rs:fit:2000:2000:1:0/plain/[CID]@jpeg``, not sure how to generate signatures for it yet. Args: blob (dict) repo_did (str): DID of the repo that owns this blob pds (str): base URL of the PDS that serves this repo. Defaults to :const:`DEFAULT_PDS` Returns: str: URL for this blob, or None if ``blob`` is empty or has no CID """ if not blob: return None assert repo_did and pds cid = blob.get('ref', {}).get('$link') or blob.get('cid') if cid: path = f'/xrpc/com.atproto.sync.getBlob?did={repo_did}&cid={cid}' return urllib.parse.urljoin(pds, path)
[docs] class Bluesky(Source): """Bluesky source class. See file docstring and :class:`Source` class for details. Attributes: handle (str) did (str) client (lexrpc.Client) """ DOMAIN = '' BASE_URL = '' NAME = 'Bluesky' TRUNCATE_TEXT_LENGTH = 300 # TODO: load from lexicon POST_ID_RE = AT_URI_PATTERN TYPE_LABELS = { 'post': 'post', 'comment': 'reply', 'repost': 'repost', 'like': 'like', } _client = None _app_password = None
[docs] def __init__(self, handle, did=None, access_token=None, refresh_token=None, app_password=None, session_callback=None): """Constructor. Args: handle (str): username, eg ```` or ```` did (str): did:plc or did:web, optional access_token (str): optional refresh_token (str): optional app_password (str): optional session_callback (callable, dict => None): passed to :class:`lexrpc.Client` constructor, called when a new session is created or refreshed """ self.handle = handle self.did = did self._app_password = app_password headers = {'User-Agent': util.user_agent} self._client = Client(access_token=access_token, refresh_token=refresh_token, headers=headers, session_callback=session_callback)
@property def client(self): if not self._client.session and self._app_password: # log in resp ={ 'identifier': self.did or self.handle, 'password': self._app_password, }) self.handle = resp['handle'] self.did = resp['did'] return self._client
[docs] @classmethod def user_url(cls, handle): """Returns the profile URL for a given handle. Args: handle (str) Returns: str: profile URL """ return f'{cls.BASE_URL}/profile/{handle.lstrip("@")}'
[docs] @classmethod def user_to_actor(cls, user, **kwargs): """Converts a user to an actor. Args: user (dict): an ```` record kwargs: passed through to :func:`to_as1` Returns: dict: ActivityStreams actor """ return to_as1(user, **kwargs)
[docs] @classmethod def post_url(cls, handle, tid): """Returns the post URL for a given handle and tid. Args: handle (str) tid (str) Returns: str: profile URL """ return f'{cls.user_url(handle)}/post/{tid}'
[docs] @classmethod def post_id(cls, url): """Returns the `at://` URI for the given URL if it's for a post. Also see ``arroba.util.parse_at_uri``. Returns: str or None """ if not url: return None if not AT_URI_PATTERN.match(url): try: url = web_url_to_at_uri(url) except ValueError: return None if (len(url.removeprefix('at://').split('/')) == 3 # only return at:// URIs for posts, not profiles and not url.endswith('/')): return url
[docs] def get_activities_response(self, user_id=None, group_id=None, app_id=None, activity_id=None, fetch_replies=False, fetch_likes=False, fetch_shares=False, include_shares=True, fetch_mentions=False, search_query=None, start_index=None, count=None, cache=None, **kwargs): """Fetches posts and converts them to AS1 activities. See :meth:`Source.get_activities_response` for more information. Bluesky-specific details: Args: activity_id (str): an ``at://`` URI """ assert not start_index params = {} if count is not None: params['limit'] = count posts = None handle = self.handle if activity_id: if not activity_id.startswith('at://'): raise ValueError(f'Expected activity_id to be at:// URI; got {activity_id}') resp ={}, uri=activity_id, depth=1) posts = [resp.get('thread', {})] elif group_id in (None, FRIENDS): resp ={}, **params) posts = resp.get('feed', []) else: # eg group_id SELF actor = user_id or self.did or self.handle if not actor: raise ValueError('user_id is required') resp ={}, actor=actor, **params) posts = resp.get('feed', []) if cache is None: # for convenience, throwaway object just for this method cache = {} activities = [] for post in posts: reason = post.get('reason') is_repost = reason and reason.get('$type') == 'app.bsky.feed.defs#reasonRepost' if is_repost and not include_shares: continue activity = self.postprocess_activity(self._post_to_activity(post)) activities.append(activity) obj = activity['object'] id = obj.get('id') tags = obj.setdefault('tags', []) if is_repost: # If it's a repost we're not interested in responses to it. continue bs_post = post.get('post') if bs_post and id: # Likes like_count = bs_post.get('likeCount') if fetch_likes and like_count and like_count != cache.get('ABL ' + id): likers ={}, uri=bs_post.get('uri')) tags.extend(self._make_like(bs_post, l.get('actor')) for l in likers.get('likes')) cache['ABL ' + id] = like_count # Reposts repost_count = bs_post.get('repostCount') if fetch_shares and repost_count and repost_count != cache.get('ABRP ' + id): reposters ={}, uri=bs_post.get('uri')) tags.extend(self._make_share(bs_post, r) for r in reposters.get('repostedBy')) cache['ABRP ' + id] = repost_count # Replies reply_count = bs_post.get('replyCount') if fetch_replies and reply_count and reply_count != cache.get('ABR ' + id): replies = self._get_replies(bs_post.get('uri')) replies = trim_nulls([to_as1(reply, 'app.bsky.feed.defs#threadViewPost') for reply in replies]) for r in replies: r['id'] = self.tag_uri(r['id']) obj['replies'] = { 'items': replies, } cache['ABR ' + id] = reply_count resp = self.make_activities_base_response(util.trim_nulls(activities)) return resp
[docs] def get_actor(self, user_id=None): """Fetches and returns a user. Args: user_id (str): either handle or DID; defaults to current user Returns: dict: ActivityStreams actor """ if user_id is None: user_id = self.did or self.handle profile ={}, actor=user_id) return to_as1(profile, type='')
[docs] def get_comment(self, comment_id, **kwargs): """Fetches and returns a comment. Args: comment_id: string status id **kwargs: unused Returns: dict, ActivityStreams object Raises: :class:`ValueError`: if comment_id is invalid """ post_thread ={}, uri=comment_id, depth=1) obj = to_as1(post_thread.get('thread'), 'app.bsky.feed.defs#threadViewPost') return obj
def _post_to_activity(self, post): """Converts a post to an activity. Args: post: Bluesky post app.bluesky.feed.defs#feedViewPost Returns: AS1 activity """ obj = to_as1(post, type='app.bsky.feed.defs#feedViewPost') if obj.get('objectType') == 'activity': return obj return { 'id': obj.get('id'), 'verb': 'post', 'actor': obj.get('author'), 'object': obj, 'objectType': 'activity', } def _make_like(self, post, actor): return self._make_like_or_share(post, actor, 'like') def _make_share(self, post, actor): return self._make_like_or_share(post, actor, 'share') def _make_like_or_share(self, post, actor, verb): """Generates and returns a ActivityStreams like object. Args: post: dict, Bluesky app.bsky.feed.defs#feedViewPost actor: dict, Bluesky verb: string, 'like' or 'share' Returns: dict, AS1 like activity """ assert verb in ('like', 'share') label = 'liked' if verb == 'like' else 'reposted' url = at_uri_to_web_url(post.get('uri'), post.get('author').get('handle')) actor_id = actor.get('did') author = to_as1(actor, '') author['id'] = self.tag_uri(author['id']) suffix = f'{label}_by_{actor_id}' return { 'id': self.tag_uri(f'{post.get("uri")}_{suffix}'), 'url': f'{url}#{suffix}', 'objectType': 'activity', 'verb': verb, 'object': {'url': url}, 'author': author, } def _get_replies(self, uri): """ Gets the replies to a specific post and returns them in ascending order of creation. Does not include the original post. Args: uri: string, post uri Returns: list, Bluesky app.bsky.feed.defs#threadViewPost """ ret = [] resp ={}, uri=uri) thread = resp.get('thread') if thread: ret = self._recurse_replies(thread) return sorted(ret, key = lambda thread: thread.get('post', {}).get('record', {}).get('createdAt') or '') # TODO this ought to take a depth limit. def _recurse_replies(self, thread): """ Recurses through a Bluesky app.bsky.feed.defs#threadViewPost and returns its replies as a list. Args: thread: dict, Bluesky app.bsky.feed.defs#threadViewPost Returns: list, Bluesky app.bsky.feed.defs#threadViewPost """ ret = [] for r in thread.get('replies', []): ret += [r] ret += self._recurse_replies(r) return ret
[docs] def create(self, obj, include_link=OMIT_LINK, ignore_formatting=False): """Creates a post, reply, repost, or like. Args: obj: ActivityStreams object include_link (str) ignore_formatting (bool) Returns: CreationResult: whose content will be a dict with ``id``, ``url``, and ``type`` keys (all optional) for the newly created object (or None) """ return self._create(obj, preview=False, include_link=include_link, ignore_formatting=ignore_formatting)
[docs] def preview_create(self, obj, include_link=OMIT_LINK, ignore_formatting=False): """Preview creating a post, reply, repost, or like. Args: obj: ActivityStreams object include_link (str) ignore_formatting (bool) Returns: CreationResult: content will be a str HTML snippet or None """ return self._create(obj, preview=True, include_link=include_link, ignore_formatting=ignore_formatting)
def _create(self, obj, preview=None, include_link=OMIT_LINK, ignore_formatting=False): assert preview in (False, True) assert self.did type = obj.get('objectType') verb = obj.get('verb') base_obj = self.base_object(obj) base_id = base_obj.get('id') base_url = base_obj.get('url') is_reply = type == 'comment' or obj.get('inReplyTo') atts = obj.get('attachments', []) images = util.dedupe_urls(util.get_list(obj, 'image') + [a for a in atts if a.get('objectType') == 'image']) has_media = images and (type in ('note', 'article') or is_reply) # prefer displayName over content for articles # # TODO: handle activities as well as objects? ie pull out ['object'] here if # necessary? prefer_content = type == 'note' or (base_url and is_reply) preview_description = '' content = self._content_for_create( obj, ignore_formatting=ignore_formatting, prefer_name=not prefer_content) if not content: if type == 'activity': content = verb elif has_media: content = '' else: return creation_result( abort=False, # keep looking for things to publish, error_plain='No content text found.', error_html='No content text found.') # truncate and ellipsize content if necessary url = obj.get('url') content = self.truncate(content, url, include_link, type) # TODO linkify mentions and hashtags preview_content = util.linkify(content, pretty=True, skip_bare_cc_tlds=True) post_label = f"{self.NAME} {self.TYPE_LABELS['post']}" if type == 'activity' and verb == 'like': if not base_url: return creation_result( abort=True, error_plain=f"Could not find a {post_label} to {self.TYPE_LABELS['like']}.", error_html=f"Could not find a {post_label} to <a href=\"\">{self.TYPE_LABELS['like']}</a>. Check that your post has the right <a href=\"\">u-like-of link</a>.") if preview: preview_description += f"<span class=\"verb\">{self.TYPE_LABELS['like']}</span> <a href=\"{base_url}\">this {self.TYPE_LABELS['post']}</a>." return creation_result(description=preview_description) else: like_atp = from_as1(obj, client=self) result ={ 'repo': self.did, 'collection': like_atp['$type'], 'record': like_atp, }) return creation_result({ 'id': result['uri'], 'url': at_uri_to_web_url(like_atp['subject']['uri']) + '/liked-by' }) elif type == 'activity' and verb == 'share': if not base_url: return creation_result( abort=True, error_plain=f"Could not find a {post_label} to {self.TYPE_LABELS['repost']}.", error_html=f"Could not find a {post_label} to <a href=\"\">{self.TYPE_LABELS['repost']}</a>. Check that your post has the right <a href=\"\">repost-of</a> link.") if preview: preview_description += f"<span class=\"verb\">{self.TYPE_LABELS['repost']}</span> <a href=\"{base_url}\">this {self.TYPE_LABELS['post']}</a>." return creation_result(description=preview_description) else: repost_atp = from_as1(obj, client=self) result ={ 'repo': self.did, 'collection': repost_atp['$type'], 'record': repost_atp, }) return creation_result({ 'id': result['uri'], 'url': at_uri_to_web_url(repost_atp['subject']['uri']) + '/reposted-by' }) elif (type in ('note', 'article') or is_reply or (type == 'activity' and verb == 'post')): # probably a bookmark # TODO: add bookmarked URL and facet # tricky because we only want to do that if it's not truncated away data = {'status': content} if is_reply and base_url: preview_description += f"<span class=\"verb\">{self.TYPE_LABELS['comment']}</span> to <a href=\"{base_url}\">this {self.TYPE_LABELS['post']}</a>:" data['in_reply_to_id'] = base_id else: preview_description += f"<span class=\"verb\">{self.TYPE_LABELS['post']}</span>:" videos = util.dedupe_urls( [obj] + [a for a in atts if a.get('objectType') == 'video'], key='stream') if videos: logger.warning(f'Found {len(videos)} videos, but Bluesky doesn\'t support video yet.') if len(images) > MAX_IMAGES: images = images[:MAX_IMAGES] logger.warning(f'Found {len(images)} images! Only using the first {MAX_IMAGES}: {images!r}') if preview: media_previews = [ f"<img src=\"{util.get_url(img)}\" alt=\"{img.get('displayName') or ''}\" />" for img in images ] if media_previews: preview_content += '<br /><br />' + ' &nbsp; '.join(media_previews) return creation_result(content=preview_content, description=preview_description) else: blobs = self.upload_media(images) post_atp = from_as1(obj, blobs=blobs, client=self) post_atp['text'] = content # facet for link to original post, if any if url: url_index = content.rfind(url) if url_index != -1: byte_start = len(content[:url_index].encode()) post_atp.setdefault('facets', []).append({ '$type': 'app.bsky.richtext.facet', 'features': [{ '$type': 'app.bsky.richtext.facet#link', 'uri': url, }], 'index': { 'byteStart': byte_start, 'byteEnd': byte_start + len(url.encode()), }, }) result ={ 'repo': self.did, 'collection': post_atp['$type'], 'record': post_atp, }) return creation_result({ 'id': result['uri'], 'url': at_uri_to_web_url(result['uri'], handle=self.handle), }) return creation_result( abort=False, error_plain=f'Cannot publish type={type}, verb={verb} to Bluesky', error_html=f'Cannot publish type={type}, verb={verb} to Bluesky')
[docs] def delete(self, at_uri): """Deletes a record. Args: at_uri (str): at:// URI of record delete Returns: CreationResult: content is dict with ``url`` and ``id`` fields """ match = AT_URI_PATTERN.match(at_uri) if not match: raise ValueError(f'Expected at:// URI, got {at_uri}') authority, collection, rkey = match.groups(){ 'repo': authority, 'collection': collection, 'rkey': rkey, }) return creation_result({ 'id': at_uri, 'url': at_uri_to_web_url(at_uri), })
[docs] def preview_delete(self, at_uri): """Previews deleting a record. Args: at_uri (str): at:// URI of record delete Returns: CreationResult: """ url = at_uri_to_web_url(at_uri) return creation_result(description=f'<span class="verb">delete</span> <a href="{url}">this</a>.')
[docs] def base_object(self, obj): return base_object(obj)
def upload_media(self, media): blobs = {} for obj in media: url = util.get_url(obj, key='stream') or util.get_url(obj) if not url or url in blobs: continue with util.requests_get(url, stream=True) as fetch: fetch.raise_for_status() upload = input=fetch.raw, headers={'Content-Type': fetch.headers['Content-Type']} ) blobs[url] = upload['blob'] return blobs