Source code for granary.facebook

# coding=utf-8
"""Facebook source class. Supports both Graph API and scraping HTML.
import collections
import copy
from datetime import datetime
import html
import logging
import re
import urllib.error, urllib.parse, urllib.request

from bs4.element import NavigableString, Tag
import dateutil.parser
import mf2util
import oauth_dropins.facebook
from oauth_dropins.webutil import util
from oauth_dropins.webutil.util import json_dumps, json_loads

from . import as1
from . import source

logger = logging.getLogger(__name__)

# Since API v2.4, we need to explicitly ask for the fields we want from most API
# endpoints with ?fields=...
#   (see the Declarative Fields section)
API_COMMENTS_FIELDS = 'id,message,from,created_time,message_tags,parent,attachment'
API_COMMENTS_ALL = 'comments?filter=stream&ids=%s&fields=' + API_COMMENTS_FIELDS
# Ideally this fields arg would just be [default fields plus comments], but
# there's no way to ask for that. :/
# asking for too many fields here causes 500s with either "unknown error" or
# "ask for less info" errors.
API_EVENT_FIELDS = 'id,attending,declined,description,end_time,event_times,interested,maybe,noreply,name,owner,picture,place,start_time,timezone,updated_time'
# /user/home requires the read_stream permission, which you probably don't have.
# details in the file docstring.
API_HOME = '%s/home?offset=%d'
API_PHOTOS_UPLOADED = '%s/photos?type=uploaded&fields=id,album,comments,created_time,from,images,likes,link,name,name_tags,object_id,page_story_id,picture,privacy,reactions,shares,updated_time'
API_ALBUMS = '%s/albums?fields=id,count,created_time,from,link,name,privacy,type,updated_time'
API_POST_FIELDS = 'id,application,caption,comments,created_time,description,from,likes,link,message,message_tags,name,object_id,parent_id,picture,place,privacy,reactions,sharedposts,shares,source,status_type,story,to,type,updated_time,with_tags'
API_SELF_POSTS = '%s/feed?offset=%d&fields=' + API_POST_FIELDS
API_SHARES = 'sharedposts?ids=%s'
# by default, me/events only includes events that the user has RSVPed yes,
# maybe, or interested to.
# also note that it includes the rsvp_status field, which isn't in
# API_EVENT_FIELDS because individual event objects don't support it.
API_USER_EVENTS = 'me/events?type=created&fields=rsvp_status,' + API_EVENT_FIELDS
API_USER_EVENTS_DECLINED = 'me/events?type=declined&fields=' + API_EVENT_FIELDS
API_USER_EVENTS_NOT_REPLIED = 'me/events?type=not_replied&fields=' + API_EVENT_FIELDS
API_NEWS_PUBLISHES = '%s/news.publishes?fields=' + API_POST_FIELDS
API_PUBLISH_POST = 'me/feed'
API_PUBLISH_COMMENT = '%s/comments'
API_PUBLISH_LIKE = '%s/likes'
API_PUBLISH_PHOTO = 'me/photos'
# the docs say these can't be published to, but they actually can. ¯\_(ツ)_/¯
# ...except /interested. POSTing to it returns this 400 error. details in
# {
#   "error": {
#     "message": "Unsupported post request. Object with ID '1680863225573216' does not exist, cannot be loaded due to missing permissions, or does not support this operation. Please read the Graph API documentation at",
#     "type": "GraphMethodException",
#     "code": 100
#   }
# }
API_NOTIFICATION = '%s/notifications'

# endpoint for uploading video. note the graph-video subdomain.

MAX_IDS = 50  # for the ids query param

M_HTML_TIMELINE_URL = '%s?v=timeline'
M_HTML_REACTIONS_URL = 'ufi/reaction/profile/browser/?ft_ent_identifier=%s'

# Use a modern browser user agent so that we get modern HTML tags like article
# and footer, which we then use to scrape.
SCRAPE_USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:88.0) Gecko/20100101 Firefox/88.0'

# Maps Facebook Graph API type, status_type, or Open Graph data type to
# ActivityStreams objectType.
  'application': 'application',
  'created_note': 'article',
  'event': 'event',
  'group': 'group',
  'instapp:photo': 'image',
  'link': 'note',
  'location': 'place',
  '': 'audio',
  'note': 'article',  # amusing mismatch between FB and AS/mf2
  'page': 'page',
  'photo': 'image',
  'post': 'note',
  'user': 'person',
  'website': 'article',

# Maps Facebook Graph API post type *and ActivityStreams objectType* to
# ActivityStreams verb.
  'books.reads': 'read',
  'music.listens': 'listen',
  'og.likes': 'like',
  'product': 'give',
  '': 'play',
# The fields in an event object that contain invited and RSVPed users. Slightly
# different from the rsvp_status field values, just to keep us entertained. :/
# Ordered by precedence.
RSVP_FIELDS = ('attending', 'declined', 'maybe', 'interested', 'noreply')
# Maps rsvp_status field to AS verb.
  'attending': 'rsvp-yes',
  'declined': 'rsvp-no',
  'maybe': 'rsvp-maybe',
  'unsure': 'rsvp-maybe',
  'not_replied': 'invite',
  'noreply': 'invite',
  # 'interested' RSVPs actually have rsvp_status='unsure', so this is only used
  # for rsvp_to_object(type='invited').
  'interested': 'rsvp-interested',
# Maps AS verb to API endpoint for publishing RSVP.
  'rsvp-maybe': API_PUBLISH_RSVP_MAYBE,
  'rsvp-interested': None,  # not supported. see API_PUBLISH_RSVP_INTERESTED
  'ANGRY': '😡',
  'CARE': '🤗',
  'HAHA': '😆',
  'LOVE': '❤️',
  'PRIDE': '🏳️‍🌈',
  'SAD': '😢',
  'THANKFUL': '🌼',  #
  'WOW': '😮',
  # nothing for LIKE (it's a like :P) or for NONE

FacebookId = collections.namedtuple('FacebookId', ['user', 'post', 'comment'])

[docs]class Facebook(source.Source): """Facebook source class. See file docstring and Source class for details. Attributes: access_token: string, optional, OAuth access token user_id: string, optional, current user's id (either global or app-scoped) scrape: boolean, whether to scrape's HTML (True) or use the API (False) cookie_c_user: string, optional c_user cookie to use when scraping cookie_xs: string, optional xs cookie to use when scraping """ DOMAIN = '' BASE_URL = '' NAME = 'Facebook' FRONT_PAGE_TEMPLATE = 'templates/facebook_index.html' POST_ID_RE = re.compile('^[0-9_:]+$') # see parse_id() for gory details OPTIMIZED_COMMENTS = True # HTML snippet for embedding a post. # EMBED_POST = """ <div id="fb-root"></div> <script async defer src="//"> </script> <div class="fb-post" data-href="%(url)s"> <div class="fb-xfbml-parse-ignore"><a href="%(url)s">%(content)s</a></div> </div> """
[docs] def __init__(self, access_token=None, user_id=None, scrape=False, cookie_c_user=None, cookie_xs=None): """Constructor. If an OAuth access token is provided, it will be passed on to Facebook. This will be necessary for some people and contact details, based on their privacy settings. If scrape is True, cookie_c_user and cookie_xs must be provided. Args: access_token: string, optional OAuth access token user_id: string, optional, current user's id (either global or app-scoped) scrape: boolean, whether to scrape's HTML (True) or use the API (False) cookie_c_user: string, optional c_user cookie to use when scraping cookie_xs: string, optional xs cookie to use when scraping """ if scrape: assert cookie_c_user and cookie_xs self.access_token = access_token self.user_id = user_id self.scrape = scrape self.cookie_c_user = cookie_c_user self.cookie_xs = cookie_xs
def object_url(self, id): # Facebook always uses www. They redirect bare URLs to it. return f'{id}' user_url = object_url
[docs] def get_actor(self, user_id=None): """Returns a user as a JSON ActivityStreams actor dict. Args: user_id: string id or username. Defaults to 'me', ie the current user. """ if user_id is None: user_id = 'me' return self.user_to_actor(self.urlopen(user_id))
[docs] def get_activities_response(self, user_id=None, group_id=None, app_id=None, activity_id=None, start_index=0, count=0, etag=None, min_id=None, cache=None, fetch_replies=False, fetch_likes=False, fetch_shares=False, fetch_events=False, fetch_mentions=False, search_query=None, fetch_news=False, event_owner_id=None, **kwargs): """Fetches posts and converts them to ActivityStreams activities. See method docstring in for details. Likes, *top-level* replies (ie comments), and reactions are always included. They come from the 'comments', 'likes', and 'reactions' fields in the Graph API's Post object: Threaded comments, ie comments in reply to other top-level comments, require an additional API call, so they're only included if fetch_replies is True. Mentions are never fetched or included because the API doesn't support searching for them. Additional args: fetch_news: boolean, whether to also fetch and include Open Graph news stories (/USER/news.publishes). Requires the permission. Background in event_owner_id: string. if provided, only events owned by this user id will be returned. avoids (but doesn't entirely prevent) processing big non-indieweb events with tons of attendees that put us over app engine's instance memory limit. """ if search_query: raise NotImplementedError() if self.scrape: return self._scrape_m(user_id=user_id, activity_id=activity_id, fetch_replies=fetch_replies, fetch_likes=fetch_likes, **kwargs) activities = [] if activity_id: if not user_id: if '_' not in activity_id: raise ValueError( 'Facebook activity ids must be of the form USERID_POSTID') user_id, activity_id = activity_id.split('_', 1) post = self.urlopen(API_OBJECT % (user_id, activity_id)) if post.get('error'): logger.warning(f"Couldn't fetch object {activity_id}: {post}") posts = [] else: posts = [post] else: url = API_SELF_POSTS if group_id == source.SELF else API_HOME user_id = user_id or 'me' url = url % (user_id, start_index) if count: url = util.add_query_params(url, {'limit': count}) headers = {'If-None-Match': etag} if etag else {} try: resp = self.urlopen(url, headers=headers, _as=None) etag ='ETag') posts = self._as(list, source.load_json(, url)) except urllib.error.HTTPError as e: if e.code == 304: # Not Modified, from a matching ETag posts = [] else: raise if group_id == source.SELF: # TODO: save and use ETag for all of these extra calls # TODO: use batch API to get photos, events, etc in one request # # if fetch_news: posts.extend(self.urlopen(API_NEWS_PUBLISHES % user_id, _as=list)) posts = self._merge_photos(posts, user_id) if fetch_events: activities.extend(self._get_events(owner_id=event_owner_id)) else: # for group feeds, filter out some shared_story posts because they tend # to be very tangential - friends' likes, related posts, etc. # # don't do it for individual people's feeds, e.g. the current user's, # because posts with attached links are also status_type == shared_story posts = [p for p in posts if p.get('status_type') != 'shared_story'] id_to_activity = {} fetch_comments_ids = [] fetch_shares_ids = [] for post in posts: activity = self.post_to_activity(post) activities.append(activity) id = post.get('id') if id: id_to_activity[id] = activity type = post.get('type') status_type = post.get('status_type') if type != 'note' and status_type != 'created_note': fetch_comments_ids.append(id) if type != 'news.publishes': fetch_shares_ids.append(id) # don't fetch extras for Facebook notes. if you pass /comments a note id, it # 400s with "notes API is deprecated for versions ..." # if fetch_shares and fetch_shares_ids: # some sharedposts requests 400, not sure why. # with util.ignore_http_4xx_error(): for id, shares in self._split_id_requests(API_SHARES, fetch_shares_ids).items(): activity = id_to_activity.get(id) if activity: activity['object'].setdefault('tags', []).extend( [self.share_to_object(share) for share in shares]) if fetch_replies and fetch_comments_ids: # some comments requests 400, not sure why. with util.ignore_http_4xx_error(): for id, comments in self._split_id_requests(API_COMMENTS_ALL, fetch_comments_ids).items(): activity = id_to_activity.get(id) if activity: replies = activity['object'].setdefault('replies', {} ).setdefault('items', []) existing_ids = {reply['fb_id'] for reply in replies} for comment in comments: if comment['id'] not in existing_ids: replies.append(self.comment_to_object(comment)) response = self.make_activities_base_response(util.trim_nulls(activities)) response['etag'] = etag return response
def _merge_photos(self, posts, user_id): """Fetches and merges photo objects into posts, replacing matching posts. Have to fetch uploaded photos manually since facebook sometimes collapses multiple photos into consolidated posts. Also, photo objects don't have the privacy field, so we get that from the corresponding post or album, if possible. Populates a custom 'object_for_ids' field in the FB photo objects. This is later copied into a custom 'fb_object_for_ids' field in their corresponding AS objects. Args: posts: list of Facebook post object dicts user_id: string Facebook user id Returns: new list of post and photo object dicts """ assert user_id posts_by_obj_id = {} for post in posts: obj_id = post.get('object_id') if obj_id: existing = posts_by_obj_id.get(obj_id) if existing: logger.warning(f"merging posts for object_id {obj_id}: overwriting {existing.get('id')} with {post.get('id')}!") posts_by_obj_id[obj_id] = post albums = None # lazy loaded, maps facebook id to ActivityStreams object photos = self.urlopen(API_PHOTOS_UPLOADED % user_id, _as=list) for photo in photos: album_id = photo.get('album', {}).get('id') post = posts_by_obj_id.pop(photo.get('id'), {}) if post.get('id'): photo.setdefault('object_for_ids', []).append(post['id']) privacy = post.get('privacy') if privacy and privacy.get('value') != 'CUSTOM': photo['privacy'] = privacy elif album_id: if albums is None: albums = {a['id']: a for a in self.urlopen(API_ALBUMS % user_id, _as=list)} photo['privacy'] = albums.get(album_id, {}).get('privacy') else: photo['privacy'] = 'custom' # ie unknown return ([p for p in posts if not p.get('object_id')] + list(posts_by_obj_id.values()) + photos) def _split_id_requests(self, api_call, ids): """Splits an API call into multiple to stay under the MAX_IDS limit per call. Args: api_call: string with %s placeholder for ids query param ids: sequence of string ids Returns: merged list of objects from the responses' 'data' fields """ results = {} for i in range(0, len(ids), MAX_IDS): resp = self.urlopen(api_call % ','.join(ids[i:i + MAX_IDS])) for id, objs in resp.items(): # objs is usually a dict but sometimes a boolean. (oh FB, never change!) results.setdefault(id, []).extend(self._as(dict, objs).get('data', [])) return results def _get_events(self, owner_id=None): """Fetches the current user's events. TODO: also fetch and use API_USER_EVENTS_DECLINED, API_USER_EVENTS_NOT_REPLIED Args: owner_id: string. if provided, only returns events owned by this user Returns: list of ActivityStreams event objects """ events = self.urlopen(API_USER_EVENTS, _as=list) return [self.event_to_activity(event) for event in events if not owner_id or owner_id == event.get('owner', {}).get('id')]
[docs] def get_event(self, event_id, owner_id=None): """Returns a Facebook event post. Args: id: string, site-specific event id owner_id: string Returns: dict, decoded ActivityStreams activity, or None if the event is not found or is owned by a different user than owner_id (if provided) """ event = None with util.ignore_http_4xx_error(): event = self.urlopen(API_EVENT % event_id) if not event or event.get('error'): logger.warning(f"Couldn't fetch event {event_id}: {event}") return None event_owner_id = event.get('owner', {}).get('id') if owner_id and event_owner_id != owner_id: label = event.get('name') or event.get('id')'Ignoring event {label} owned by user id {event_owner_id} instead of {owner_id}') return None return self.event_to_activity(event)
[docs] def get_comment(self, comment_id, activity_id=None, activity_author_id=None, activity=None): """Returns an ActivityStreams comment object. Args: comment_id: string comment id activity_id: string activity id, optional activity_author_id: string activity author id, optional activity: activity object (optional) """ try: resp = self.urlopen(API_COMMENT % comment_id) except urllib.error.HTTPError as e: if e.code == 400 and '_' in comment_id: # Facebook may want us to ask for this without the other prefixed id(s) resp = self.urlopen(API_COMMENT % comment_id.split('_')[-1]) else: raise return self.comment_to_object(resp, post_author_id=activity_author_id)
[docs] def get_share(self, activity_user_id, activity_id, share_id, activity=None): """Returns an ActivityStreams share activity object. Args: activity_user_id: string id of the user who posted the original activity activity_id: string activity id share_id: string id of the share object activity: activity object (optional) """ orig_id = f'{activity_user_id}_{activity_id}' # shares sometimes 400, not sure why. # shares = {} with util.ignore_http_4xx_error(): shares = self.urlopen(API_SHARES % orig_id, _as=dict) shares = shares.get(orig_id, {}).get('data', []) if not shares: return for share in shares: id = share.get('id') if not id: continue user_id, obj_id = id.split('_', 1) # strip user id prefix if share_id == id == share_id or share_id == obj_id: with util.ignore_http_4xx_error(): return self.share_to_object(self.urlopen(API_OBJECT % (user_id, obj_id)))
[docs] def get_albums(self, user_id=None): """Fetches and returns a user's photo albums. Args: user_id: string id or username. Defaults to 'me', ie the current user. Returns: sequence of ActivityStream album object dicts """ url = API_ALBUMS % (user_id or 'me') return [self.album_to_object(a) for a in self.urlopen(url, _as=list)]
[docs] def get_reaction(self, activity_user_id, activity_id, reaction_user_id, reaction_id, activity=None): """Fetches and returns a reaction. Args: activity_user_id: string id of the user who posted the original activity activity_id: string activity id reaction_user_id: string id of the user who reacted reaction_id: string id of the reaction. one of: 'love', 'wow', 'haha', 'sad', 'angry', 'thankful', 'pride', 'care activity: activity object (optional) """ if '_' not in reaction_id: # handle just name of reaction type reaction_id = f'{activity_id}_{reaction_id}_by_{reaction_user_id}' return super(Facebook, self).get_reaction( activity_user_id, activity_id, reaction_user_id, reaction_id, activity=activity)
[docs] def create(self, obj, include_link=source.OMIT_LINK, ignore_formatting=False): """Creates a new post, comment, like, or RSVP. Args: obj: ActivityStreams object include_link: string ignore_formatting: boolean Returns: a CreationResult whose contents will be a dict with 'id' and 'url' keys for the newly created Facebook object (or None) """ return self._create(obj, preview=False, include_link=include_link, ignore_formatting=ignore_formatting)
[docs] def preview_create(self, obj, include_link=source.OMIT_LINK, ignore_formatting=False): """Previews creating a new post, comment, like, or RSVP. Args: obj: ActivityStreams object include_link: string ignore_formatting: boolean Returns: a CreationResult whose contents will be a unicode string HTML snippet or None """ return self._create(obj, preview=True, include_link=include_link, ignore_formatting=ignore_formatting)
def _create(self, obj, preview=None, include_link=source.OMIT_LINK, ignore_formatting=False): """Creates a new post, comment, like, or RSVP. Args: obj: ActivityStreams object preview: boolean include_link: string ignore_formatting: boolean Returns: a CreationResult If preview is True, the contents will be a unicode string HTML snippet. If False, it will be a dict with 'id' and 'url' keys for the newly created Facebook object. """ # TODO: validation, error handling assert preview in (False, True) type = obj.get('objectType') verb = obj.get('verb') base_obj = self.base_object(obj, verb=verb) base_id = base_obj.get('id') base_type = base_obj.get('objectType') base_url = base_obj.get('url') if base_id and not base_url: base_url = base_obj['url'] = self.object_url(base_id) video_url = util.get_first(obj, 'stream', {}).get('url') image_url = util.get_first(obj, 'image', {}).get('url') content = self._content_for_create(obj, ignore_formatting=ignore_formatting, strip_first_video_tag=bool(video_url)) if not content and not video_url and not image_url: if type == 'activity': content = verb else: return source.creation_result( abort=False, # keep looking for things to post error_plain='No content text found.', error_html='No content text found.') name = obj.get('displayName') if name and mf2util.is_name_a_title(name, content): content = name + u"\n\n" + content people = self._get_person_tags(obj) url = obj.get('url') if include_link == source.INCLUDE_LINK and url: content += f'\n\n(Originally published at: {url})' preview_content = util.linkify(content) if video_url: preview_content += f'<br /><br /><video controls src="{video_url}"><a href="{video_url}">this video</a></video>' elif image_url: preview_content += f'<br /><br /><img src="{image_url}" />' if people: preview_content += '<br /><br /><em>with %s</em>' % ', '.join( '<a href="%s">%s</a>' % ( tag.get('url'), tag.get('displayName') or 'User %s' % tag['id']) for tag in people) msg_data = collections.OrderedDict({'message': content.encode('utf-8')}) if type == 'comment': if not base_url: return source.creation_result( abort=True, error_plain='Could not find a Facebook status to reply to.', error_html='Could not find a Facebook status to <a href="">reply to</a>. ' 'Check that your post has an <a href="">in-reply-to</a> ' 'link a Facebook URL or to an original post that publishes a ' '<a href="">rel-syndication</a> link to Facebook.') if preview: desc = f"""<span class="verb">comment</span> on <a href="{base_url}">this post</a>: <br /><br />{self.embed_post(base_obj)}<br />""" return source.creation_result(content=preview_content, description=desc) else: if image_url: msg_data['attachment_url'] = image_url resp = self.urlopen(API_PUBLISH_COMMENT % base_id, data=urllib.parse.urlencode(msg_data)) url = self.comment_url(base_id, resp['id'], post_author_id=base_obj.get('author', {}).get('id')) resp.update({'url': url, 'type': 'comment'}) elif type == 'activity' and verb == 'like': if not base_url: return source.creation_result( abort=True, error_plain='Could not find a Facebook status to like.', error_html='Could not find a Facebook status to <a href="">like</a>. ' 'Check that your post has an <a href="">like-of</a> ' 'link a Facebook URL or to an original post that publishes a ' '<a href="">rel-syndication</a> link to Facebook.') elif base_type in ('person', 'page'): return source.creation_result( abort=True, error_plain="Sorry, the Facebook API doesn't support liking pages.", error_html='Sorry, <a href="">' "the Facebook API doesn't support liking pages</a>.") if preview: desc = '<span class="verb">like</span> ' if base_type == 'comment': comment = self.comment_to_object(self.urlopen(base_id)) author = comment.get('author', '') if author: author = self.embed_actor(author) + ':\n' desc += f"<a href=\"{base_url}\">this comment</a>:\n<br /><br />{author}{comment.get('content')}<br />" else: desc += f'<a href="{base_url}">this post</a>:\n<br /><br />{self.embed_post(base_obj)}<br />' return source.creation_result(description=desc) else: resp = self.urlopen(API_PUBLISH_LIKE % base_id, data='') assert resp.get('success'), resp resp = {'type': 'like'} elif type == 'activity' and verb in RSVP_PUBLISH_ENDPOINTS: if not base_url: return source.creation_result( abort=True, error_plain="This looks like an RSVP, but it's missing an " "in-reply-to link to the Facebook event.", error_html="This looks like an <a href=''>RSVP</a>, " "but it's missing an <a href=''>in-reply-to</a> " "link to the Facebook event.") elif verb == 'rsvp-interested': # API doesn't support creating "interested" RSVPs. # msg = 'Sorry, the Facebook API doesn\'t support creating "interested" RSVPs. Try a "maybe" RSVP instead!' return source.creation_result(abort=True, error_plain=msg, error_html=msg) # can't RSVP to multi-instance aka recurring events # event = self.urlopen(API_EVENT % base_id) if event.get('event_times'): return source.creation_result( abort=True, error_plain="That's a recurring event. Please RSVP to a specific instance!", error_html='<a href="%s">That\'s a recurring event.</a> Please RSVP to a specific instance!' % base_url) # TODO: event invites if preview: assert verb.startswith('rsvp-') desc = f'<span class="verb">RSVP {verb[5:]}</span> to <a href="{base_url}">this event</a>.' return source.creation_result(description=desc) else: resp = self.urlopen(RSVP_PUBLISH_ENDPOINTS[verb] % base_id, data='') assert resp.get('success'), resp resp = {'type': 'rsvp'} elif type in ('note', 'article'): if preview: return source.creation_result(content=preview_content, description='<span class="verb">post</span>:') if video_url: api_call = API_UPLOAD_VIDEO msg_data.update({ 'file_url': video_url, 'description': msg_data.pop('message', ''), }) elif image_url: api_call = API_PUBLISH_PHOTO msg_data['url'] = image_url # use Timeline Photos album, if we can find it, since it keeps photo # posts separate instead of consolidating them into a single "X added # n new photos..." post. # for album in self.urlopen(API_ALBUMS % 'me', _as=list): id = album.get('id') if id and album.get('type') == 'wall': api_call = API_PUBLISH_ALBUM_PHOTO % id break if people: # tags is JSON list of dicts with tag_uid fields # msg_data['tags'] = json_dumps([{'tag_uid': tag['id']} for tag in people]) else: api_call = API_PUBLISH_POST if people: # tags is comma-separated user id string # msg_data['tags'] = ','.join(tag['id'] for tag in people) resp = self.urlopen(api_call, data=urllib.parse.urlencode(msg_data)) resp.update({'url': self.post_url(resp), 'type': 'post'}) if video_url and not resp.get('success', True): msg = 'Video upload failed.' return source.creation_result(abort=True, error_plain=msg, error_html=msg) elif type == 'activity' and verb == 'share': return source.creation_result( abort=True, error_plain='Cannot publish shares on Facebook.', error_html='Cannot publish <a href="">shares</a> ' 'on Facebook. This limitation is imposed by the ' '<a href="">Facebook Graph API</a>.') else: return source.creation_result( abort=False, error_plain=f'Cannot publish type={type}, verb={verb} to Facebook', error_html=f'Cannot publish type={type}, verb={verb} to Facebook') if 'url' not in resp: resp['url'] = base_url return source.creation_result(resp) def _get_person_tags(self, obj): """Extracts and prepares person tags for Facebook users. Args: obj: ActivityStreams object Returns: sequence of ActivityStreams tag objects with url, id, and optional displayName fields. The id field is a raw Facebook user id. """ people = {} # maps id to tag for tag in obj.get('tags', []): url = tag.get('url', '') id = url.split('/')[-1] if (util.domain_from_link(url) == self.DOMAIN and util.is_int(id) and tag.get('objectType') == 'person' and not tag.get('startIndex')): # mentions are linkified separately tag = copy.copy(tag) tag['id'] = id people[id] = tag return sorted(people.values(), key=lambda t: t['id'])
[docs] def create_notification(self, user_id, text, link): """Sends the authenticated user a notification. Uses the Notifications API (beta): Args: user_id: string, username or user ID text: string, shown to the user in the notification link: relative string URL, the user is redirected here when they click on the notification. Note that only the path and query parameters are used! they're combined with the domain in your Facebook app's Game App URL: Raises: urllib2.HTPPError """ logger.debug(f'Sending Facebook notification: {text!r}, {link}') params = { 'template': text, 'href': link, # this is a synthetic app access token. # 'access_token': f'{oauth_dropins.facebook.FACEBOOK_APP_ID}|{oauth_dropins.facebook.FACEBOOK_APP_SECRET}', } url = API_BASE + API_NOTIFICATION % user_id resp = util.urlopen(urllib.request.Request(url, data=urllib.parse.urlencode(params))) logger.debug(f'Response: {resp.getcode()} {}')
[docs] def post_url(self, post): """Returns a short Facebook URL for a post. Args: post: Facebook JSON post """ fb_id = post.get('id') if not fb_id: return None id = self.parse_id(fb_id) author_id = id.user or post.get('from', {}).get('id') if author_id and return f'{author_id}/posts/{}' return self.object_url(fb_id)
[docs] def comment_url(self, post_id, comment_id, post_author_id=None): """Returns a short Facebook URL for a comment. Args: post_id: Facebook post id comment_id: Facebook comment id """ if post_author_id: post_id = post_author_id + '/posts/' + post_id return f'{post_id}?comment_id={comment_id}'
[docs] @classmethod def base_id(cls, url): """Guesses the id of the object in the given URL. Returns: string, or None """ params = urllib.parse.parse_qs(urllib.parse.urlparse(url).query) event_id = params.get('event_time_id') if event_id: return event_id[0] return super(Facebook, cls).base_id(url)
[docs] def base_object(self, obj, verb=None, resolve_numeric_id=False): """Returns the 'base' silo object that an object operates on. This is mostly a big bag of heuristics for reverse engineering and parsing Facebook URLs. Whee. Args: obj: ActivityStreams object verb: string, optional resolve_numeric_id: if True, tries harder to populate the numeric_id field by making an additional API call to look up the object if necessary. Returns: dict, minimal ActivityStreams object. Usually has at least id, numeric_id, and url fields; may also have author. """ base_obj = super(Facebook, self).base_object(obj) url = base_obj.get('url') if not url: return base_obj author = base_obj.setdefault('author', {}) base_id = base_obj.get('id') if base_id and not base_obj.get('numeric_id'): if util.is_int(base_id): base_obj['numeric_id'] = base_id elif resolve_numeric_id: base_obj = self.user_to_actor(self.urlopen(base_id)) try: parsed = urllib.parse.urlparse(url) params = urllib.parse.parse_qs(parsed.query) assert parsed.path.startswith('/') path = parsed.path.strip('/') path_parts = path.split('/') if path == 'photo.php': # photo URLs look like: # # fbids = params.get('fbid') base_id = base_obj['id'] = fbids[0] if fbids else None elif len(path_parts) == 1: # maybe a profile/page URL? if not base_obj.get('objectType'): base_obj['objectType'] = 'person' # or page if not base_id: base_id = base_obj['id'] = path_parts[0] # this is a gross hack - adding the FB username field to an AS object # and then re-running user_to_actor - but it's an easy/reusable way to # populate image, displayName, etc. if not base_obj.get('username') and not util.is_int(base_id): base_obj['username'] = base_id base_obj.update({k: v for k, v in self.user_to_actor(base_obj).items() if k not in base_obj}) elif len(path_parts) >= 3 and path_parts[1] == 'posts': author_id = path_parts[0] if not author.get('id'): author['id'] = author_id if util.is_int(author_id) and not author.get('numeric_id'): author['numeric_id'] = author_id # photo album URLs look like this: # # c.f. elif path == 'media/set': set_id = params.get('set') if set_id and set_id[0].startswith('a.'): base_obj['id'] = set_id[0].split('.')[1] # single instances of recurring (aka multi-instance) event URLs look like this: # event_id = params.get('event_time_id') if event_id: event_id = event_id[0] base_obj.update({ 'id': event_id, 'numeric_id': event_id, 'url': self.object_url(event_id) }) comment_id = params.get('comment_id') or params.get('reply_comment_id') if comment_id: base_obj['id'] += '_' + comment_id[0] base_obj['objectType'] = 'comment' if (base_id and '_' not in base_id and author.get('numeric_id') and not event_id): # add author user id prefix. base_obj['id'] = f"{author['numeric_id']}_{base_id}" except BaseException as e: logger.warning( "Couldn't parse object URL %s : %s. Falling back to default logic.", url, e, exc_info=True) return base_obj
[docs] def post_to_activity(self, post): """Converts a post to an activity. Args: post: dict, a decoded JSON post Returns: an ActivityStreams activity dict, ready to be JSON-encoded """ obj = self.post_to_object(post, type='post') if not obj: return {} activity = { 'verb': VERBS.get(post.get('type', obj.get('objectType')), 'post'), 'published': obj.get('published'), 'updated': obj.get('updated'), 'fb_id': post.get('id'), 'url': self.post_url(post), 'actor': obj.get('author'), 'object': obj, } post_id = self.parse_id(activity['fb_id']).post if post_id: activity['id'] = self.tag_uri(post_id) application = post.get('application') if application: activity['generator'] = { 'displayName': application.get('name'), 'id': self.tag_uri(application.get('id')), } return self.postprocess_activity(activity)
[docs] def post_to_object(self, post, type=None): """Converts a post to an object. TODO: handle the sharedposts field Args: post: dict, a decoded JSON post type: string object type: None, 'post', or 'comment' Returns: an ActivityStreams object dict, ready to be JSON-encoded """ assert type in (None, 'post', 'comment') fb_id = post.get('id') post_type = post.get('type') status_type = post.get('status_type') url = self.post_url(post) display_name = None message = (post.get('message') or post.get('story') or post.get('description') or post.get('name')) picture = post.get('picture') if isinstance(picture, dict): picture = picture.get('data', {}).get('url') # if the user posted this picture, try to get a larger size. if (picture and picture.endswith('_s.jpg') and (post_type == 'photo' or status_type == 'added_photos')): picture = picture[:-6] + '_o.jpg' data = post.get('data', {}) for field in ('object', 'song'): obj = data.get(field) if obj: fb_id = obj.get('id') post_type = obj.get('type') url = obj.get('url') display_name = obj.get('title') object_type = OBJECT_TYPES.get(status_type) or OBJECT_TYPES.get(post_type) author = self.user_to_actor(post.get('from')) link = post.get('link', '') gift = link.startswith('/gifts/') if link.startswith('/'): link = urllib.parse.urljoin(self.BASE_URL, link) if gift: object_type = 'product' if not object_type: if picture and not message: object_type = 'image' else: object_type = 'note' id = self.parse_id(fb_id, is_comment=(type == 'comment')) if type == 'comment' and not id.comment: return {} elif type != 'comment' and not return {} obj = { 'id': self.tag_uri(, 'fb_id': fb_id, 'objectType': object_type, 'published': util.maybe_iso8601_to_rfc3339(post.get('created_time')), 'updated': util.maybe_iso8601_to_rfc3339(post.get('updated_time')), 'author': author, # FB post ids are of the form USERID_POSTID 'url': url, 'image': {'url': picture}, 'displayName': display_name, 'fb_object_id': post.get('object_id'), 'fb_object_for_ids': post.get('object_for_ids'), 'to': self.privacy_to_to(post, type=type), } # message_tags is a dict in most post types, but a list in some other object # types, e.g. comments. message_tags = post.get('message_tags', []) if isinstance(message_tags, dict): message_tags = sum(message_tags.values(), []) # flatten elif not isinstance(message_tags, list): message_tags = list(message_tags) # fingers crossed! :P # tags and likes tags = (self._as(list, post.get('to', {})) + self._as(list, post.get('with_tags', {})) + message_tags) obj['tags'] = [self.postprocess_object({ 'objectType': OBJECT_TYPES.get(t.get('type'), 'person'), 'id': self.tag_uri(t.get('id')), 'url': self.object_url(t.get('id')), 'displayName': t.get('name'), 'startIndex': t.get('offset'), 'length': t.get('length'), }) for t in tags] obj['tags'] += [self.postprocess_object({ 'id': f"{obj['id']}_liked_by_{like.get('id')}", 'url': url + f"#liked-by-{like.get('id')}", 'objectType': 'activity', 'verb': 'like', 'object': {'url': url}, 'author': self.user_to_actor(like), }) for like in self._as(list, post.get('likes', {}))] for reaction in self._as(list, post.get('reactions', {})): id = reaction.get('id') type = reaction.get('type', '') content = REACTION_CONTENT.get(type) if content: type = type.lower() obj['tags'].append(self.postprocess_object({ 'id': f"{obj['id']}_{type}_by_{id}", 'url': url + f'#{type}-by-{id}', 'objectType': 'activity', 'verb': 'react', 'content': content, 'object': {'url': url}, 'author': self.user_to_actor(reaction), })) # Escape HTML characters: <, >, &. Have to do it manually, instead of # reusing e.g. cgi.escape, so that we can shuffle over each tag startIndex # appropriately. :( if message: content = util.WideUnicode(copy.copy(message)) tags = sorted([t for t in obj['tags'] if t.get('startIndex')], key=lambda t: t['startIndex']) entities = {'<': '&lt;', '>': '&gt;', '&': '&amp;'} i = 0 while i < len(content): if tags and tags[0]['startIndex'] == i: tags.pop(0) entity = entities.get(content[i]) if entity: content = util.WideUnicode(content[:i] + entity + content[i + 1:]) for tag in tags: tag['startIndex'] += len(entity) - 1 i += 1 assert not tags obj['content'] = content # is there an attachment? prefer to represent it as a picture (ie image # object), but if not, fall back to a link. att = { 'url': link or url, 'image': {'url': picture}, 'displayName': post.get('name'), 'summary': post.get('caption'), 'content': post.get('description'), } if post_type == 'photo' or status_type == 'added_photos': att['objectType'] = 'image' obj['attachments'] = [att] elif link and not gift: att['objectType'] = 'article' obj['attachments'] = [att] # location place = post.get('place') if place: place_id = place.get('id') obj['location'] = { 'displayName': place.get('name'), 'id': self.tag_uri(place_id), 'url': self.object_url(place_id), } location = place.get('location', None) if isinstance(location, dict): lat = location.get('latitude') lon = location.get('longitude') if lat and lon: obj['location'].update({'latitude': lat, 'longitude': lon}) elif 'location' in post: obj['location'] = {'displayName': post['location']} # comments go in the replies field, according to the "Responses for # Activity Streams" extension spec: # comments = post.get('comments', {}).get('data') if comments: items = util.trim_nulls([self.comment_to_object(c, post_id=post['id']) for c in comments]) obj['replies'] = { 'items': items, 'totalItems': len(items), } return self.postprocess_object(obj)
[docs] def comment_to_object(self, comment, post_id=None, post_author_id=None): """Converts a comment to an object. Args: comment: dict, a decoded JSON comment post_id: optional string Facebook post id. Only used if the comment id doesn't have an embedded post id. post_author_id: optional string Facebook post author id. Only used if the comment id doesn't have an embedded post author id. Returns: an ActivityStreams object dict, ready to be JSON-encoded """ obj = self.post_to_object(comment, type='comment') if not obj: return obj obj['objectType'] = 'comment' fb_id = comment.get('id') obj['fb_id'] = fb_id id = self.parse_id(fb_id, is_comment=True) if not id.comment: return None post_id = or post_id post_author_id = id.user or post_author_id if post_id: obj.update({ 'id': self._comment_id(post_id, id.comment), 'url': self.comment_url(post_id, id.comment, post_author_id=post_author_id), 'inReplyTo': [{ 'id': self.tag_uri(post_id), 'url': self.post_url({'id': post_id, 'from': {'id': post_author_id}}), }], }) parent_id = comment.get('parent', {}).get('id') if parent_id: obj['inReplyTo'].append({ 'id': self.tag_uri(parent_id), 'url': self.comment_url(post_id, parent_id.split('_')[-1], # strip POSTID_ prefix post_author_id=post_author_id) }) att = comment.get('attachment') if (att and att.get('type') in ('photo', 'animated_image_autoplay', 'animated_image_share') and not obj.get('image')): obj['image'] = {'url': att.get('media', {}).get('image', {}).get('src')} obj.setdefault('attachments', []).append({ 'objectType': 'image', 'image': obj['image'], 'url': att.get('url'), }) return self.postprocess_object(obj)
def _comment_id(self, post_id, comment_id): return self.tag_uri(f'{post_id}_{comment_id}')
[docs] def share_to_object(self, share): """Converts a share (from /OBJECT/sharedposts) to an object. Args: share: dict, a decoded JSON share Returns: an ActivityStreams object dict, ready to be JSON-encoded """ obj = self.post_to_object(share) if not obj: return obj att = obj.get('attachments', []) obj.update({ 'objectType': 'activity', 'verb': 'share', 'object': att.pop(0) if att else {'url': share.get('link')}, }) content = obj.get('content') if content: obj['displayName'] = content return self.postprocess_object(obj)
[docs] def user_to_actor(self, user): """Converts a user or page to an actor. Args: user: dict, a decoded JSON Facebook user or page Returns: an ActivityStreams actor dict, ready to be JSON-encoded """ if not user: return {} id = user.get('id') username = user.get('username') handle = username or id if not handle: return {} # extract web site links. extract_links uniquifies and preserves order urls = (util.extract_links(user.get('link')) or [self.user_url(handle)]) + sum( (util.extract_links(user.get(field)) for field in ('website', 'about', 'description')), []) actor = { # FB only returns the type field if you fetch the object with ?metadata=1 # 'objectType': 'page' if user.get('type') == 'page' else 'person', 'displayName': user.get('name') or username, 'id': self.tag_uri(handle), 'updated': util.maybe_iso8601_to_rfc3339(user.get('updated_time')), 'username': username, 'description': user.get('description') or user.get('about'), 'summary': user.get('about'), 'url': urls[0], 'urls': [{'value': u} for u in urls] if len(urls) > 1 else None, } # numeric_id is our own custom field that always has the source's numeric # user id, if available. if util.is_int(id): actor.update({ 'numeric_id': id, 'image': { 'url': f'{API_BASE}{id}/picture?type=large', }, }) location = user.get('location') if location: actor['location'] = {'id': location.get('id'), 'displayName': location.get('name')} return util.trim_nulls(actor)
[docs] def event_to_object(self, event, rsvps=None): """Converts an event to an object. Args: event: dict, a decoded JSON Facebook event rsvps: sequence, optional Facebook RSVPs Returns: an ActivityStreams object dict """ obj = self.post_to_object(event) obj.update({ 'displayName': event.get('name'), 'objectType': 'event', 'author': self.user_to_actor(event.get('owner')), 'startTime': event.get('start_time'), 'endTime': event.get('end_time'), }) if rsvps: as1.add_rsvps_to_event( obj, [self.rsvp_to_object(r, event=event) for r in rsvps]) # de-dupe the event's RSVPs by (user) id. RSVP_FIELDS is ordered by # precedence, so iterate in reverse order so higher precedence fields # override. id_to_rsvp = {} for field in reversed(RSVP_FIELDS): for rsvp in event.get(field, {}).get('data', []): rsvp = self.rsvp_to_object(rsvp, type=field, event=event) id_to_rsvp[rsvp['id']] = rsvp as1.add_rsvps_to_event(obj, id_to_rsvp.values()) return self.postprocess_object(obj)
[docs] def event_to_activity(self, event, rsvps=None): """Converts a event to an activity. Args: event: dict, a decoded JSON Facebook event rsvps: list of JSON Facebook RSVPs Returns: an ActivityStreams activity dict """ obj = self.event_to_object(event, rsvps=rsvps) return {'object': obj, 'id': obj.get('id'), 'url': obj.get('url'), }
[docs] def rsvp_to_object(self, rsvp, type=None, event=None): """Converts an RSVP to an object. The 'id' field will ony be filled in if event['id'] is provided. Args: rsvp: dict, a decoded JSON Facebook RSVP type: optional Facebook RSVP type, one of RSVP_FIELDS event: Facebook event object. May contain only a single 'id' element. Returns: an ActivityStreams object dict """ verb = RSVP_VERBS.get(type or rsvp.get('rsvp_status')) obj = { 'objectType': 'activity', 'verb': verb, } if verb == 'invite': invitee = self.user_to_actor(rsvp) invitee['objectType'] = 'person' obj.update({ 'object': invitee, 'actor': self.user_to_actor(event.get('owner')) if event else None, }) else: obj['actor'] = self.user_to_actor(rsvp) if event: user_id = rsvp.get('id') event_id = event.get('id') if event_id and user_id: obj['id'] = self.tag_uri(f'{event_id}_rsvp_{user_id}') obj['url'] = f'{self.object_url(event_id)}#{user_id}' return self.postprocess_object(obj)
[docs] def album_to_object(self, album): """Converts a photo album to an object. Args: album: dict, a decoded JSON Facebook album Returns: an ActivityStreams object dict """ if not album: return {} id = album.get('id') return self.postprocess_object({ 'id': self.tag_uri(id), 'fb_id': id, 'url': album.get('link'), 'objectType': 'collection', 'author': self.user_to_actor(album.get('from')), 'displayName': album.get('name'), 'totalItems': album.get('count'), 'to': self.privacy_to_to(album), 'published': util.maybe_iso8601_to_rfc3339(album.get('created_time')), 'updated': util.maybe_iso8601_to_rfc3339(album.get('updated_time')), })
[docs] def privacy_to_to(self, obj, type=None): """Converts a Facebook `privacy` field to an ActivityStreams `to` field. privacy is sometimes an object: ...and other times a string: Args: obj: dict, Facebook object (post, album, comment, etc) Returns: dict: ActivityStreams `to` object, or None if unknown type: string object type: None, 'post', or 'comment' """ privacy = obj.get('privacy') if isinstance(privacy, dict): privacy = privacy.get('value') from_id = obj.get('from', {}).get('id') if (type == 'post' and not privacy and (from_id and self.user_id and from_id != self.user_id)): # privacy value '' means it doesn't have an explicit audience set, so it # inherits the defaults privacy setting for wherever it was posted: a # group, a page, a user's timeline, etc. unfortunately we haven't found a # way to get that default setting via the API. so, approximate that # by checking whether the current user posted it or someone else. # # return [{'objectType': 'unknown'}] elif privacy and privacy.lower() == 'custom': return [{'objectType': 'unknown'}] elif privacy is not None: public = privacy.lower() in ('', 'everyone', 'open') return [{'objectType': 'group', 'alias': '@public' if public else '@private'}]
[docs] def fql_stream_to_post(self, stream, actor=None): """Converts an FQL stream row to a Graph API post. Currently unused and untested! Use at your own risk. TODO: place, to, with_tags, message_tags, likes, comments, etc., most require extra queries to inflate. Args: stream: dict, a row from the FQL stream table actor: dict, a row from the FQL profile table Returns: dict, Graph API post Here's example code to query FQL and pass the results to this method:: resp = self.urlopen('' + urllib.urlencode( {'q': json_dumps({ 'stream': '''\\ SELECT actor_id, post_id, created_time, updated_time, attachment, privacy, message, description FROM stream WHERE filter_key IN ( SELECT filter_key FROM stream_filter WHERE uid = me()) ORDER BY created_time DESC LIMIT 50 ''', 'actors': '''\\ SELECT id, name, username, url, pic FROM profile WHERE id IN (SELECT actor_id FROM #stream) '''})})) results = {q['name']: q['fql_result_set'] for q in resp['data']} actors = {a['id']: a for a in results['actors']} posts = [self.fql_stream_to_post(row, actor=actors[row['actor_id']]) for row in results['stream']] """ post = copy.deepcopy(stream) post.update({ 'id': stream.pop('post_id', None), 'type': stream.pop('fb_object_type', None), 'object_id': stream.pop('fb_object_id', None), 'from': actor or {'id': stream.pop('actor_id', None)}, # message, description, name, created_time, updated_time are left in place }) # attachments att = stream.pop('attachment', {}) for media in att.get('media') or [att]: type = media.get('type') obj = { 'type': type, 'url': media.get('href'), 'title': att.get('name') or att.get('caption') or att.get('description'), 'data': {'url': media.get('src')}, } # last element of each type wins if type == 'photo': post['image'] = obj elif type == 'link': post['link'] = obj['url'] return util.trim_nulls(post)
[docs] def email_to_object(self, html): """Converts a Facebook HTML notification email to an AS1 object. Returns: dict, AS1 object, or None if email html couldn't be parsed Arguments: html: string """ soup = util.parse_html(html) type = None type = 'comment' descs = self._find_all_text(soup, r'commented on( your)?') if not descs: type = 'like' descs = self._find_all_text(soup, r'likes your') if not descs: return None links = descs[-1].find_all('a') name_link = links[0] name = name_link.get_text(strip=True) profile_url = name_link['href'] resp_url = self._sanitize_url(links[1]['href']) post_url, comment_id = util.remove_query_param(resp_url, 'comment_id') if type == 'comment': # comment emails have a second section with a preview rendering of the # comment, picture and date and comment text are there. name_link = soup.find_all('a', string=re.compile(name))[1] picture = name_link.find_previous('img')['src'] when = name_link.find_next('td') comment = when.find_next('span', class_=re.compile(r'mb_text')) if not comment: return None obj = { 'author': { 'objectType': 'person', 'displayName': name, 'image': {'url': picture}, 'url': self._sanitize_url(profile_url), }, # TODO 'to': [{'objectType': 'group', 'alias': '@public'}], } obj['published'] = self._scraped_datetime(when) # extract Facebook post ID from URL url_parts = urllib.parse.urlparse(resp_url) path = url_parts.path.strip('/').split('/') url_params = urllib.parse.parse_qs(url_parts.query) if len(path) == 3 and path[1] == 'posts': post_id = path[2] else: post_id = (util.get_first(url_params, 'story_fbid') or util.get_first(url_params, 'fbid') or '') if type == 'comment': obj.update({ # TODO: check that this works on urls to different types of posts, eg photos 'objectType': 'comment', 'id': self._comment_id(post_id, comment_id), 'url': resp_url, 'content': comment.get_text(strip=True), 'inReplyTo': [{'url': post_url}], }) elif type == 'like': liker_id = self.base_id(obj['author']['url']) obj.update({ 'objectType': 'activity', 'verb': 'like', # TODO: handle author URLs for users without usernames 'id': self.tag_uri(f'{post_id}_liked_by_{liker_id}'), 'url': post_url + f'#liked-by-{liker_id}', 'object': {'url': post_url}, }) return util.trim_nulls(obj)
@staticmethod def _find_all_text(soup, regexp): """BeautifulSoup utility that searches for text and returns a Tag. I'd rather just use soup.find(string=...), but it returns a NavigableString instead of a Tag, and I need a Tag so I can look at the elements inside it. Args: soup: BeautifulSoup regexp: string, must match target's text after stripping whitespace """ regexp = re.compile(regexp) return soup.find_all(lambda tag: any(regexp.match(c.string.strip()) for c in tag.contents if c.string)) @classmethod def _sanitize_url(cls, url): """Normalizes a URL from a notification email. Specifically, removes the parts that only let the receiving user use it, and removes some personally identifying parts. Example profile:;aref=123&amp;medium=email&amp;mid=1a2b3c&amp;bcode=2.34567890.ABCxyz&amp;;lloc=image;lloc=actor_profile&amp;aref=789&amp;medium=email&amp;mid=a1b2c3&amp;bcode=2.34567890.ABCxyz&amp; Example posts:;story_fbid=123&amp;id=456&amp;comment_id=789&amp;aref=012&amp;medium=email&amp;mid=a1b2c3&amp;bcode=2.34567890.ABCxyz&amp;;story_fbid=123&amp;id=456&amp;aref=789&amp;medium=email&amp;mid=a1b2c3&amp;bcode=2.2.34567890.ABCxyz&amp;;fbid=123&amp;set=a.456&amp;type=3&amp;comment_id=789&amp;force_theater=true&amp;aref=123&amp;medium=email&amp;mid=a1b2c3&amp;bcode=2.34567890.ABCxyz&amp; Args: url: string Returns: string, sanitized URL """ if util.domain_from_link(url) != cls.DOMAIN and not url.startswith(M_HTML_BASE_URL): return url url = url.replace(M_HTML_BASE_URL, cls.BASE_URL) parsed = urllib.parse.urlparse(url) parts = list(parsed) if parsed.path in ('/nd/', '/n/', '/story.php'): query = urllib.parse.unquote(html.unescape(parsed.query)) if parsed.path in ('/nd/', '/n/'): new_path, query = query.split('&', 1) parts[2] = new_path new_query = [(k, v) for k, v in urllib.parse.parse_qsl(query) if k in ('story_fbid', 'fbid', 'id', 'comment_id')] parts[4] = urllib.parse.urlencode(new_query) parts[5] = '' # fragment return urllib.parse.urlunparse(parts) @staticmethod def _scraped_datetime(tag): """Tries to parse a datetime string scraped from HTML (web or email). Examples seen in the wild: December 14 at 12:35 PM 5 July at 21:50 Args: tag: :class:`bs4.BeautifulSoup` or :class:`bs4.Tag` """ if not tag: return None try: # sadly using parse(fuzzy=True) here makes too many mistakes on relative # time strings seen on mbasic, eg '22 hrs [ago]', 'Yesterday at 12:34 PM' parsed = dateutil.parser.parse(tag.get_text(strip=True), return parsed.isoformat('T') except (ValueError, OverflowError): logger.debug(f"Couldn't parse datetime string {tag!r}") def _scrape_m(self, user_id=None, activity_id=None, fetch_replies=False, fetch_likes=False, **kwargs): """Scrapes a user's timeline or a post and converts it to activities. Args: user_id: string activity_id: string fetch_replies: boolean fetch_likes: boolean kwargs: passed through to scraped_to_activit[y/ies] Returns: dict activities API response """ user_id = user_id or self.user_id or '' if not (self.cookie_c_user and self.cookie_xs): raise NotImplementedError('Scraping requires c_user and xs cookies.') def get(url, *params, allow_redirects=False): url = urllib.parse.urljoin(M_HTML_BASE_URL, url % params) cookie = f'c_user={self.cookie_c_user}; xs={self.cookie_xs}' resp = util.requests_get(url, allow_redirects=allow_redirects, headers={ 'Cookie': cookie, 'User-Agent': SCRAPE_USER_AGENT, }) resp.raise_for_status() return resp if activity_id: # permalinks with classic ids now redirect to URLs with pfbid ids # resp = get(activity_id, allow_redirects=True) activities = [self.scraped_to_activity(resp.text, **kwargs)[0]] else: resp = get(M_HTML_TIMELINE_URL, user_id) activities, _ = self.scraped_to_activities(resp.text, **kwargs) if fetch_replies: # fetch and convert individual post permalinks # TODO: cache? fbids = [a['fb_id'] for a in activities] activities = [] for id in fbids: resp = get(id) activities.append(self.scraped_to_activity(resp.text)[0]) if fetch_likes: # fetch and convert likes # TODO: cache? for activity in activities: resp = get(M_HTML_REACTIONS_URL, activity['fb_id']) self.merge_scraped_reactions(resp.text, activity) return self.make_activities_base_response(activities)
[docs] def scraped_to_activities(self, scraped, log_html=False, **kwargs): """Converts HTML from an timeline to AS1 activities. Args: scraped: str, HTML log_html: boolean kwargs: unused Returns: tuple: ([AS activities], AS logged in actor (ie viewer)) """ soup = util.parse_html(scraped) if log_html: activities = [] for post in soup.find_all(('article', 'div'), id=re.compile('u_0_.+')): permalink = post.find(href=re.compile(r'^/story\.php\?')) if not permalink: logger.debug('Skipping one due to missing permalink') continue header = post.find('header') if header and ('Suggested for you' in header.stripped_strings or 'is with' in header.stripped_strings): logger.debug(f'Skipping {header.stripped_strings}') continue post_id, owner_id = self._extract_scraped_ids(post) if not post_id or not owner_id: url = urllib.parse.urljoin(self.BASE_URL, permalink['href']) query = urllib.parse.urlparse(url).query parsed = urllib.parse.parse_qs(query) # story_fbid stopped being useful in May 2022, it switched to an opaque # token that changes regularly, even for the same post. # ft = util.get_first(parsed, '_ft_') or '' for elem in ft.split(':'): if elem.startswith('top_level_post_id.') or elem.startswith('mf_objid.'): post_id = elem.split('.')[1] if post_id: break else: post_id = util.get_first(parsed, 'story_fbid') owner_id = parsed['id'][0] author = self._m_html_author(post) if not author: logger.debug('Skipping due to missing author') continue author['id'] = self.tag_uri(owner_id) footer = post.footer if not footer: logger.debug('Skipping due to missing footer') continue public = footer and ('Public' in footer.stripped_strings or # 'Доступно всем' in footer.stripped_strings) to = ({'objectType': 'group', 'alias': '@public'} if public else {'objectType': 'unknown'}) id = self.tag_uri(post_id) # pictures and videos. (remove from post so any text, eg "Play Video," # isn't included in the content.) attachments = [] for a in post.find_all('a', href=re.compile(r'^/photo\.php\?')): if a.img: alt = a.img.get('alt') attachments.append({ 'objectType': 'image', 'displayName': alt, 'image': { 'url': a.img.get('src'), 'displayName': alt, }}) a.extract() for a in post.find_all('a', href=re.compile(r'^/video_redirect/\?')): vid = urllib.parse.parse_qs(urllib.parse.urlparse(a['href']).query).get('src') attachments.append({ 'objectType': 'video', 'stream': {'url': vid[0] if vid else None}, 'image': {'url': a.img.get('src') if a.img else None}, }) a.extract() # link attachments attachments.extend(self._scrape_attachments(post)) # comment count # TODO: internationalize this :/ comments_count = None comments_link = post.find('a', string=re.compile('Comment')) if comments_link: tokens = comments_link.string.split() if tokens and util.is_int(tokens[0]): comments_count = int(tokens[0]) # reaction count reactions_count = None react_link = post.find('a', href=re.compile('^/reactions/picker/')) if react_link: prev = react_link.find_previous_siblings('a') if prev: count_text = prev[-1].get_text(' ', strip=True) if util.is_int(count_text): reactions_count = int(count_text) url = self._sanitize_url(urllib.parse.urljoin(self.BASE_URL, f'/{post_id}')) activities.append({ 'objectType': 'activity', 'verb': 'post', 'id': id, 'fb_id': post_id, 'url': url, 'actor': author, 'object': { 'objectType': 'note', 'id': id, 'fb_id': post_id, 'url': url, 'content': self._scraped_content(post), 'published': self._scraped_datetime(footer.abbr), 'author': author, 'to': [to], 'replies': {'totalItems': comments_count}, 'fb_reaction_count': reactions_count, 'attachments': attachments, }, }) return util.trim_nulls(activities), None
[docs] def scraped_to_activity(self, scraped, log_html=False, **kwargs): """Converts HTML from an post page to an AS1 activity. Args: scraped: str, HTML from an post permalink log_html: boolean kwargs: unused Returns: tuple: (dict AS activity or None, AS logged in actor (ie viewer)) """ soup = util.parse_html(scraped) if log_html: view = soup.find(id='m_story_permalink_view') or soup.find(id='MPhotoContent') if not view: return None, None published = view.find('abbr') footer = view.find('footer') if not footer and published: footer = published.parent.parent if view['id'] == 'MPhotoContent': body_parts = self._div(view, 0) else: body_parts = footer.find_previous_sibling('div') if body_parts and not body_parts.get_text('', strip=True): body_parts = body_parts.find_previous_sibling('div') if not body_parts: return None, None # author author = {} header = body_parts.find('header') actor_link = body_parts.find('a', class_='actor-link') if header: author = self._m_html_author(header) elif actor_link: author = self._profile_url_to_actor(actor_link['href']) author['displayName'] = actor_link.get_text(' ', strip=True) # visibility public = footer and ('Public' in footer.stripped_strings or # 'Доступно всем' in footer.stripped_strings) # post activity activity = { 'objectType': 'activity', 'verb': 'post', 'actor': author, 'object': { 'objectType': 'note', 'content': self._scraped_content(body_parts), 'published': self._scraped_datetime(published), 'author': author, 'to': [{'objectType': 'group', 'alias': '@public'} if public else {'objectType': 'unknown'}], }, } # photo action_bar = soup.find(id='MPhotoActionbar') if action_bar: photo = action_bar.find_previous_sibling('div') if photo: img = photo.find('img') if img: activity['object'].update({ 'objectType': 'photo', 'image': { 'url': img.get('src'), 'displayName': img.get('alt'), } }) post_id, owner_id = self._extract_scraped_ids(soup) if not post_id or not owner_id: return activity, None # numeric ids ids = { 'id': self.tag_uri(post_id), 'fb_id': post_id, 'url': f'{self.BASE_URL}{post_id}', } activity.update(ids) activity['object'].update(ids) author['id'] = self.tag_uri(owner_id) if not author.get('url'): author['url'] = urllib.parse.urljoin(self.BASE_URL, owner_id) # link attachments activity['object']['attachments'] = [] for div in body_parts.find_all('div', recursive=False)[1:]: activity['object']['attachments'].extend(self._scrape_attachments(div)) # comments replies = [] for comment in soup.find_all(id=re.compile(r'^\d+$')): # TODO: images in replies, eg: # replies.append({ 'objectType': 'comment', 'id': self._comment_id(post_id, comment['id']), 'url': util.add_query_params(ids['url'], {'comment_id': comment['id']}), 'content': self._scraped_content(comment), 'author': self._m_html_author(comment, 'h3'), 'published': self._scraped_datetime(comment.find('abbr')), 'inReplyTo': [{'id': ids['id'], 'url': ids['url']}], }) if replies: activity['object']['replies'] = { 'items': replies, 'totalItems': len(replies), } return util.trim_nulls(activity), None
@staticmethod def _extract_scraped_ids(soup): """Tries to scrape post id and owner id out of parsed HTML. Background on the time-limited pfbid param and why we don't want to use it in permalinks: Args: soup: :class:`bs4.BeautifulSoup` or :class:`bs4.Tag` Returns: (str post id, str owner id) tuple, or (None, None) if not found """ post_id = owner_id = None post_id_re = re.compile('mf_story_key|top_level_post_id') data_ft_div = (soup if'data-ft', '')) else soup.find(attrs={'data-ft': post_id_re})) if data_ft_div: data_ft = json_loads(html.unescape(data_ft_div['data-ft'])) # prefer top_level_post_id, mf_story_key doesn't always work in FB URLs # that we construct, eg[mf_story_key] post_id = data_ft.get('top_level_post_id') or data_ft.get('mf_story_key') owner_id = str(data_ft.get('content_owner_id_new') if 'mf_story_key' in data_ft # this post is from a user else data_ft.get('page_id')) # this post is from a group if post_id and not post_id.startswith('pfbid'): return post_id, owner_id comment_form = soup.find('form', action=re.compile('^/a/comment.php')) if comment_form: query = urllib.parse.urlparse(comment_form['action']).query parsed = urllib.parse.parse_qs(query) post_id = parsed['ft_ent_identifier'][0] owner_id = parsed['av'][0] return post_id, owner_id
[docs] def merge_scraped_reactions(self, scraped, activity): """Converts and merges scraped likes and reactions into an activity. New likes and emoji reactions are added to the activity in 'tags'. Existing likes and emoji reactions in 'tags' are ignored. Args: scraped: str, HTML from an page activity: dict, AS activity to merge these reactions into Returns: list of dict AS like/react tag objects converted from scraped """ soup = util.parse_html(scraped) tags = [] for reaction in soup.find_all('li'): if reaction.get_text(' ', strip=True) == 'See More': continue imgs = reaction.find_all('img') if len(imgs) < 2: continue # TODO: profile pic is imgs[0] type = imgs[1]['alt'].lower() type_str = 'liked' if type == 'like' else type author = self._m_html_author(reaction, 'h3') if 'id' not in author: continue _, username = util.parse_tag_uri(author['id']) tag = { 'objectType': 'activity', 'verb': 'like' if type == 'like' else 'react', 'id': self.tag_uri(f"{activity['fb_id']}_{type_str}_by_{username}"), 'url': activity['url'] + f'#{type_str}-by-{username}', 'object': {'url': activity['url']}, 'author': author, } if type != 'like': tag['content'] = REACTION_CONTENT.get(type.upper()) tags.append(tag) as1.merge_by_id(activity['object'], 'tags', tags) return tags
[docs] def scraped_to_actor(self, scraped): """Converts HTML from an profile about page to an AS1 actor. Args: scraped: str, HTML from an post permalink Returns: dict, AS1 actor """ soup = util.parse_html(scraped) root = soup.find(id='root') if not root: return None # summary/tagline summary_div = self._div(root, 0, 0, 1, 1) # if this is the logged in user, there will be an extra div and an Edit Bio # link after it summary = '' if summary_div: summary = (self._div(summary_div, 0) or summary_div ).get_text(' ', strip=True) # confusingly, the HTML has this as id=bio in HTML, but UI calls it "About # [name]" and calls the summary snippet above "bio" instead. about = None bio = root.find(id='bio') if bio: for edit_link in bio.find_all( 'a', href=re.compile(r'^/profile/edit/infotab/section/forms/\?section=bio')): edit_link.clear() bio = self._div_text(bio, 0, 0) # web sites websites = [] for label in root.find_all('a', href=re.compile( '^/editprofile.php\?type=contact&edit=website')): websites.append(label.find_parent('td').find_next_sibling('td') .get_text(' ', strip=True)) websites += util.extract_links(summary) + util.extract_links(about) # name, profile picture name = soup.title.get_text(' ', strip=True) actor = { 'objectType': 'person', 'displayName': name, 'description': bio, 'summary': summary, 'urls': [{'value': url} for url in websites], } # profile picture if name: img = soup.find('img', alt=re.compile(f'^{name},.*')) if img: actor['image'] = {'url': img['src']} profile = root.find('a', href=re.compile(r'[?&]v=timeline')) if profile: actor.update(self._profile_url_to_actor(profile['href'])) actor['urls'].insert(0, {'value': actor['url']}) return util.trim_nulls(actor)
@classmethod def _scraped_content(cls, tag): """Extract and process content. Args: tag: BeautifulSoup Tag Returns: string """ # TODO: distinguish between text elements with actual whitespace # before/after and without. this adds space to all of them, including # before punctuation, so you end up with eg 'Oh hi, Jeeves .' # (also apply any fix to scraped_to_activity().) try: content = cls._div(tag, 0, 0) except IndexError: logger.debug('Skipping due to non-post format (searching for content)') return '' if not content: return '' # remove outer tags while in ('div', 'span', 'p'): elems = [t for t in content.contents if not (isinstance(t, NavigableString) and t.string.strip() == '')] if len(elems) == 1 and isinstance(elems[0], Tag): content = elems[0] else: break # join embedded links without whitespace for link in content.find_all( 'a', href=re.compile(r'^https://lm\.facebook\.com/l\.php')): new_link = util.pretty_link(cls._unwrap_link(link)) link.replace_with(util.parse_html(new_link).a) # remove Facebook's "... More" when content is ellipsized more = content.find('a', href=re.compile(r'^/story\.php'), string='More') if more: more.extract() return str(content) @classmethod def _scrape_attachments(cls, tag): """Extracts link attachments from a post. Args: tag: BeautifulSoup a or img tag Returns: list of AS1 attachment objects """ atts = [] for link in tag.find_all('a', href=re.compile(r'^https://lm\.facebook\.com/l\.php')): url = cls._unwrap_link(link) name = link.h3 img = link.img # all links in post text are wrapped, so check h3 and img so that we only # trigger on actual attachments if url and name and img: atts.append({ 'objectType': 'article', 'url': url, 'displayName': name.get_text(' ', strip=True), 'image': {'url': cls._unwrap_link(img)}, }) return atts @staticmethod def _unwrap_link(tag): """Extracts the destination URL from a wrapped link. Currently supported: (links) (images) Args: tag: BeautifulSoup a or img tag Returns: string URL, or None """ if tag: query = urllib.parse.parse_qs( urllib.parse.urlparse(tag.get('href') or tag.get('src')).query) url = query.get('u') or query.get('url') if url: return util.remove_query_param(url[0], 'fbclid')[0] def _m_html_author(self, soup, tag='strong'): """ Finds an author link in HTML and converts it to AS1. Args: soup: BeautifulSoup tag: optional, HTML tag surrounding <a> Returns: dict AS1 actor """ if not soup: return {} author = soup.find(tag) if not author: return {} author = author.find('a') url = author.get('href') if not url: return {} actor = self._profile_url_to_actor(author['href']) actor['displayName'] = author.get_text(' ', strip=True) return actor def _profile_url_to_actor(self, url): """ Converts a profile URL to an AS1 actor. Args: url: str Returns: dict AS1 actor, or {} if the profile URL can't be parsed """ parsed = urllib.parse.urlparse(url) path = parsed.path.strip('/') params = urllib.parse.parse_qs(parsed.query) id = params.get('id') if id: id = id[0] lst = params.get('lst') if lst: # lst param is '[logged in user id]:[displayed user id]:[unknown]' id = urllib.parse.unquote(lst[0]).split(':')[1] username = None if not path.endswith('.php'): username = path if id or username: return { 'objectType': 'person', 'id': self.tag_uri(id or username), 'numeric_id': id, 'url': urllib.parse.urljoin(self.BASE_URL, username or id), 'username': username, } return {} @staticmethod def _div(tag, *args): """Returns a descendant div via a given path of divs. Args: *args: div indexes to navigate down. eg [1, 0, 2] means the second div child of tag, then that div's first div child, then that div's third div child. Returns: Tag, or None if the given path doesn't exist """ assert args try: for index in args: tag = tag.find_all('div', recursive=False)[index] return tag except IndexError: return None @staticmethod def _div_text(tag, *args): """Returns the text inside a div returned by _get_div(). Args: *args: see _div() Returns: str, or None if the given path doesn't exist """ div = Facebook._div(tag, *args) return div.get_text(' ', strip=True) if div else None
[docs] @staticmethod def parse_id(id, is_comment=False): """Parses a Facebook post or comment id. Facebook ids come in different formats: * Simple number, usually a user or post: 12 * Two numbers with underscore, usually POST_COMMENT or USER_POST: 12_34 * Three numbers with underscores, USER_POST_COMMENT: 12_34_56 * Three numbers with colons, USER:POST:SHARD: 12:34:63 (We're guessing that the third part is a shard in some FB internal system. In our experience so far, it's always either 63 or the app-scoped user id for 63.) * Two numbers with colon, POST:SHARD: 12:34 (We've seen 0 as shard in this format.) * Four numbers with colons/underscore, USER:POST:SHARD_COMMENT: 12:34:63_56 * Five numbers with colons/underscore, USER:EVENT:UNKNOWN:UNKNOWN_UNKNOWN Not currently supported! Examples: 111599105530674:998145346924699:10102446236688861:10207188792305341_998153510257216 111599105530674:195181727490727:10102446236688861:10205257726909910_195198790822354 Background: * * Args: id: string or integer is_comment: boolean Returns: FacebookId: Some or all fields may be None. """ assert is_comment in (True, False), is_comment blank = FacebookId(None, None, None) if id in (None, '', 'login.php'): # some FB permalinks redirect to login.php, e.g. group and non-public posts return blank id = str(id) user = None post = None comment = None by_colon = id.split(':') by_underscore = id.split('_') # colon id? if len(by_colon) in (2, 3) and all(by_colon): if len(by_colon) == 3: user = by_colon.pop(0) post, shard = by_colon parts = shard.split('_') if len(parts) >= 2 and parts[-1]: comment = parts[-1] elif len(by_colon) == 2 and all(by_colon): post = by_colon[0] # underscore id? elif len(by_underscore) == 3 and all(by_underscore): user, post, comment = by_underscore elif len(by_underscore) == 2 and all(by_underscore): if is_comment: post, comment = by_underscore else: user, post = by_underscore # plain number? elif util.is_int(id): if is_comment: comment = id else: post = id fbid = FacebookId(user, post, comment) for sub_id in user, post, comment: if sub_id and not re.match(r'^[0-9a-zA-Z]+$', sub_id): fbid = blank if fbid == blank: logger.error(f'Cowardly refusing Facebook id with unknown format: {id}') return fbid
[docs] def resolve_object_id(self, user_id, post_id, activity=None): """Resolve a post id to its Facebook object id, if any. Used for photo posts, since Facebook has (at least) two different objects (and ids) for them, one for the post and one for each photo. This is the same logic that we do for canonicalizing photo objects in get_activities() above. If activity is not provided, fetches the post from Facebook. Args: user_id: string Facebook user id who posted the post post_id: string Facebook post id activity: optional AS activity representation of Facebook post Returns: string: Facebook object id or None """ assert user_id, user_id assert post_id, post_id if activity: fb_id = (activity.get('fb_object_id') or activity.get('object', {}).get('fb_object_id')) if fb_id: return str(fb_id) parsed = self.parse_id(post_id) if post_id = with util.ignore_http_4xx_error(): post = self.urlopen(API_OBJECT % (user_id, post_id)) resolved = post.get('object_id') if resolved:'Resolved Facebook post id {post_id!r} to {resolved!r}.') return str(resolved)
[docs] def urlopen(self, url, _as=dict, **kwargs): """Wraps :func:`urllib2.urlopen()` and passes through the access token. Args: _as: if not None, parses the response as JSON and passes it through _as() with this type. if None, returns the response object. Returns: decoded JSON object or urlopen response object """ if not url.startswith('http'): url = API_BASE + url if self.access_token: url = util.add_query_params(url, [('access_token', self.access_token)]) resp = util.urlopen(urllib.request.Request(url, **kwargs)) if _as is None: return resp body = try: return self._as(_as, source.load_json(body, url)) except ValueError: # couldn't parse JSON logger.debug(f'Response: {resp.getcode()} {body}') raise
@staticmethod def _as(type, resp): """Converts an API response to a specific type. If resp isn't the right type, an empty instance of type is returned. If type is list, the response is expected to be a dict with the returned list in the 'data' field. If the response is a list, it's returned as is. Args: type: list or dict resp: parsed JSON object """ assert type in (list, dict) if type is list: if isinstance(resp, dict): resp = resp.get('data', []) else: logger.warning(f'Expected dict response with `data` field, got {resp}') if isinstance(resp, type): return resp else: logger.warning(f'Expected {type} response, got {resp}') return type()
[docs] def urlopen_batch(self, urls): """Sends a batch of multiple API calls using Facebook's batch API. Raises the appropriate :class:`urllib2.HTTPError` if any individual call returns HTTP status code 4xx or 5xx. Args: urls: sequence of string relative API URLs, e.g. ('me', 'me/accounts') Returns: sequence of responses, either decoded JSON objects (when possible) or raw string bodies """ resps = self.urlopen_batch_full([{'relative_url': url} for url in urls]) bodies = [] for url, resp in zip(urls, resps): code = int(resp.get('code', 0)) body = resp.get('body') if code // 100 in (4, 5): raise urllib.error.HTTPError(url, code, body, resp.get('headers'), None) bodies.append(body) return bodies
[docs] def urlopen_batch_full(self, requests): """Sends a batch of multiple API calls using Facebook's batch API. Similar to urlopen_batch(), but the requests arg and return value are dicts with headers, HTTP status code, etc. Only raises :class:`urllib2.HTTPError` if the outer batch request itself returns an HTTP error. Args: requests: sequence of dict requests in Facebook's batch format, except that headers is a single dict, not a list of dicts, e.g.:: [{'relative_url': 'me/feed', 'headers': {'ETag': 'xyz', ...}, }, ... ] Returns: sequence of dict responses in Facebook's batch format, except that body is JSON-decoded if possible, and headers is a single dict, not a list of dicts, e.g.:: [{'code': 200, 'headers': {'ETag': 'xyz', ...}, 'body': {...}, }, ... ] """ for req in requests: if 'method' not in req: req['method'] = 'GET' if 'headers' in req: req['headers'] = [{'name': n, 'value': v} for n, v in sorted(req['headers'].items())] data = 'batch=' + json_dumps(util.trim_nulls(requests), sort_keys=True) resps = self.urlopen('', data=data, _as=list) for resp in resps: if 'headers' in resp: resp['headers'] = {h['name']: h['value'] for h in resp['headers']} body = resp.get('body') if body: try: resp['body'] = json_loads(body) except (ValueError, TypeError): pass return resps