"""Bluesky source class.
* https://bsky.app/
* https://atproto.com/lexicons/app-bsky-actor
* https://github.com/bluesky-social/atproto/tree/main/lexicons/app/bsky
"""
import copy
from datetime import datetime, timezone
import html
import json
import logging
from pathlib import Path
import re
import string
import urllib.parse
from urllib.parse import urlparse, urlunparse
from bs4 import BeautifulSoup
from io import BytesIO
from lexrpc import Client
from lexrpc.base import AT_URI_RE, Base, LANG_RE
from multiformats import CID
from oauth_dropins import bluesky as oauth_bluesky
from oauth_dropins.webutil import util
from oauth_dropins.webutil.util import trim_nulls
from pymediainfo import MediaInfo
import requests
from requests_oauth2client import OAuth2AccessTokenAuth, TokenSerializer
from . import as1
from .as2 import QUOTE_RE_SUFFIX
from .source import (
creation_result,
FRIENDS,
HTML_ENTITY_RE,
html_to_text,
INCLUDE_LINK,
INCLUDE_IF_TRUNCATED,
OMIT_LINK,
Source,
)
logger = logging.getLogger(__name__)
# via https://atproto.com/specs/handle
HANDLE_RE = re.compile(
r'([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'
r'[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?'
)
DID_WEB_PATTERN = re.compile('did:web:' + HANDLE_RE.pattern)
MAX_MEDIA_SIZE_BYTES = 5_000_000
MAX_FOLLOWS = 10000
# Maps AT Protocol NSID collections to path elements in bsky.app URLs.
# Used in at_uri_to_web_url.
#
# eg for mapping a URI like:
# at://did:plc:z72i7hd/app.bsky.feed.generator/mutuals
# to a frontend URL like:
# https://bsky.app/profile/did:plc:z72i7hdynmk6r22z27h6tvur/feed/mutuals
COLLECTION_TO_BSKY_APP_TYPE = {
'app.bsky.feed.generator': 'feed',
'app.bsky.feed.post': 'post',
'app.bsky.graph.list': 'lists',
}
BSKY_APP_TYPE_TO_COLLECTION = {
name: coll for coll, name in COLLECTION_TO_BSKY_APP_TYPE.items()
}
# maps AS1 objectType/verb to possible output Bluesky lexicon types.
# used in from_as1
POST_TYPES = as1.POST_TYPES | set(['bookmark'])
FROM_AS1_TYPES = {
as1.ACTOR_TYPES: (
'app.bsky.actor.profile',
'app.bsky.actor.defs#profileView',
'app.bsky.actor.defs#profileViewBasic',
'app.bsky.actor.defs#profileViewDetailed',
'site.standard.publication',
),
tuple(set(POST_TYPES) - set(['article'])): (
'app.bsky.feed.post',
'app.bsky.feed.defs#feedViewPost',
'app.bsky.feed.defs#postView',
),
('article',): (
'app.bsky.feed.post',
'app.bsky.feed.defs#feedViewPost',
'app.bsky.feed.defs#postView',
'site.standard.document',
),
('block',): (
'app.bsky.graph.block',
'app.bsky.graph.listblock',
),
('flag',): (
'com.atproto.moderation.createReport#input',
),
('follow',): (
'app.bsky.graph.follow',
),
('like',): (
'app.bsky.feed.like',
),
('share',): (
'app.bsky.feed.repost',
'app.bsky.feed.defs#feedViewPost',
'app.bsky.feed.defs#reasonRepost',
),
}
AUTHORITY_CHARS = 'a-zA-Z0-9-.:'
RKEY_CHARS = f'{AUTHORITY_CHARS}~_'
BSKY_APP_URL_RE = re.compile(fr"""
https://(staging\.)?bsky\.app
/profile/(?P<id>[{AUTHORITY_CHARS}]+)
(/(?P<type>{"|".join(COLLECTION_TO_BSKY_APP_TYPE.values())})
/(?P<tid>[{RKEY_CHARS}]+))?
""", re.VERBOSE)
DEFAULT_PDS_DOMAIN = 'bsky.social'
DEFAULT_PDS = f'https://{DEFAULT_PDS_DOMAIN}/'
DEFAULT_APPVIEW = 'https://api.bsky.app'
# label on profiles set to only show them to logged in users
# https://bsky.app/profile/safety.bsky.app/post/3khhw7s3rtx2s
# https://docs.bsky.app/docs/advanced-guides/resolving-identities#for-backend-services
# https://github.com/bluesky-social/atproto/blob/main/packages/api/docs/labels.md#label-behaviors
NO_UNAUTHENTICATED_LABEL = '!no-unauthenticated'
# label on profiles that indicate automated/bot accounts
# https://docs.bsky.app/docs/advanced-guides/moderation#global-label-values
BOT_LABEL = 'bot'
# Bluesky => AS1 labels
#
# https://docs.bsky.app/docs/advanced-guides/moderation#global-label-values
# https://public.api.bsky.app/xrpc/app.bsky.labeler.getServices?dids=did%3Aplc%3Aar7c4by46qjdydhdevvrndac&detailed=true
# https://github.com/bluesky-social/atproto/blob/f2f8de63b333448d87c364578e023ddbb63b8b25/lexicons/com/atproto/label/defs.json#L139-L154
# https://github.com/bluesky-social/social-app/blob/3627a249ffb32e4c0f84597f8f9adf228ee90a8f/src/lib/moderation/useGlobalLabelStrings.ts
# https://github.com/bluesky-social/social-app/blob/3627a249ffb32e4c0f84597f8f9adf228ee90a8f/bskyembed/src/labels.ts
SENSITIVE_LABELS = {
'porn': 'Adult content',
'sexual': 'Sexually suggestive',
'nudity': 'Non-sexual nudity',
'graphic-media': 'Explicit or potentially disturbing media',
# deprecated
'nsfl': 'Explicit or potentially disturbing media',
'gore': 'Explicit or potentially disturbing media',
}
# AS1 => Bluesky keyword map
#
# Used for checking the as1 summary for keywords that might suggest a specific bluesky label level.
# We are checking labels in increasing order of severity, so the most severe one will be used even if lesser ones are present. (Python preserves the order of dict items)
AS1_TO_BLUESKY_LABEL_MAP = {
'nudity': ['nudity'],
'sexual': ['suggestive', 'sexual'],
'porn': ['porn', 'nsfw']
}
SENSITIVE_LABEL_DEFAULT = 'graphic-media'
# TODO: bring back validate? or remove?
LEXRPC = Base(truncate=True, validate=False)
ELLIPSIS = ' […]'
[docs]
def url_to_did_web(url):
"""Converts a URL to a ``did:web``.
In AT Proto, only hostname-based web DIDs are supported.
Paths are not supported, and will be discarded.
https://atproto.com/specs/did
Examples:
* ``https://foo.com`` => ``did:web:foo.com``
* ``https://foo.com:3000`` => ``did:web:foo.com``
* ``https://foo.bar.com/baz/baj`` => ``did:web:foo.bar.com``
Args:
url (str)
Returns:
str:
"""
parsed = urlparse(url)
if not parsed.hostname:
raise ValueError(f'Invalid URL: {url}')
if parsed.netloc != parsed.hostname:
logger.warning(f"URL {url} contained a port, which will not be included in the DID.")
if parsed.path and parsed.path != "/":
logger.warning(f"URL {url} contained a path, which will not be included in the DID.")
return f'did:web:{parsed.hostname}'
[docs]
def did_web_to_url(did):
"""Converts a did:web to a URL.
In AT Proto, only hostname-based web DIDs are supported.
Paths are not supported, and will throw an invalid error.
Examples:
* ``did:web:foo.com`` => ``https://foo.com``
* ``did:web:foo.com%3A3000`` => INVALID
* ``did:web:bar.com:baz:baj`` => INVALID
https://atproto.com/specs/did
Args:
did (str)
Returns:
str:
"""
if not did or not DID_WEB_PATTERN.fullmatch(did):
raise ValueError(f'Invalid did:web: {did}')
host = did.removeprefix('did:web:')
host = urllib.parse.unquote(host)
return f'https://{host}/'
[docs]
def at_uri_to_web_url(uri, handle=None):
"""Converts an ``at://`` URI to a ``https://bsky.app`` URL.
https://atproto.com/specs/at-uri-scheme
Args:
uri (str): ``at://`` URI
handle: (str): optional user handle. If not provided, defaults to the DID in
uri.
Returns:
str: ``https://bsky.app`` URL, or None
Raises:
ValueError: if uri is not a string or doesn't start with ``at://``
"""
if not uri:
return None
if not uri.startswith('at://'):
raise ValueError(f'Expected at:// URI, got {uri}')
parsed = urlparse(uri)
did = parsed.netloc
if not parsed.path:
return f'{Bluesky.user_url(handle or did)}'
collection, tid = parsed.path.strip('/').split('/')
type = COLLECTION_TO_BSKY_APP_TYPE.get(collection)
if not type:
return None
return f'{Bluesky.user_url(handle or did)}/{type}/{tid}'
[docs]
def web_url_to_at_uri(url, handle=None, did=None):
"""Converts a ``https://bsky.app`` URL to an ``at://`` URI.
https://atproto.com/specs/at-uri-scheme
Currently supports profile, post, and feed URLs with DIDs and handles, eg:
* ``https://bsky.app/profile/did:plc:123abc``
* ``https://bsky.app/profile/vito.fyi/post/3jt7sst7vok2u``
* ``https://bsky.app/profile/bsky.app/feed/mutuals``
If both ``handle`` and ``did`` are provided, and ``handle`` matches the URL,
the handle in the resulting URI will be replaced with ``did``.
Args:
url (str): ``bsky.app`` URL
handle (str): Bluesky handle, or None
did (str): Valid DID, or None
Returns:
str: ``at://`` URI, or None
Raises:
ValueError: if ``url`` can't be parsed as a ``bsky.app`` profile or post URL
"""
if not url:
return None
match = BSKY_APP_URL_RE.fullmatch(url)
if not match:
raise ValueError(f"{url} doesn't look like a bsky.app profile or post URL")
id = match.group('id')
assert id
# If a did and handle have been provided explicitly,
# replace the existing handle with the did.
if did and handle and id == handle:
id = did
rkey = match.group('tid')
type = match.group('type')
if type:
collection = BSKY_APP_TYPE_TO_COLLECTION[type]
assert rkey
else:
collection = 'app.bsky.actor.profile'
rkey = 'self'
return f'at://{id}/{collection}/{rkey}'
[docs]
def from_as1_to_strong_ref(obj, client=None, value=False, raise_=False):
"""Converts an AS1 object to an ATProto ``com.atproto.repo.strongRef``.
Uses AS1 ``id`` or ``url`, which should be an ``at://`` URI.
Args:
obj (dict): AS1 object or activity
client (lexrpc.Client): optional; if provided, this will be used to make API
calls to PDSes to fetch and populate the ``cid`` field and resolve handle
to DID.
value (bool): whether to include the record's ``value`` field in the
returned object
raise_ (bool): whether to raise ``ValueError`` if ``client`` is provided and
we can't fetch the object's record and populate ``cid``
Returns:
dict: ATProto ``com.atproto.repo.strongRef`` record
Raises:
ValueError:
"""
if not client:
assert not raise_
id = (obj.get('id') or as1.get_url(obj)) if isinstance(obj, dict) else obj
if match := AT_URI_RE.fullmatch(id):
at_uri = id
else:
at_uri = Bluesky.post_id(id) or ''
match = AT_URI_RE.fullmatch(at_uri)
if not match or not client:
if not match and raise_:
raise ValueError(f"Couldn't parse {id} as ATProto URI or Bluesky URL")
return {
'uri': at_uri,
'cid': '',
}
repo = match['repo']
if not repo.startswith('did:'):
handle = repo
repo = client.com.atproto.identity.resolveHandle(handle=handle)['did']
# only replace first instance of handle in case it's also in collection or rkey
at_uri = at_uri.replace(handle, repo, 1)
try:
record = client.com.atproto.repo.getRecord(
repo=repo, collection=match['collection'], rkey=match['rkey'])
except requests.RequestException as e:
logger.info(e)
record = {}
if not record and raise_:
raise ValueError(f"Couldn't load {at_uri}")
if not value:
record.pop('value', None)
return record
[docs]
def from_as1_datetime(val):
"""Converts an AS1 RFC 3339 datetime string to ATProto ISO 8601.
Bluesky requires full date and time with time zone, recommends UTC with
Z suffix, fractional seconds.
https://atproto.com/specs/lexicon#datetime
Returns now (ie the current time) if the input datetime can't be parsed.
Args:
val (str): RFC 3339 datetime
Returns:
str: ATProto compatible ISO 8601 datetime
"""
dt = util.now()
if val:
try:
dt = util.parse_iso8601(val.strip())
except (AttributeError, TypeError, ValueError):
logging.debug(f"Couldn't parse {val} as ISO 8601; defaulting to current time")
if dt.tzinfo:
dt = util.as_utc(dt)
# else it's naive, assume it's UTC
assert dt.utcoffset() is None, dt.utcoffset()
return dt.isoformat(sep='T', timespec='milliseconds') + 'Z'
[docs]
def base_object(obj):
"""Returns the "base" Bluesky object that an object operates on.
If the object is a reply, repost, or like of a Bluesky post, this returns
that post object. The id in the returned object is the AT protocol URI,
while the URL is the bsky.app web URL.
If the object is a follow or block, this returns the followed or blocked id, eg a
DID.
Args:
obj (dict): ActivityStreams object
Returns:
dict: minimal ActivityStreams object. Usually has at least ``id``; may
also have ``url``, ``author``, etc.
"""
for field in ('inReplyTo', 'object', 'target'):
if bases := util.get_list(obj, field):
for base in bases:
url = util.get_url(base)
if not url:
return {}
elif url.startswith('https://bsky.app/'):
return {
'id': web_url_to_at_uri(url),
'url': url,
}
elif url.startswith('at://'):
return {
'id': url,
'url': at_uri_to_web_url(url),
}
elif url.startswith('did:'):
return {
'id': url,
'url': Bluesky.user_url(url),
}
else: # closes for loop
raise ValueError(f"{field} {url} doesn't look like Bluesky/ATProto")
return {}
[docs]
def from_as1(obj, out_type=None, blobs=None, aspects=None, client=None,
original_fields_prefix=None, as_embed=False, raise_=False,
dynamic_sensitive_labels=False, multiple=False, domain=None):
"""Converts an AS1 object to a Bluesky object.
Converts to ``record`` types by default, eg ``app.bsky.actor.profile`` or
``app.bsky.feed.post``. Use ``out_type`` to convert to a different type, eg
``app.bsky.actor.defs#profileViewBasic`` or ``app.bsky.feed.defs#feedViewPost``.
The ``objectType`` field is required.
If a string value in an output Bluesky object is longer than its
``maxGraphemes`` or ``maxLength`` in its lexicon, it's truncated with an ``…``
ellipsis character at the end in order to fit.
Args:
obj (dict): AS1 object or activity
out_type (str): desired output lexicon ``$type``
blobs (dict): optional mapping from str URL to ``$type: blob`` dict (with
``ref``, ``mimeType``, and ``size``) to use in the returned object. If not
provided, or if this doesn't have an ``image`` or similar URL in the input
object, its output blob will be omitted.
aspects (dict): optional mapping from str URL to int (width,height) tuple.
Used to provide aspect ratio in image/video embeds.
client (Bluesky or lexrpc.Client): optional; if provided, this will be used
to make API calls to PDSes to fetch and populate CIDs for records
referenced by replies, likes, reposts, etc.
original_fields_prefix (str): optional; if provided, stores original object URLs,
post text, and actor profiles (ie before truncation) in custom fields with this
prefix in output records. For example, if this is `foo`, an actor's complete
``summary`` field will be stored in the custom ``fooOriginalDescription``
field, and their (first) `url` will be stored in ``fooOriginalUrl``.
as_embed (bool): whether to render the post as an external embed (ie link
preview) instead of a native post. This happens automatically if
``objectType`` is ``article``
raise_ (bool): whether to raise ``ValueError`` if ``client`` is provided and
we can't fetch an object's record
dynamic_sensitive_labels (bool): if enabled we attempt to determine the bluesky
label based on ``summary``
multiple (bool): if True, always return a list of output records. When
converting an input actor to an output ``app.bsky.actor.profile`` record,
also includes a ``community.lexicon.payments.webMonetization`` record if
the actor has a ``monetization`` property. Default False.
domain (str): optional. A DNS domain for the actor. Only used with
``out_type='site.standard.publication'``, and required then.
Returns:
dict or list: ``app.bsky.*`` object, or list of objects if ``multiple`` is True
Raises:
ValueError: if the object can't be converted, eg if the ``objectType`` or
``verb`` fields are missing or unsupported
"""
if isinstance(client, Bluesky):
client = client._client
activity = obj
inner_obj = as1.get_object(activity)
verb = activity.get('verb') or 'post'
if inner_obj and verb == 'post':
obj = inner_obj
type = as1.object_type(obj)
if not type:
raise ValueError(f"Missing objectType or verb")
obj = copy.deepcopy(obj)
actor = as1.get_object(activity, 'actor')
if blobs is None:
blobs = {}
if aspects is None:
aspects = {}
# validate out_type
if out_type:
for in_types, out_types in FROM_AS1_TYPES.items():
if type in in_types and out_type in out_types:
break
else:
raise ValueError(f"{type} {verb} doesn't support out_type {out_type}")
# summary can be HTML; convert to plain text
summary = orig_summary = obj.get('summary') or ''
summary_is_html = (bool(BeautifulSoup(summary, 'html.parser').find())
or HTML_ENTITY_RE.search(summary))
if summary_is_html:
summary = html_to_text(summary, ignore_links=True)
ret = None
rets = []
# for nested from_as1 calls, if necessary
kwargs = {'original_fields_prefix': original_fields_prefix}
# TODO: once we're on Python 3.10, switch this to a match statement!
if type in as1.ACTOR_TYPES:
# avatar and banner. banner is featured image, if available
avatar = util.get_url(obj, 'image')
banner = None
for img in as1.get_objects(obj, 'image'):
url = util.get_url(img)
if img.get('objectType') == 'featured' and url:
banner = url
break
# extract pinned post from featured collection.
# note that we skip pinned post ids that aren't at:// URIs!
pinned_post = None
if featured := as1.get_object(obj, 'featured'):
if first := (as1.get_id(featured, 'orderedItems')
or as1.get_id(featured, 'items')):
if first.startswith('at://'):
pinned_post = from_as1_to_strong_ref(first, client=client, raise_=raise_)
url = as1.get_url(obj)
id = obj.get('id')
if not url and id:
parsed = util.parse_tag_uri(id)
if parsed:
# This is only really formatted as a URL to keep url_to_did_web below happy
url = f'https://{parsed[0]}'
ret = {
'displayName': obj.get('displayName'),
'description': summary,
'avatar': blobs.get(avatar),
'banner': blobs.get(banner),
'pinnedPost': pinned_post,
'website': url,
}
if original_fields_prefix:
ret.update({
f'{original_fields_prefix}OriginalDescription': orig_summary,
f'{original_fields_prefix}OriginalUrl': url or id,
})
if not out_type or out_type == 'app.bsky.actor.profile':
ret = trim_nulls({**ret, '$type': 'app.bsky.actor.profile'})
# Web Monetization
# https://github.com/lexicon-community/lexicon/tree/main/community/lexicon/payments
if wallet := obj.get('monetization'):
if wallet.startswith('$'):
wallet = wallet.replace('$', 'https://', 1)
web_monetization = {
'$type': 'community.lexicon.payments.webMonetization',
'address': wallet,
}
rets = [ret, web_monetization]
elif out_type == 'site.standard.publication':
if not domain:
raise ValueError("No domain, can't convert to site.standard.publication")
ret = trim_nulls({
'$type': 'site.standard.publication',
'url': f'https://{domain}',
'name': obj.get('displayName') or '',
'description': summary,
'icon': blobs.get(avatar),
}, ignore=('name',))
else:
did_web = ''
if id and id.startswith('did:web:'):
did_web = id
else:
try:
did_web = url_to_did_web(url)
except ValueError as e:
logger.info(f"Couldn't generate did:web: {e}")
# handles must be hostnames
# https://atproto.com/specs/handle
username = obj.get('username')
parsed = urlparse(url)
domain = parsed.netloc
did_web_bare = did_web.removeprefix('did:web:')
handle = (username if username and HANDLE_RE.fullmatch(username)
else did_web_bare if ':' not in did_web_bare
else domain if domain
else '')
ret.update({
'$type': out_type,
# TODO: more specific than domain, many users will be on shared domains
'did': id if id and id.startswith('did:') else did_web,
'handle': handle,
'avatar': avatar,
'banner': banner,
# these collections aren't not in AS1, they're borrowed from ActivityPub:
# https://www.w3.org/TR/activitypub/#followers
'followersCount': obj.get('followers', {}).get('totalItems'),
'followsCount': obj.get('following', {}).get('totalItems'),
})
# WARNING: this includes a few fields that aren't in #profileViewBasic, eg
# description, followersCount, followsCount
# https://atproto.com/specs/lexicon#authority-and-control
ret = trim_nulls(ret, ignore=('did', 'handle'))
# add bot label for automated account objectTypes
if type in ('application', 'service'):
bsky_type = ret.get('$type')
if bsky_type == 'app.bsky.actor.profile':
ret['labels'] = {
'$type': 'com.atproto.label.defs#selfLabels',
'values': [{'val': BOT_LABEL}],
}
elif bsky_type.startswith('app.bsky.actor.defs#'):
ret['labels'] = [{'val': BOT_LABEL}]
elif type == 'share':
if not out_type or out_type == 'app.bsky.feed.repost':
ret = {
'$type': 'app.bsky.feed.repost',
'subject': from_as1_to_strong_ref(inner_obj, client=client, raise_=raise_),
'createdAt': from_as1_datetime(obj.get('published')),
}
elif out_type == 'app.bsky.feed.defs#reasonRepost':
ret = {
'$type': 'app.bsky.feed.defs#reasonRepost',
'by': from_as1(actor, out_type='app.bsky.actor.defs#profileViewBasic', **kwargs),
'indexedAt': from_as1_datetime(None),
}
elif out_type == 'app.bsky.feed.defs#feedViewPost':
ret = {
'$type': 'app.bsky.feed.defs#feedViewPost',
'post': from_as1(inner_obj, out_type='app.bsky.feed.defs#postView', **kwargs),
'reason': from_as1(obj, out_type='app.bsky.feed.defs#reasonRepost', **kwargs),
}
elif type == 'like':
ret = {
'$type': 'app.bsky.feed.like',
'subject': from_as1_to_strong_ref(inner_obj, client=client, raise_=raise_),
'createdAt': from_as1_datetime(obj.get('published')),
}
elif type in ('follow', 'block'):
if not inner_obj:
raise ValueError('follow activity requires actor and object')
obj_id = inner_obj.get('id')
lexicon = f'app.bsky.graph.{type}'
if match := AT_URI_RE.fullmatch(obj_id):
if match.group('collection') == 'app.bsky.graph.list':
lexicon = 'app.bsky.graph.listblock'
ret = {
'$type': lexicon,
'subject': obj_id, # DID or list at:// URI
'createdAt': from_as1_datetime(obj.get('published')),
}
elif type == 'flag':
# use the first object that we can get an ATProto DID or URI/CID for
post_ref = repo_ref = None
for inner in as1.get_objects(activity):
if inner.get('id', '').startswith('did:'):
repo_ref = {
'$type': 'com.atproto.admin.defs#repoRef',
'did': inner['id'],
}
elif not post_ref or not post_ref.get('cid'):
post_ref = {
'$type': 'com.atproto.repo.strongRef',
**from_as1_to_strong_ref(inner, client=client, raise_=raise_),
}
if not (post_ref and post_ref.get('cid')) and not repo_ref:
raise ValueError('flag activity requires at:// or did: object')
ret = {
'$type': 'com.atproto.moderation.createReport#input',
'subject': post_ref or repo_ref,
# https://github.com/bluesky-social/atproto/blob/main/lexicons/com/atproto/moderation/defs.json#
'reasonType': 'com.atproto.moderation.defs#reasonOther',
# https://github.com/bluesky-social/atproto/blob/651d4c2a3447525c68d3bf1b8492bdafb0a88c66/lexicons/com/atproto/moderation/createReport.json#L21
'reason': (obj.get('content') or summary)[:2000],
}
elif verb in ('post', 'update') and type in POST_TYPES:
# images => embeds
images_embed = images_record_embed = None
if images := as1.get_objects(obj, 'image'):
images_embed = {
'$type': 'app.bsky.embed.images#view',
'images': [],
}
for img in images[:4]:
url = img.get('url') or img.get('id')
alt = img.get('displayName') or ''
images_embed['images'].append({
'$type': 'app.bsky.embed.images#viewImage',
'thumb': url,
'fullsize': url,
'alt': alt,
})
if blob := blobs.get(url):
if not images_record_embed:
images_record_embed = {
'$type': 'app.bsky.embed.images',
'images': [],
}
image_record = {
'$type': 'app.bsky.embed.images#image',
'image': blob,
'alt': alt,
}
if aspect := aspects.get(url):
image_record['aspectRatio'] = {
'width': aspect[0],
'height': aspect[1]
}
images_record_embed['images'].append(image_record)
# first video => embed
attachments = util.get_list(obj, 'attachments')
has_audio_attachment = any(as1.object_type(att) == 'audio' for att in attachments)
video = video_embed = video_record_embed = None
video_missing_blob = False
for att in attachments:
url = util.get_url(att, 'stream')
blob = blobs.get(url)
if url and blob and as1.object_type(att) == 'video':
video = url
alt = att.get('displayName') or ''
video_embed = {
'$type': 'app.bsky.embed.video#view',
'cid': blob_cid(blob),
'playlist': '?',
'alt': alt,
}
video_record_embed = {
'$type': 'app.bsky.embed.video',
'video': blob,
'alt': alt,
}
if aspect := aspects.get(url):
video_record_embed['aspectRatio'] = {
'width': aspect[0],
'height': aspect[1]
}
break
elif as1.object_type(att) == 'video':
video_missing_blob = True
# by default, original post link goes into an external embed. Bluesky can't
# do either images or video along with an external embed in the same post
# though, https://github.com/bluesky-social/atproto/discussions/2575 , so if
# the post has those, put the original post link at the end of the text
# instead, after truncating.
original_post_embed_name = original_post_text_suffix = None
include_link = None
url = as1.get_url(obj) or obj.get('id')
if url:
snippet = f'Original post on {util.domain_from_link(url)}'
original_post_embed_name = snippet
original_post_text_suffix = f'[{snippet}]'
if has_audio_attachment:
original_post_embed_name = '[Audio] ' + original_post_embed_name
original_post_text_suffix = '[Audio] ' + original_post_text_suffix
elif video_missing_blob:
original_post_embed_name = '[Video] ' + original_post_embed_name
original_post_text_suffix = '[Video] ' + original_post_text_suffix
original_post_text_suffix = '\n\n' + original_post_text_suffix
if images or video:
include_link = INCLUDE_LINK if has_audio_attachment or video_missing_blob else INCLUDE_IF_TRUNCATED
else:
include_link = OMIT_LINK # link will be in the external embed, not text
# convert text from HTML and truncate
orig_content = obj.get('content', '')
as1.convert_html_content_to_text(obj)
as1.expand_tags(obj)
tags = util.get_list(obj, 'tags')
# handle summary. for articles, use instead of content. for notes, assume
# it's a fediverse-style content warnings, add above content.
# https://github.com/snarfed/bridgy-fed/issues/1001
full_text = content = obj.get('content') or ''
index_offset = 0
preview_record = None
if type == 'article' or as_embed:
if (preview := obj.get('preview')) and as1.object_type(preview) == 'note':
# we'll take text and facets from this below
preview_record = from_as1(preview, **kwargs)
else:
full_text = summary
tags = [] # tags are presumably in content, not summary
elif summary:
prefix = f'[{summary}]\n\n'
full_text = prefix + full_text
index_offset = len(prefix)
# attachments to embed(s), including quoted posts
record_embed = record_record_embed = external_embed = external_record_embed = None
# convert Bluesky post link at end of content to a quote
attachment_urls = [att['url'] for att in attachments if 'url' in att]
text_end = len(full_text)
for tag in tags:
# check that the link is at the end and the text is the url
if (tag.get('objectType') != 'link' or tag.get('url') != tag.get('displayName') or
tag.get('startIndex', 0) + tag.get('length', 0) + index_offset != text_end):
continue
# check the link is to a bluesky post and not already in attachments
if ((tag['url'].startswith('at://')
or ((match := BSKY_APP_URL_RE.fullmatch(tag['url']))
and match.group('type') == 'post'))
and not tag['url'] in attachment_urls):
start_index = tag['startIndex'] + index_offset
# check link is on its own line
if start_index > 0 and full_text[start_index - 1] != '\n':
continue
# create attachment
attachments.append({
'objectType' : 'note',
'url' : tag['url']
})
# remove link
full_text = full_text[:start_index - 1]
for att in attachments:
att_id = att.get('id') or ''
att_url = as1.get_url(att) or ''
if not (att.get('objectType') in ('article', 'link', 'note')
and (att_url or att_id)):
continue
if (att_id.startswith('at://') or att_id.startswith(Bluesky.BASE_URL) or
att_url.startswith('at://') or att_url.startswith(Bluesky.BASE_URL)):
# quoted Bluesky post
embed = from_as1(att, **kwargs).get('post') or {}
embed['value'] = embed.pop('record', None)
record_embed = {
'$type': f'app.bsky.embed.record#view',
'record': {
**embed,
'$type': f'app.bsky.embed.record#viewRecord',
# override these so that trim_nulls below will remove them
'likeCount': None,
'replyCount': None,
'repostCount': None,
},
}
record_record_embed = {
'$type': f'app.bsky.embed.record',
'record': from_as1_to_strong_ref(att, client=client, raise_=raise_),
}
full_text = QUOTE_RE_SUFFIX.sub('', full_text)
else:
# external link
external_record_embed = to_external_embed(att, blobs=blobs)
external_embed = {
'$type': f'app.bsky.embed.external#view',
'external': {
**external_record_embed['external'],
'$type': f'app.bsky.embed.external#viewExternal',
'thumb': util.get_first(att, 'image'),
},
}
# language(s)
# (we steal contentMap from AS2, it's not officially part of AS1)
langs = [lang for lang, lang_content in obj.get('contentMap', {}).items()
if LANG_RE.fullmatch(lang) and lang_content == orig_content]
# truncate text. if we're including a suffix, ie original post link, link
# that with a facet
is_dm = as1.is_dm(obj, actor=actor)
text = Bluesky('unused').truncate(
full_text, original_post_text_suffix, include_link=include_link,
punctuation=('', ''), type='dm' if is_dm else type, weights=False,
lang=langs[0] if langs else None)
truncated = text != full_text
if truncated:
text_byte_end = len(text.split(ELLIPSIS, maxsplit=1)[0].encode())
else:
text_byte_end = len(text.encode())
facets = []
standalone_tags = set()
if (truncated and original_post_text_suffix
and text.endswith(original_post_text_suffix)):
facets.append({
'$type': 'app.bsky.richtext.facet',
'features': [{
'$type': 'app.bsky.richtext.facet#link',
'uri': url,
}],
'index': {
'byteStart': len(text.removesuffix(original_post_text_suffix).encode()),
'byteEnd': len(text.encode()),
}
})
# convert tags to facets
hashtag_facets = set() # contains string displayNames, lower cased
for tag in tags:
name = tag.get('displayName', '').strip().lstrip('@#')
tag_type = tag.get('objectType')
if name and not tag_type:
tag_type = 'hashtag'
tag_url = tag.get('url') or ''
if not name and not tag_url:
continue
facet = {
'$type': 'app.bsky.richtext.facet',
}
try:
start = int(tag['startIndex']) + index_offset
if start and obj.get('content_is_html'):
raise NotImplementedError('HTML content is not supported with index tags')
end = start + int(tag['length'])
facet['index'] = {
# convert indices from Unicode chars to UTF-8 encoded bytes
# https://github.com/snarfed/atproto/blob/5b0c2d7dd533711c17202cd61c0e101ef3a81971/lexicons/app/bsky/richtext/facet.json#L34
'byteStart': len(full_text[:start].encode()),
'byteEnd': len(full_text[:end].encode()),
}
except (KeyError, ValueError, IndexError, TypeError):
pass
if tag_type == 'hashtag':
max_graphemes = LEXRPC.defs['app.bsky.feed.post']['record']['properties']['tags']['items']['maxGraphemes']
if len(name) >= max_graphemes:
logger.warning(f'Hashtag "{name}" longer than maxGraphemes {max_graphemes}')
continue
facet['features'] = [{
'$type': 'app.bsky.richtext.facet#tag',
'tag': name,
}]
elif tag_type == 'mention':
# extract and resolve DID
if tag_url.startswith('did:'):
user = tag_url
elif match := AT_URI_RE.fullmatch(tag_url):
user = match.group('repo')
elif match := BSKY_APP_URL_RE.fullmatch(tag_url):
user = match.group('id')
else:
user = name.lstrip('@')
did = None
if user.startswith('did:'):
did = user
elif client and HANDLE_RE.fullmatch(user):
try:
did = client.com.atproto.identity.resolveHandle(handle=user)['did']
except BaseException as e:
code, _ = util.interpret_http_exception(e)
if not code:
raise
if did:
facet['features'] = [{
'$type': 'app.bsky.richtext.facet#mention',
'did': did,
}]
if not facet.get('features') and tag_url:
if util.DOMAIN_RE.fullmatch(tag_url):
tag_url = f'https://{tag_url}/'
if util.is_url(tag_url):
facet['features'] = [{
'$type': 'app.bsky.richtext.facet#link',
'uri': tag_url,
}]
# skip or trim this facet if it's off the end of content that got truncated
index = facet.get('index')
if not index or index.get('byteStart', 0) >= text_byte_end:
max_length = LEXRPC.defs['app.bsky.feed.post']['record']['properties']['tags']['maxLength']
if tag_type == 'hashtag' and name.lower() not in hashtag_facets:
if len(standalone_tags) >= max_length:
logger.warning(f'More than {max_length} standalone hashtags, omitting "{name}"')
else:
standalone_tags.add(name)
continue
if index.get('byteEnd', 0) > text_byte_end:
index['byteEnd'] = text_byte_end
if facet.get('features') and facet not in facets:
facets.append(facet)
if tag_type == 'hashtag':
hashtag_facets.add(name.lower())
standalone_tags.discard(name.lower())
# if we truncated this post's text, override external embed with link to
# original post. (if there are images, we added a link in the text instead,
# and this won't get used.)
if (truncated or has_audio_attachment or video_missing_blob) and url:
external_record_embed = {
'$type': f'app.bsky.embed.external',
'external': {
'$type': f'app.bsky.embed.external#external',
'uri': url,
'title': original_post_embed_name,
'description': '',
},
}
external_embed = {
'$type': f'app.bsky.embed.external#view',
'external': {
**external_record_embed['external'],
'$type': f'app.bsky.embed.external#viewExternal',
'thumb': (images[0].get('url') or images[0].get('id')) if images else None,
},
}
# populate embeds
media_embed = video_embed or images_embed or external_embed
if record_embed and ():
embed = {
'$type': 'app.bsky.embed.recordWithMedia#view',
'record': record_embed,
'media': media_embed,
}
else:
embed = record_embed or media_embed
media_record_embed = video_record_embed or images_record_embed or external_record_embed
if is_dm:
# chat.bsky.convo.defs#messageInput only allows record embeds
record_embed = record_record_embed
embed = record_embed
elif record_record_embed and media_record_embed:
record_embed = {
'$type': 'app.bsky.embed.recordWithMedia',
'record': record_record_embed,
'media' : media_record_embed
}
else:
record_embed = record_record_embed or media_record_embed
# in reply to
reply = None
in_reply_to = base_object(obj)
if in_reply_to and not is_dm:
parent_ref = from_as1_to_strong_ref(in_reply_to, client=client,
value=True, raise_=raise_)
root_ref = (parent_ref.pop('value', {}).get('reply', {}).get('root')
or parent_ref)
reply = {
'$type': 'app.bsky.feed.post#replyRef',
'root': root_ref,
'parent': parent_ref,
}
# convert AS1 sensitive to "nudity" label
# https://github.com/snarfed/atproto/blob/f2f8de63b333448d87c364578e023ddbb63b8b25/lexicons/com/atproto/label/defs.json#L139-L154
# https://swicg.github.io/miscellany/#sensitive
# https://docs.joinmastodon.org/user/posting/#cw
labels = None
if obj.get('sensitive'):
label = SENSITIVE_LABEL_DEFAULT
if dynamic_sensitive_labels and summary:
summary_lower = summary.lower()
for bluesky_label, keywords in AS1_TO_BLUESKY_LABEL_MAP.items():
if any(s in summary_lower for s in keywords):
label = bluesky_label
labels = {
'$type' : 'com.atproto.label.defs#selfLabels',
'values' : [{'val' : label}],
}
ret = {
'$type': 'chat.bsky.convo.defs#messageInput' if is_dm else 'app.bsky.feed.post',
'text': text,
'createdAt': from_as1_datetime(obj.get('published')),
'embed': record_embed,
'facets': facets,
'reply': reply,
'langs': langs,
'labels': labels,
'tags': sorted(standalone_tags),
}
if preview_record and preview_record['text']:
ret.update({
'text': preview_record['text'],
'facets': preview_record.get('facets'),
'langs': preview_record.get('langs'),
'tags': preview_record.get('tags'),
})
if as_embed or (type == 'article' and url):
ret['embed'] = to_external_embed(obj, description=content or summary,
blobs=blobs)
if images_record_embed:
ret['embed']['external']['thumb'] = images_record_embed['images'][0]['image']
if original_fields_prefix:
ret.update({
f'{original_fields_prefix}OriginalText': orig_content,
f'{original_fields_prefix}OriginalUrl': url,
})
ret = trim_nulls(ret, ignore=('alt', 'createdAt', 'cid', 'description',
'text', 'title', 'uri'))
# also site.standard.document for articles
if out_type == 'site.standard.document' and not url:
raise ValueError("No url or id, can't convert to site.standard.document")
if (type == 'article' and url
and (multiple or out_type == 'site.standard.document')):
url_parts = urlparse(url)
doc = trim_nulls({
'$type': 'site.standard.document',
'site': urlunparse(url_parts[:2] + ('',) * 4),
'path': urlunparse(('', '') + url_parts[2:]),
'title': obj.get('displayName') or '',
'description': summary,
'textContent': content,
'tags': util.uniquify(
tag['displayName'].removeprefix('#')
for tag in util.get_list(obj, 'tags') if tag.get('displayName')),
'publishedAt': from_as1_datetime(obj.get('published')),
}, ignore=('title',))
if updated := obj.get('updated'):
doc['updatedAt'] = from_as1_datetime(updated)
if images_record_embed and (imgs := images_record_embed.get('images')):
doc['coverImage'] = imgs[0]['image'] # blob
if multiple:
rets = [ret, doc]
else:
ret = doc
if out_type in ('app.bsky.feed.defs#feedViewPost',
'app.bsky.feed.defs#postView'):
# author
author = as1.get_object(obj, 'author')
author.setdefault('objectType', 'person')
author = from_as1(author, out_type='app.bsky.actor.defs#profileViewBasic',
**kwargs)
ret = trim_nulls({
'$type': 'app.bsky.feed.defs#postView',
'uri': obj.get('id') or url or '',
'cid': '',
'record': ret,
'author': author,
'embed': embed,
'replyCount': 0,
'repostCount': 0,
'likeCount': 0,
'indexedAt': from_as1_datetime(None),
}, ignore=('author', 'createdAt', 'cid', 'description', 'indexedAt',
'record', 'text', 'title', 'uri'))
if out_type == 'app.bsky.feed.defs#feedViewPost':
ret = {
'$type': out_type,
'post': ret,
}
elif type == 'collection':
ret = {
'$type': 'app.bsky.graph.list',
'purpose': 'app.bsky.graph.defs#curatelist',
'name': obj.get('displayName') or obj.get('id'),
'description': summary,
'avatar': blobs.get(util.get_url(obj, 'image')),
'createdAt': from_as1_datetime(obj.get('published')),
}
elif verb == 'add':
ret = {
'$type': 'app.bsky.graph.listitem',
'subject': inner_obj.get('id'),
'list': activity.get('target'),
'createdAt': from_as1_datetime(obj.get('published')),
}
else:
raise ValueError(f'AS1 object has unknown objectType {type} verb {verb}')
# validate against lexicon schema, truncate for character limits, etc
truncated = []
for record in (rets or [ret]):
if nsid := record.get('$type'):
method = nsid.removesuffix('#input')
if method == nsid:
type = 'record'
else:
nsid = method
type = 'input'
record = LEXRPC.validate(nsid, type, record)
truncated.append(record)
return truncated if multiple else truncated[0]
[docs]
def to_external_embed(obj, description=None, blobs=None):
"""Converts an AS1 object to a Bluesky ``app.bsky.embed.external#external``.
Args:
obj (dict): AS1 object
description (str): if provided, overrides ``summary`` and ``content`` in ``obj``
Returns:
dict: Bluesky ``app.bsky.embed.external#external`` record
"""
url = as1.get_url(obj) or obj.get('id')
assert url
ret = {
'$type': f'app.bsky.embed.external',
'external': {
'$type': f'app.bsky.embed.external#external',
'uri': url,
'title': obj.get('displayName') or '', # required
'description': (description
or html_to_text(obj.get('summary') or obj.get('content'),
ignore_links=True)
or ''),
}
}
if blobs:
for img in as1.get_objects(obj, 'image'):
if img_url := img.get('url') or img.get('id'):
if blob := blobs.get(img_url):
ret['external']['thumb'] = blob
break
return ret
[docs]
def to_as1(obj, type=None, uri=None, repo_did=None, repo_handle=None,
pds=DEFAULT_PDS, client=None):
"""Converts a Bluesky object to an AS1 object.
Args:
obj (dict): ``app.bsky.*`` object
type (str): optional ``$type`` to parse with, only used if ``obj['$type']``
is unset
uri (str): optional ``at://`` URI of this object. Used to populate the
``id`` and ``url`` fields for some object types, eg posts.
repo_did (str): optional DID of the repo this object is from. Required to
generate image URLs.
repo_handle (str): optional handle of the user whose repo this object is from
pds (str): base URL of the PDS that currently serves this object's repo.
Required to generate image URLs. Defaults to ``https://bsky.social/``.
client (Bluesky or lexrpc.Client): optional; if provided, this will be used
to make API calls to PDSes to fetch secondary records needed for conversion,
eg ``community.lexicon.payments.webMonetization`` for profiles
Returns:
dict: AS1 object, or None if the record doesn't correspond to an AS1 object,
eg "not found" records
Raises:
ValueError: if the ``$type`` field is missing or unsupported
"""
if not obj:
return {}
type = obj.get('$type') or type
if not type:
raise ValueError('Bluesky object missing $type field')
uri_repo = uri_bsky_url = None
if uri:
uri_bsky_url = at_uri_to_web_url(uri)
if not uri.startswith('at://'):
raise ValueError('Expected at:// uri, got {uri}')
if parsed := AT_URI_RE.fullmatch(uri):
uri_repo = parsed.group(1)
# for nested to_as1 calls, if necessary
kwargs = {'repo_did': repo_did, 'repo_handle': repo_handle, 'pds': pds}
# TODO: once we're on Python 3.10, switch this to a match statement!
if type in ('app.bsky.actor.profile',
'app.bsky.actor.defs#profileView',
'app.bsky.actor.defs#profileViewBasic',
'app.bsky.actor.defs#profileViewDetailed',
'app.bsky.feed.generator',
):
images = [{'url': obj.get('avatar')}]
banner = obj.get('banner')
if banner:
images.append({'url': obj.get('banner'), 'objectType': 'featured'})
handle = obj.get('handle')
did = obj.get('did')
if type == 'app.bsky.actor.profile':
if not handle:
handle = repo_handle
if not did:
did = repo_did
# urls: bsky.app profile, did:web domain, bsky.app feed generator, links in bio
urls = []
if handle:
urls.append(Bluesky.user_url(handle))
if not util.domain_or_parent_in(handle, [DEFAULT_PDS_DOMAIN]):
urls.append(f'https://{handle}/')
if did and did.startswith('did:web:'):
urls.append(did_web_to_url(did))
if website := obj.get('website'):
urls.append(website)
summary = util.linkify(html.escape(obj.get('description') or ''), pretty=True)
urls.extend(util.extract_links(summary))
if type == 'app.bsky.feed.generator':
if uri_bsky_url:
urls.insert(0, uri_bsky_url)
ret = {
'objectType': 'service',
'id': uri,
'author': repo_did,
'generator': did,
}
else:
ret = {
'objectType': 'person',
'id': uri or did,
'username': obj.get('handle') or repo_handle,
}
urls = util.dedupe_urls(urls)
ret.update({
'url': urls[0] if urls else None,
'urls': urls if len(urls) > 1 else None,
'displayName': obj.get('displayName'),
# TODO: for app.bsky.feed.generator, use descriptionFacets
'summary': summary,
'image': images,
'published': obj.get('createdAt'),
})
# avatar and banner are blobs in app.bsky.actor.profile; convert to URLs
repo_did = repo_did or did
if type in ('app.bsky.actor.profile', 'app.bsky.feed.generator'):
if repo_did and pds:
for img in ret['image']:
img['url'] = blob_to_url(blob=img['url'], repo_did=repo_did, pds=pds)
else:
ret['image'] = []
# convert public view opt-out to unlisted AS1 audience targeting
# https://docs.bsky.app/docs/advanced-guides/resolving-identities#for-backend-services
# https://activitystrea.ms/specs/json/targeting/1.0/
labels = (obj.get('labels', {}).get('values', [])
if type == 'app.bsky.actor.profile'
else obj.get('labels', []))
for label in labels:
if label.get('val') == NO_UNAUTHENTICATED_LABEL and not label.get('neg'):
ret['to'] = [{
'objectType': 'group',
'alias': '@unlisted',
}]
if label.get('val') == BOT_LABEL and not label.get('neg'):
ret['objectType'] = 'service'
# follow/follower counts. convert to followers/following collections, not in AS1,
# borrowed from ActivityPub: https://www.w3.org/TR/activitypub/#followers
if followers := obj.get('followersCount'):
ret['followers'] = {'totalItems': followers}
if follows := obj.get('followsCount'):
ret['following'] = {'totalItems': follows}
# convert pinnedPost to featured ActivityStreams collection
# https://docs.joinmastodon.org/spec/activitypub/#featured
if pinned_post := obj.get('pinnedPost'):
# sometimes this is a str. not sure why, that's invalid based on the schema,
# pinnedPost has always been an object, but we still see it occasionally, eg
# https://console.cloud.google.com/errors/detail/CKevsI3gyqrAdw?project=bridgy-federated
if isinstance(pinned_post, dict):
ret['featured'] = {
'totalItems': 1,
'items': [pinned_post['uri']],
}
# Web Monetization wallet address, if any
# https://github.com/lexicon-community/lexicon/tree/main/community/lexicon/payments
if client and repo_did:
if wallet := client.com.atproto.repo.getRecord(
repo=repo_did, collection='community.lexicon.payments.webMonetization',
rkey='self'):
ret['monetization'] = wallet['value']['address']
elif type in ('app.bsky.feed.post',
'chat.bsky.convo.defs#messageInput',
'chat.bsky.convo.defs#messageView',
):
text = obj.get('text', '')
text_encoded = text.encode()
# convert facets to tags
tags = []
for facet in obj.get('facets', []):
tag = {}
for feat in facet.get('features', []):
if feat.get('$type') == 'app.bsky.richtext.facet#link':
tag.update({
'objectType': 'article',
'url': feat.get('uri'),
})
elif feat.get('$type') == 'app.bsky.richtext.facet#mention':
tag.update({
'objectType': 'mention',
'url': Bluesky.user_url(feat.get('did')),
})
elif feat.get('$type') == 'app.bsky.richtext.facet#tag':
name = feat.get('tag')
tag.update({
'objectType': 'hashtag',
'displayName': name,
'url': f'https://bsky.app/search?q=%23{urllib.parse.quote(name)}',
})
index = facet.get('index', {})
# convert indices from UTF-8 encoded bytes to Unicode chars (code points)
# https://github.com/snarfed/atproto/blob/5b0c2d7dd533711c17202cd61c0e101ef3a81971/lexicons/app/bsky/richtext/facet.json#L34
byte_start = index.get('byteStart')
byte_end = index.get('byteEnd')
try:
if byte_start is not None:
tag['startIndex'] = len(text_encoded[:byte_start].decode())
if byte_end is not None:
name = text_encoded[byte_start:byte_end].decode()
tag.setdefault('displayName', name)
tag['length'] = len(name)
except UnicodeDecodeError as e:
logger.warning(f"Couldn't apply facet {facet} to unicode text: {text}")
tags.append(tag)
# escape HTML characters: <, >, &. shuffle over facet indices to match.
while match := re.search(r'<|>', text):
# escape char
entity = f'&{html.entities.codepoint2name[ord(match.group(0))]};'
pos = match.start(0)
text = text[:pos] + entity + text[pos + 1:]
# no need to update text_encoded, we're done with it by here
# shuffle indices
added = len(entity) - 1
for tag in tags:
start = tag.get('startIndex', 0)
end = start + tag.get('length', 0) # exclusive
if end <= pos:
continue
elif start <= pos:
tag['length'] += added
else: # start > pos
tag['startIndex'] += added
# extra tags not in post text
tags.extend({
'objectType': 'hashtag',
'displayName': name,
'url': f'https://bsky.app/search?q=%23{urllib.parse.quote(name)}',
} for name in obj.get('tags', []))
in_reply_to = obj.get('reply', {}).get('parent', {}).get('uri')
# convert self labels to AS1 sensitive and content warning in summary
# https://github.com/snarfed/atproto/blob/f2f8de63b333448d87c364578e023ddbb63b8b25/lexicons/com/atproto/label/defs.json#L139-L154
# https://swicg.github.io/miscellany/#sensitive
content_warnings = []
sensitive = None
for label in obj.get('labels', {}).get('values', []):
label_text = SENSITIVE_LABELS.get(label.get('val'))
if label_text and not label.get('neg'):
sensitive = True
content_warnings.append(label_text)
ret = {
'objectType': 'comment' if in_reply_to else 'note',
'id': uri,
'url': at_uri_to_web_url(uri),
'content': text,
'summary': '<br>'.join(content_warnings),
'inReplyTo': [{
'id': in_reply_to,
'url': at_uri_to_web_url(in_reply_to),
}],
'published': obj.get('createdAt', ''),
'tags': tags,
'author': repo_did,
# we steal this from AS2, it's not officially part of AS1
'contentMap': {lang: text for lang in obj.get('langs', [])},
'sensitive': sensitive,
}
if type == 'chat.bsky.convo.defs#messageView':
author = obj['sender']['did']
if not ret['id']:
ret['id'] = f'at://{author}/chat.bsky.convo.defs.messageView/{obj["id"]}'
ret.update({
'author': author,
'published': obj.get('sentAt'),
'to': ['?'], # show that it's a DM even though we don't know the recipient
})
# embeds
embed = obj.get('embed') or {}
embed_type = embed.get('$type')
if embed_type == 'app.bsky.embed.images':
ret['image'] = to_as1(embed, **kwargs)
elif embed_type in ('app.bsky.embed.external',
'app.bsky.embed.record',
'app.bsky.embed.video',
):
ret['attachments'] = [to_as1(embed, **kwargs)]
elif embed_type == 'app.bsky.embed.recordWithMedia':
ret['attachments'] = [to_as1(embed, **kwargs)]
# TODO: stop reaching inside and do this in the to_as1 call instead?
# need to make it return both the quote post attachment and the image.
media = embed.get('media')
media_type = media.get('$type')
if media_type in ('app.bsky.embed.external', 'app.bsky.embed.video'):
ret['attachments'].append(to_as1(media, **kwargs))
elif media_type == 'app.bsky.embed.images':
ret['image'] = to_as1(media, **kwargs)
else:
assert False, f'Unknown embed media type: {media_type}'
elif type in ('app.bsky.feed.defs#postView', 'app.bsky.embed.record#viewRecord'):
ret = to_as1(obj.get('record') or obj.get('value'), **kwargs)
author = obj.get('author') or {}
uri = obj.get('uri')
kwargs['repo_did'] = obj.get('author', {}).get('did') or repo_did
ret.update({
'id': uri,
'url': (at_uri_to_web_url(uri, handle=author.get('handle'))
if uri.startswith('at://') else None),
'author': to_as1(author, type='app.bsky.actor.defs#profileViewBasic', **kwargs),
})
# convert embeds to attachments
for embed in util.get_list(obj, 'embeds') + util.get_list(obj, 'embed'):
embed_type = embed.get('$type')
if embed_type == 'app.bsky.embed.images#view':
ret.setdefault('image', []).extend(to_as1(embed, **kwargs))
elif embed_type in ('app.bsky.embed.external#view',
'app.bsky.embed.record#view',
'app.bsky.embed.video#view',
):
ret['attachments'] = [to_as1(embed, **kwargs)]
elif embed_type == 'app.bsky.embed.recordWithMedia#view':
ret['attachments'] = [to_as1(embed.get('record', {}).get('record'), **kwargs)]
media = embed.get('media')
media_type = media.get('$type')
if media_type in ('app.bsky.embed.external#view', 'app.bsky.embed.video#view'):
ret.setdefault('attachments', []).append(to_as1(media, **kwargs))
elif media_type == 'app.bsky.embed.images#view':
ret.setdefault('image', []).extend(to_as1(media, **kwargs))
else:
assert False, f'Unknown embed media type: {media_type}'
elif type == 'app.bsky.embed.images':
ret = []
if repo_did and pds:
for img in obj.get('images', []):
image = img.get('image')
if image:
url = blob_to_url(blob=image, repo_did=repo_did, pds=pds)
ret.append({'url': url, 'displayName': img['alt']}
if img.get('alt') else url)
elif type == 'app.bsky.embed.video':
ret = {}
if repo_did and pds:
if vid := obj['video']:
url = blob_to_url(blob=vid, repo_did=repo_did, pds=pds)
ret = {
'objectType': 'video',
'stream': {
'url': url,
'mimeType': vid.get('mimeType'),
},
'displayName': obj.get('alt'),
}
elif type == 'app.bsky.embed.images#view':
ret = [{
'url': img.get('fullsize'),
'displayName': img.get('alt'),
} for img in obj.get('images', [])]
elif type == 'app.bsky.embed.video#view':
ret = {}
if repo_did and pds:
ret = {
'objectType': 'video',
'stream': {'url': blob_to_url(blob=obj, repo_did=repo_did, pds=pds)},
'displayName': obj.get('alt'),
}
elif type in ('app.bsky.embed.external', 'app.bsky.embed.external#view'):
ret = to_as1(obj.get('external'), type='app.bsky.embed.external#viewExternal',
**kwargs)
elif type in ('app.bsky.embed.external#external',
'app.bsky.embed.external#viewExternal'):
ret = {
'objectType': 'link',
'url': obj.get('uri'),
'displayName': obj.get('title'),
'summary': obj.get('description'),
}
thumb = obj.get('thumb')
if type == 'app.bsky.embed.external#external':
if repo_did and pds:
ret['image'] = blob_to_url(blob=thumb, repo_did=repo_did, pds=pds)
else:
ret['image'] = thumb
elif type == 'app.bsky.embed.record':
at_uri = to_as1(obj.get('record'), type='com.atproto.repo.strongRef', **kwargs)
if not at_uri or not isinstance(at_uri, str):
return None
ret = {
'objectType': 'note',
'id': at_uri,
'url': at_uri_to_web_url(at_uri),
}
elif type == 'app.bsky.embed.recordWithMedia':
ret = to_as1(obj.get('record'), type='app.bsky.embed.record', **kwargs)
# TODO: move media handling here from app.bsky.feed.post embed block above
elif type == 'app.bsky.embed.record#view':
return to_as1(obj.get('record'), type='app.bsky.embed.record', **kwargs)
elif type in ('app.bsky.embed.record#viewNotFound',
'app.bsky.embed.record#viewBlocked',
'app.bsky.feed.defs#notFoundPost',
):
return None
elif type == 'app.bsky.feed.defs#feedViewPost':
ret = to_as1(obj['post'], type='app.bsky.feed.defs#postView', **kwargs)
reason = obj.get('reason')
if reason and reason.get('$type') == 'app.bsky.feed.defs#reasonRepost':
post = obj.get('post', {})
uri = post.get('viewer', {}).get('repost')
kwargs['repo_did'] = post.get('author', {}).get('did') or repo_did
ret = {
'objectType': 'activity',
'verb': 'share',
'id': uri,
'url': at_uri_to_web_url(uri),
'object': ret,
'actor': to_as1(reason.get('by'),
type='app.bsky.actor.defs#profileViewBasic', **kwargs),
}
elif type in ('app.bsky.graph.follow', 'app.bsky.graph.block',
'app.bsky.graph.listblock'):
ret = {
'objectType': 'activity',
'verb': 'follow' if type == 'app.bsky.graph.follow' else 'block',
'id': uri,
'url': at_uri_to_web_url(uri),
'object': obj.get('subject'),
'actor': repo_did,
}
elif type == 'com.atproto.moderation.createReport#input':
content = obj['reasonType'].removeprefix('com.atproto.moderation.defs#reason')
if reason := obj.get('reason'):
content += f': {reason}'
ret = {
'objectType': 'activity',
'verb': 'flag',
'actor': repo_did,
'object': to_as1(obj['subject']),
'content': content,
}
elif type == 'app.bsky.feed.like':
subject = obj.get('subject', {}).get('uri')
ret = {
'objectType': 'activity',
'verb': 'like',
'id': uri,
'object': subject,
'actor': repo_did,
}
if subject and uri_repo:
if web_url := at_uri_to_web_url(subject):
# synthetic fragment
ret['url'] = f'{web_url}#liked_by_{uri_repo}'
elif type == 'app.bsky.feed.repost':
subject = obj.get('subject', {}).get('uri')
ret = {
'objectType': 'activity',
'verb': 'share',
'id': uri,
'object': subject,
'actor': repo_did,
'published': obj.get('createdAt'),
}
if subject and uri_repo:
if web_url := at_uri_to_web_url(subject):
# synthetic fragment
ret['url'] = f'{web_url}#reposted_by_{uri_repo}'
elif type == 'app.bsky.feed.defs#threadViewPost':
post = obj.get('post', {})
kwargs['repo_did'] = post.get('author', {}).get('did') or repo_did
return to_as1(post, type='app.bsky.feed.defs#postView', **kwargs)
elif type in ('app.bsky.feed.defs#generatorView',
'app.bsky.graph.defs#listView'):
uri = obj.get('uri')
ret = {
'objectType': 'service',
'id': uri,
'url': at_uri_to_web_url(uri),
'displayName': (obj.get('displayName') # generator
or obj.get('name')), # list
'summary': obj.get('description'),
'image': obj.get('avatar'),
'author': to_as1(obj.get('creator'), type='app.bsky.actor.defs#profileView',
**kwargs),
}
elif type == 'app.bsky.feed.defs#blockedPost':
uri = obj.get('uri') or uri
ret = {
'objectType': 'note',
'id': uri,
'url': at_uri_to_web_url(uri),
'blocked': True,
'author': obj.get('blockedAuthor', {}).get('did'),
}
elif type == 'com.atproto.repo.strongRef':
return obj['uri']
elif type == 'com.atproto.admin.defs#repoRef':
return obj['did']
elif type == 'app.bsky.graph.list':
ret = {
'objectType': 'collection',
'id': uri,
'url': uri_bsky_url,
'displayName': obj.get('name'),
'summary': obj.get('description'),
'published': obj.get('createdAt'),
}
if repo_did and pds:
ret['image'] = blob_to_url(blob=obj.get('avatar'), repo_did=repo_did, pds=pds)
elif type == 'app.bsky.graph.listitem':
ret = {
'objectType': 'activity',
'verb': 'add',
'id': uri,
'actor': repo_did,
'object': obj.get('subject'),
'target': obj.get('list'),
'published': obj.get('createdAt'),
}
elif type == 'community.lexicon.payments.webMonetization':
# not a real AS1 object, just a stub so that we minimally support these records
# https://github.com/lexicon-community/lexicon/tree/main/community/lexicon/payments
ret = {
'monetization': obj.get('address'),
}
elif type == 'site.standard.document':
ret = {
'objectType': 'article',
'displayName': obj.get('title'),
'summary': obj.get('description'),
'content': obj.get('textContent'),
'published': obj.get('publishedAt'),
'updated': obj.get('updatedAt'),
'tags': [{'objectType': 'hashtag', 'displayName': tag}
for tag in obj.get('tags', [])],
}
if site := obj.get('site'):
ret['url'] = urllib.parse.urljoin(site, obj.get('path'))
if client and (post_uri := obj.get('bskyPostRef', {}).get('uri')):
if (parsed := AT_URI_RE.fullmatch(post_uri)) and parsed.lastgroup == 'rkey':
# note that bskyPostRef is a strongRef, with cid, but we don't pass that cid
# to getRecord, since if the post record has been updated, the repo may no
# longer have the version with this cid. (plus, we probably *want* the
# updated version anyway.)
if record := client.com.atproto.repo.getRecord(**parsed.groupdict()):
ret['preview'] = to_as1(record['value'], uri=post_uri,
repo_did=parsed['repo'], pds=pds)
elif type == 'site.standard.publication':
ret = {
# should this be organization or service or group instead?
'objectType': 'person',
'url': obj.get('url'),
'displayName': obj.get('name'),
'summary': obj.get('description'),
}
else:
raise ValueError(f'Bluesky object has unknown $type: {type}')
ret = trim_nulls(ret)
# ugly hack
if isinstance(ret, dict) and ret.get('author') == {'objectType': 'person'}:
del ret['author']
return ret
[docs]
def blob_to_url(*, blob, repo_did, pds=DEFAULT_PDS):
"""Generates a URL for a blob.
Supports both new and old style blobs:
https://atproto.com/specs/data-model#blob-type
Supports ``ref`` values as raw bytes, base-32 encoded strings, and
DAG-JSON mappings with inner ``$link`` values.
The resulting URL is a ``com.atproto.sync.getBlob`` XRPC call to the PDS.
For blobs on the official ``bsky.social`` PDS, we could consider using their CDN
instead: ``https://av-cdn.bsky.app/img/avatar/plain/[DID]/[CID]@jpeg``
They also run a resizing image proxy on ``cdn.bsky.social`` with URLs like
``https://cdn.bsky.social/imgproxy/[SIG]/rs:fit:2000:2000:1:0/plain/[CID]@jpeg``,
not sure how to generate signatures for it yet.
Args:
blob (dict): https://atproto.com/specs/data-model#blob-type
repo_did (str): DID of the repo that owns this blob
pds (str): base URL of the PDS that serves this repo. Defaults to
:const:`DEFAULT_PDS`
Returns:
str: URL for this blob, or None if ``blob`` is empty or has no CID
"""
if not blob:
return None
assert repo_did and pds
if cid := blob_cid(blob):
path = f'/xrpc/com.atproto.sync.getBlob?did={repo_did}&cid={cid}'
return urllib.parse.urljoin(pds, path)
[docs]
def blob_cid(blob):
"""Returns a blob's base32-encoded CID.
Supports both new and old style blobs:
https://atproto.com/specs/data-model#blob-type
Supports ``ref`` values as raw bytes, base-32 encoded strings, and
DAG-JSON mappings with inner ``$link`` values.
Args:
blob (dict): https://atproto.com/specs/data-model#blob-type
Returns:
str: this blob's CID, base32-encoded
"""
assert blob
if ref := blob.get('ref'):
return (ref if isinstance(ref, str)
else CID.decode(ref).encode('base32') if isinstance(ref, bytes)
else ref.encode('base32') if isinstance(ref, CID)
else ref.get('$link'))
else:
return blob.get('cid')
[docs]
class Bluesky(Source):
"""Bluesky source class. See file docstring and :class:`Source` class for
details.
Attributes:
handle (str)
did (str)
client (lexrpc.Client)
"""
DOMAIN = 'bsky.app'
BASE_URL = 'https://bsky.app'
NAME = 'Bluesky'
TRUNCATE_TEXT_LENGTH = None # different for post text vs profile description
POST_ID_RE = AT_URI_RE
TYPE_LABELS = {
'post': 'post',
'comment': 'reply',
'repost': 'repost',
'like': 'like',
'follow': 'follow',
}
_client = None
_app_password = None
def __init__(self, handle, pds_url=None, did=None, access_token=None,
refresh_token=None, app_password=None, auth=None,
session_callback=None, **requests_kwargs):
"""Constructor.
Args:
handle (str): username, eg ``snarfed.bsky.social`` or ``snarfed.org``
pds_url (str): base URL for the user's PDS, eg ``https://my.pds/``
did (str): did:plc or did:web, optional
access_token (str): optional
refresh_token (str): optional
app_password (str): optional
auth (requests.auth.AuthBase): optional, used to authenticate XRPC requests
session_callback (callable, dict => None): passed to :class:`lexrpc.Client`
constructor, called when a new session is created or refreshed
requests_kwargs (dict): passed to :func:`requests.get`/:func:`requests.post`
"""
assert not ((access_token or refresh_token) and auth)
self.handle = handle
self.did = did
self._app_password = app_password
headers = {'User-Agent': util.user_agent}
self._client = Client(pds_url, access_token=access_token,
refresh_token=refresh_token, auth=auth, headers=headers,
session_callback=session_callback, validate=True,
requests_session=util.session, **requests_kwargs)
self._appview = Client(DEFAULT_APPVIEW, headers=headers, validate=True,
requests_session=util.session, **requests_kwargs)
[docs]
@classmethod
def from_auth(cls, auth_entity, client_metadata=None, **kwargs):
"""Creates a :class:`Bluesky` from a :class:`oauth_dropins.bluesky.BlueskyAuth`.
TODO: unify with :meth:`oauth_dropins.bluesky.BlueskyAuth.oauth_api`?
Args:
auth_entity (oauth_dropins.bluesky.BlueskyAuth)
client_metadata (dict): Bluesky OAuth client metadata. Required for OAuth
users (those with :attr:`~oauth_dropins.bluesky.BlueskyAuth.dpop_token`
set). See :func:`oauth_dropins.bluesky.bluesky_auth_kwargs`.
kwargs: passed to :class:`Bluesky`
Returns:
Bluesky:
"""
pds_url = auth_entity.pds_url or oauth_bluesky.pds_for_did(auth_entity.key.id())
if auth_entity.dpop_token:
oauth_client = oauth_bluesky.oauth_client_for_pds(client_metadata, pds_url)
token = TokenSerializer().loads(auth_entity.dpop_token)
kwargs.setdefault('auth', OAuth2AccessTokenAuth(client=oauth_client,
token=token))
else:
kwargs.setdefault('app_password', auth_entity.password)
if auth_entity.session:
kwargs.setdefault('access_token', auth_entity.session.get('accessJwt'))
kwargs.setdefault('refresh_token', auth_entity.session.get('refreshJwt'))
return cls(
handle=auth_entity.user_display_name(),
did=auth_entity.key.id(),
pds_url=pds_url,
session_callback=oauth_bluesky.make_session_callback(auth_entity),
**kwargs,
)
@property
def client(self):
if not self._client.session and self._app_password:
# log in
resp = self._client.com.atproto.server.createSession({
'identifier': self.did or self.handle,
'password': self._app_password,
})
self.handle = resp['handle']
self.did = resp['did']
return self._client
[docs]
@classmethod
def user_url(cls, handle):
"""Returns the profile URL for a given handle.
Args:
handle (str)
Returns:
str: profile URL
"""
return f'{cls.BASE_URL}/profile/{handle.lstrip("@")}'
[docs]
@classmethod
def to_as1_actor(cls, user, **kwargs):
"""Converts a user to an actor.
Args:
user (dict): an ``app.bsky.actor.profile`` record
kwargs: passed through to :func:`to_as1`
Returns:
dict: ActivityStreams actor
"""
return to_as1(user, **kwargs)
user_to_actor = to_as1_actor
"""Deprecated! Use :meth:`to_as1_actor` instead."""
[docs]
@classmethod
def post_url(cls, handle, tid):
"""Returns the post URL for a given handle and tid.
Args:
handle (str)
tid (str)
Returns:
str: profile URL
"""
return f'{cls.user_url(handle)}/post/{tid}'
[docs]
@classmethod
def post_id(cls, url):
"""Returns the `at://` URI for the given URL if it's for a post.
Also see ``arroba.util.parse_at_uri``.
Returns:
str or None
"""
if not url:
return None
if not AT_URI_RE.fullmatch(url):
try:
url = web_url_to_at_uri(url)
except ValueError:
return None
if (len(url.removeprefix('at://').split('/')) == 3
# only return at:// URIs for posts, not profiles
and not url.endswith('/app.bsky.actor.profile/self')):
return url
[docs]
def get_activities_response(self, user_id=None, group_id=None, app_id=None,
activity_id=None, fetch_replies=False,
fetch_likes=False, fetch_shares=False,
include_shares=True, fetch_mentions=False,
search_query=None, start_index=None, count=None,
cache=None, **kwargs):
"""Fetches posts and converts them to AS1 activities.
See :meth:`Source.get_activities_response` for more information.
Bluesky-specific details:
Args:
activity_id (str): an ``at://`` URI
"""
assert not start_index
params = {}
if count is not None:
params['limit'] = count
posts = None
handle = self.handle
if activity_id:
if not activity_id.startswith('at://'):
raise ValueError(f'Expected activity_id to be at:// URI; got {activity_id}')
resp = self.client.app.bsky.feed.getPostThread({}, uri=activity_id, depth=1)
posts = [resp.get('thread', {})]
elif group_id in (None, FRIENDS):
resp = self.client.app.bsky.feed.getTimeline({}, **params)
posts = resp.get('feed', [])
else: # eg group_id SELF
actor = user_id or self.did or self.handle
if not actor:
raise ValueError('user_id is required')
resp = self.client.app.bsky.feed.getAuthorFeed({}, actor=actor, **params)
posts = resp.get('feed', [])
if cache is None:
# for convenience, throwaway object just for this method
cache = {}
activities = []
for post in posts:
reason = post.get('reason')
is_repost = reason and reason.get('$type') == 'app.bsky.feed.defs#reasonRepost'
if is_repost and not include_shares:
continue
activity = self.postprocess_activity(self._post_to_activity(post))
if not activity:
continue
activities.append(activity)
obj = activity['object']
id = obj.get('id')
tags = obj.setdefault('tags', [])
if is_repost:
# If it's a repost we're not interested in responses to it.
continue
bs_post = post.get('post')
if bs_post and id:
# Likes
like_count = bs_post.get('likeCount')
if fetch_likes and like_count and like_count != cache.get('ABL ' + id):
likers = self.client.app.bsky.feed.getLikes({}, uri=bs_post.get('uri'))
tags.extend(self._make_like(bs_post, l.get('actor')) for l in likers.get('likes'))
cache['ABL ' + id] = like_count
# Reposts
repost_count = bs_post.get('repostCount')
if fetch_shares and repost_count and repost_count != cache.get('ABRP ' + id):
reposters = self.client.app.bsky.feed.getRepostedBy({}, uri=bs_post.get('uri'))
tags.extend(self._make_share(bs_post, r) for r in reposters.get('repostedBy'))
cache['ABRP ' + id] = repost_count
# Replies
reply_count = bs_post.get('replyCount')
if fetch_replies and reply_count and reply_count != cache.get('ABR ' + id):
replies = []
for r in self._get_replies(bs_post.get('uri')):
try:
reply = to_as1(r)
except ValueError as e:
logger.warning(f'skipping a reply: {e}')
continue
if reply:
replies.append(reply)
for r in replies:
r['id'] = self.tag_uri(r['id'])
obj['replies'] = {
'items': replies,
}
cache['ABR ' + id] = reply_count
resp = self.make_activities_base_response(util.trim_nulls(activities))
return resp
[docs]
def get_actor(self, user_id=None):
"""Fetches and returns a user.
Args:
user_id (str): either handle or DID; defaults to current user
Returns:
dict: ActivityStreams actor
"""
did = handle = None
if user_id:
if user_id.startswith('did:'):
did = user_id
else:
handle = user_id
else:
did = self.did
handle = self.handle
profile = self.client.app.bsky.actor.getProfile({}, actor=did or handle)
return to_as1(profile, type='app.bsky.actor.defs#profileViewDetailed',
repo_did=did, repo_handle=handle)
def _post_to_activity(self, post):
"""Converts a post to an activity.
Returns ``{}`` if ``post`` has an unknown or unsupported type.
Args:
post: Bluesky post app.bluesky.feed.defs#feedViewPost
Returns: AS1 activity
"""
try:
obj = to_as1(post, type='app.bsky.feed.defs#feedViewPost')
except ValueError as e:
logger.warning(e)
return {}
if obj.get('objectType') == 'activity':
return obj
return {
'id': obj.get('id'),
'verb': 'post',
'actor': obj.get('author'),
'object': obj,
'objectType': 'activity',
}
def _make_like(self, post, actor):
return self._make_like_or_share(post, actor, 'like')
def _make_share(self, post, actor):
return self._make_like_or_share(post, actor, 'share')
def _make_like_or_share(self, post, actor, verb):
"""Generates and returns a ActivityStreams like object.
Args:
post: dict, Bluesky app.bsky.feed.defs#feedViewPost
actor: dict, Bluesky app.bsky.actor.defs#profileView
verb: string, 'like' or 'share'
Returns: dict, AS1 like activity
"""
assert verb in ('like', 'share')
label = 'liked' if verb == 'like' else 'reposted'
url = at_uri_to_web_url(post.get('uri'), post.get('author').get('handle'))
actor_id = actor.get('did')
author = to_as1(actor, 'app.bsky.actor.defs#profileView')
author['id'] = self.tag_uri(author['id'])
suffix = f'{label}_by_{actor_id}'
return {
'id': self.tag_uri(f'{post.get("uri")}_{suffix}'),
'url': f'{url}#{suffix}',
'objectType': 'activity',
'verb': verb,
'object': {'url': url},
'author': author,
}
def _get_replies(self, uri):
"""
Gets the replies to a specific post and returns them
in ascending order of creation. Does not include the original post.
Args:
uri: string, post uri
Returns: list, Bluesky app.bsky.feed.defs#threadViewPost
"""
ret = []
resp = self.client.app.bsky.feed.getPostThread({}, uri=uri)
thread = resp.get('thread')
if thread:
ret = self._recurse_replies(thread)
return sorted(ret, key = lambda thread:
thread.get('post', {}).get('record', {}).get('createdAt') or '')
# TODO this ought to take a depth limit. hellthread, anyone?
def _recurse_replies(self, thread):
"""
Recurses through a Bluesky app.bsky.feed.defs#threadViewPost
and returns its replies as a list.
Args:
thread: dict, Bluesky app.bsky.feed.defs#threadViewPost
Returns: list, Bluesky app.bsky.feed.defs#threadViewPost
"""
ret = []
for r in thread.get('replies', []):
ret += [r]
ret += self._recurse_replies(r)
return ret
[docs]
def get_followers(self, user_id=None):
"""Returns the current user's followers.
Limited to the first 10k followers.
Args:
user_id (str): the DID to fetch followers for. If unset, defaults to
``self.user_id``.
Returns:
sequence of dict: ActivityStreams 1 actors
"""
return self._get_follows_or_followers('app.bsky.graph.getFollowers',
user_id=user_id)
[docs]
def get_follows(self, user_id=None):
"""Returns the current user's follows.
Limited to the first 10k follows.
Args:
user_id (str): the DID to fetch follows for. If unset, defaults to
``self.user_id``.
Returns:
sequence of dict: ActivityStreams 1 actors
"""
return self._get_follows_or_followers('app.bsky.graph.getFollows', user_id=user_id)
def _get_follows_or_followers(self, method, user_id=None):
"""Returns the current user's follows.
This will often be limited, eg to the first 10k followers,
depending on the silo.
Args:
method (str): either ``app.bsky.graph.getFollows`
or ``app.bsky.graph.getFollowers``
user_id (str): the user to fetch follows for. If unset, defaults to
``self.user_id``.
Returns:
sequence of dict: ActivityStreams 1 actors
"""
assert method in ('app.bsky.graph.getFollows', 'app.bsky.graph.getFollowers'), \
method
follows = []
cursor = None
while True:
max = LEXRPC.defs[method]['parameters']['properties']['limit']['maximum']
resp = self._appview.call(method, {}, actor=(user_id or self.did),
cursor=cursor, limit=max)
follows.extend(self.to_as1_actor(f, type='app.bsky.actor.defs#profileView')
for f in (resp.get('follows') or resp.get('followers') or []))
cursor = resp.get('cursor')
if not cursor or len(follows) >= MAX_FOLLOWS:
return follows[:MAX_FOLLOWS]
[docs]
def create(self, obj, include_link=OMIT_LINK, ignore_formatting=False):
"""Creates a post, reply, repost, or like.
Args:
obj: ActivityStreams object
include_link (str)
ignore_formatting (bool)
Returns:
CreationResult: whose content will be a dict with ``id``, ``url``, and
``type`` keys (all optional) for the newly created object (or None)
"""
return self._create(obj, preview=False, include_link=include_link,
ignore_formatting=ignore_formatting)
[docs]
def preview_create(self, obj, include_link=OMIT_LINK,
ignore_formatting=False):
"""Preview creating a post, reply, repost, or like.
Args:
obj: ActivityStreams object
include_link (str)
ignore_formatting (bool)
Returns:
CreationResult: content will be a str HTML snippet or None
"""
return self._create(obj, preview=True, include_link=include_link,
ignore_formatting=ignore_formatting)
def _create(self, obj, preview=None, include_link=OMIT_LINK,
ignore_formatting=False):
assert preview in (False, True)
assert self.did
type = obj.get('objectType')
verb = obj.get('verb')
try:
base_obj = self.base_object(obj)
except ValueError as e:
e_str = str(e)
return creation_result(abort=True, error_plain=e_str,
error_html=util.linkify(e_str, pretty=True))
base_id = base_obj.get('id')
base_url = base_obj.get('url')
is_reply = type == 'comment' or obj.get('inReplyTo')
atts = obj.get('attachments', [])
images = util.dedupe_urls(util.get_list(obj, 'image') +
[a for a in atts if a.get('objectType') == 'image'])
has_media = images and (type in ('note', 'article') or is_reply)
# prefer displayName over content for articles
#
# TODO: handle activities as well as objects? ie pull out ['object'] here if
# necessary?
prefer_content = type == 'note' or (base_url and is_reply)
preview_description = ''
content = self._content_for_create(
obj, ignore_formatting=ignore_formatting, prefer_name=not prefer_content)
if not content:
if type == 'activity':
content = verb
elif has_media:
content = ''
else:
return creation_result(
abort=False, # keep looking for things to publish,
error_plain='No content text found.',
error_html='No content text found.')
# truncate and ellipsize content if necessary
url = obj.get('url')
post_label = f"{self.NAME} {self.TYPE_LABELS['post']}"
if type == 'activity' and verb == 'like':
if not base_url:
return creation_result(
abort=True,
error_plain=f"Could not find a {post_label} to {self.TYPE_LABELS['like']}.",
error_html=f"Could not find a {post_label} to <a href=\"http://indiewebcamp.com/like\">{self.TYPE_LABELS['like']}</a>. Check that your post has the right <a href=\"http://indiewebcamp.com/like\">u-like-of link</a>.")
if preview:
preview_description += f"<span class=\"verb\">{self.TYPE_LABELS['like']}</span> <a href=\"{base_url}\">this {self.TYPE_LABELS['post']}</a>."
return creation_result(description=preview_description)
else:
like_atp = from_as1(obj, client=self)
result = self.client.com.atproto.repo.createRecord({
'repo': self.did,
'collection': like_atp['$type'],
'record': like_atp,
})
return creation_result({
'id': result['uri'],
'url': at_uri_to_web_url(like_atp['subject']['uri']) + '/liked-by'
})
elif type == 'activity' and verb == 'share':
if not base_url:
return creation_result(
abort=True,
error_plain=f"Could not find a {post_label} to {self.TYPE_LABELS['repost']}.",
error_html=f"Could not find a {post_label} to <a href=\"http://indiewebcamp.com/repost\">{self.TYPE_LABELS['repost']}</a>. Check that your post has the right <a href=\"http://indiewebcamp.com/repost\">repost-of</a> link.")
if preview:
preview_description += f"<span class=\"verb\">{self.TYPE_LABELS['repost']}</span> <a href=\"{base_url}\">this {self.TYPE_LABELS['post']}</a>."
return creation_result(description=preview_description)
else:
repost_atp = from_as1(obj, client=self)
result = self.client.com.atproto.repo.createRecord({
'repo': self.did,
'collection': repost_atp['$type'],
'record': repost_atp,
})
return creation_result({
'id': result['uri'],
'url': at_uri_to_web_url(repost_atp['subject']['uri']) + '/reposted-by'
})
elif type == 'activity' and verb == 'follow':
if not base_id:
return creation_result(
abort=True,
error_plain=f"Could not find a user to follow.",
error_html=f"Could not find a user to <a href=\"http://indiewebcamp.com/follow\">follow</a>. Check that your post has the right <a href=\"http://indiewebcamp.com/follow\">u-follow-of link</a>.")
if preview:
preview_description += f"<span class=\"verb\">follow</span> <a href=\"{base_url}\">this user</a>."
return creation_result(description=preview_description)
else:
follow_atp = from_as1(obj, client=self)
result = self.client.com.atproto.repo.createRecord({
'repo': self.did,
'collection': follow_atp['$type'],
'record': follow_atp,
})
return creation_result({
'id': result['uri'],
'url': base_url + '/followers'
})
elif (type in as1.POST_TYPES or is_reply or
(type == 'activity' and verb == 'post')): # probably a bookmark
# TODO: add bookmarked URL and facet
# tricky because we only want to do that if it's not truncated away
content = self.truncate(content, url, include_link=include_link, type='note')
# TODO linkify mentions and hashtags
preview_content = util.linkify(content, pretty=True, skip_bare_cc_tlds=True)
data = {'status': content}
if is_reply and base_url:
preview_description += f"<span class=\"verb\">{self.TYPE_LABELS['comment']}</span> to <a href=\"{base_url}\">this {self.TYPE_LABELS['post']}</a>:"
data['in_reply_to_id'] = base_id
else:
preview_description += f"<span class=\"verb\">{self.TYPE_LABELS['post']}</span>:"
max_images = LEXRPC.defs['app.bsky.embed.images']['properties']['images']['maxLength']
if len(images) > max_images:
images = images[:max_images]
logger.warning(f'Found {len(images)} images! Only using the first {max_images}: {images!r}')
if preview:
media_previews = [
f"<img src=\"{util.get_url(img)}\" alt=\"{util.parse_html(img.get('displayName') or '').get_text(' ', strip=True)}\" />"
for img in images
]
if media_previews:
preview_content += '<br /><br />' + ' '.join(media_previews)
return creation_result(content=preview_content,
description=preview_description)
else:
blobs, aspects = self.upload_media(images)
post_atp = from_as1(obj, blobs=blobs, aspects=aspects, client=self)
post_atp['text'] = content
# facet for link to original post, if any
if url:
url_index = content.rfind(url)
if url_index != -1:
byte_start = len(content[:url_index].encode())
post_atp.setdefault('facets', []).append({
'$type': 'app.bsky.richtext.facet',
'features': [{
'$type': 'app.bsky.richtext.facet#link',
'uri': url,
}],
'index': {
'byteStart': byte_start,
'byteEnd': byte_start + len(url.encode()),
},
})
result = self.client.com.atproto.repo.createRecord({
'repo': self.did,
'collection': post_atp['$type'],
'record': post_atp,
})
return creation_result({
'id': result['uri'],
'url': at_uri_to_web_url(result['uri'], handle=self.handle),
})
return creation_result(
abort=False,
error_plain=f'Cannot publish type={type}, verb={verb} to Bluesky',
error_html=f'Cannot publish type={type}, verb={verb} to Bluesky')
[docs]
def delete(self, at_uri):
"""Deletes a record.
Args:
at_uri (str): at:// URI of record delete
Returns:
CreationResult: content is dict with ``url`` and ``id`` fields
"""
match = AT_URI_RE.fullmatch(at_uri)
if not match:
raise ValueError(f'Expected at:// URI, got {at_uri}')
self.client.com.atproto.repo.deleteRecord({
'repo': match['repo'],
'collection': match['collection'],
'rkey': match['rkey'],
})
return creation_result({
'id': at_uri,
'url': at_uri_to_web_url(at_uri),
})
[docs]
def preview_delete(self, at_uri):
"""Previews deleting a record.
Args:
at_uri (str): at:// URI of record delete
Returns:
CreationResult:
"""
url = at_uri_to_web_url(at_uri)
return creation_result(description=f'<span class="verb">delete</span> <a href="{url}">this</a>.')
def base_object(self, obj):
return base_object(obj)
[docs]
def truncate(self, *args, type=None, **kwargs):
"""Thin wrapper around :meth:`Source.truncate` that sets default kwargs."""
if type == 'dm':
length = LEXRPC.defs['chat.bsky.convo.defs#messageInput']['properties']['text']['maxGraphemes']
elif type in as1.ACTOR_TYPES:
length = LEXRPC.defs['app.bsky.actor.profile']['record']['properties']['description']['maxGraphemes']
elif type in POST_TYPES:
length = LEXRPC.defs['app.bsky.feed.post']['record']['properties']['text']['maxGraphemes']
else:
assert False, f'unexpected type {type}'
kwargs = {
'include_link': INCLUDE_LINK,
'target_length': length,
'link_length': None,
'ellipsis': ' […]',
**kwargs,
}
return super().truncate(*args, **kwargs)