Source code for granary.atom

"""Convert between ActivityStreams 1 and Atom.

Atom spec: https://tools.ietf.org/html/rfc4287 (RIP atomenabled.org)
"""
import collections
import mimetypes
import os
import re
import urlparse
from xml.etree import ElementTree
import xml.sax.saxutils

from bs4 import BeautifulSoup
import jinja2
import mf2py
import mf2util
from oauth_dropins.webutil import util

import microformats2
import source

# Names of the Jinja2 templates rendered for a whole feed and for a single
# entry, respectively.
FEED_TEMPLATE = 'user_feed.atom'
ENTRY_TEMPLATE = 'entry.atom'
# stolen from django.utils.html
# Matches an '&' that is NOT followed by a word or '#' plus digits and then a
# ';', i.e. an ampersand that does not start an entity reference.
UNENCODED_AMPERSANDS_RE = re.compile(r'&(?!(\w+|#\d+);)')
# XML namespace prefix => URI. Passed to ElementTree find()/findall() so that
# prefixed paths such as 'atom:id' or 'thr:in-reply-to' can be resolved.
NAMESPACES = {
  'activity': 'http://activitystrea.ms/spec/1.0/',
  'atom': 'http://www.w3.org/2005/Atom',
  'georss': 'http://www.georss.org/georss',
  'thr': 'http://purl.org/syndication/thread/1.0',
}

# Shared Jinja2 environment. Templates are loaded from this package's
# 'templates' directory, with autoescaping turned on.
jinja_env = jinja2.Environment(
  loader=jinja2.PackageLoader(__package__, 'templates'), autoescape=True)


def _encode_ampersands(text):
  """Escapes bare ampersands in text as ``&amp;``.

  Only ampersands matched by UNENCODED_AMPERSANDS_RE are rewritten, so text
  that already contains entity references such as ``&amp;`` or ``&#39;`` is
  not double-encoded.

  Args:
    text: string

  Returns: string
  """
  # The replacement must be the entity itself. Substituting a literal '&'
  # would leave the input unchanged and let invalid XML through.
  return UNENCODED_AMPERSANDS_RE.sub('&amp;', text)


def _tag(elem):
  """Removes the namespace from an ElementTree element tag."""
  return elem.tag.split('}')[-1]


def _text(elem, field=None):
  """Returns the text in an element or child element if it exists.

  For example, if field is 'name' and elem contains <name>Ryan</name>, returns
  'Ryan'.

  Args:
    elem: ElementTree.Element
    field: string

  Returns: string or None

  """
  if field:
    if ':' not in field:
      field = 'atom:' + field
    elem = elem.find(field, NAMESPACES)
  if elem is not None and elem.text:
    text = elem.text
    if not isinstance(elem.text, unicode):
      text = text.decode('utf-8')
    return text.strip()


def _as1_value(elem, field):
  """Returns the short name of an AS1 schema value in a child element.

  Reads the text of the <activity:FIELD> child and keeps only the part after
  the last slash. For example, returns 'like' for field 'verb' if elem
  contains:

    <activity:verb>http://activitystrea.ms/schema/1.0/like</activity:verb>

  Args:
    elem: ElementTree.Element
    field: string, element name inside the activity namespace

  Returns: string, or None if the child is missing or empty
  """
  value = _text(elem, 'activity:%s' % field)
  if not value:
    return None
  return value.rsplit('/', 1)[-1]


# Emulate Django template behavior that returns a special default value that
# can continue to be referenced when an attribute or item lookup fails. Helps
# avoid conditionals in the template itself.
# https://docs.djangoproject.com/en/1.8/ref/templates/language/#variables
class Defaulter(collections.defaultdict):
  """A defaultdict whose missing keys produce further empty Defaulters.

  Nested dict values passed to the constructor are converted to Defaulters as
  well, so chained lookups such as d['a']['b']['c'] never raise KeyError.
  """

  def __init__(self, **kwargs):
    converted = {}
    for key, val in kwargs.items():
      # wrap nested dicts so lookups below this level get the same behavior
      converted[key] = Defaulter(**val) if isinstance(val, dict) else val
    super(Defaulter, self).__init__(Defaulter, **converted)

  def __unicode__(self):
    # An empty Defaulter renders as the empty string.
    if not self:
      return u''
    # NOTE(review): dict does not appear to define __unicode__ on Python 2, so
    # this branch may raise AttributeError for non-empty instances. Confirm.
    return super(Defaulter, self).__unicode__()

  def __hash__(self):
    # An empty Defaulter hashes like None.
    if not self:
      return None.__hash__()
    return super(Defaulter, self).__hash__()


def activities_to_atom(activities, actor, title=None, request_url=None,
                       host_url=None, xml_base=None, rels=None, reader=True):
  """Renders ActivityStreams 1 activities as an Atom feed.

  Each activity is preprocessed, and so modified, in place before rendering.

  Args:
    activities: list of ActivityStreams activity dicts
    actor: ActivityStreams actor dict, the author of the feed. May be None.
    title: string, the feed <title> element. Defaults to 'User feed for [NAME]'
    request_url: the URL of this Atom feed, if any. Used in a link rel="self".
      Defaults to host_url.
    host_url: the home URL for this Atom feed, if any. Used in the top-level
      feed <id> element.
    xml_base: the base URL, if any. Used in the top-level xml:base attribute.
    rels: rel links to include. dict mapping string rel value to string URL.
    reader: boolean, whether the output will be rendered in a feed reader.
      Currently just includes location if True, not otherwise.

  Returns: unicode string with Atom XML
  """
  # Strip query params from URLs so that we don't include access tokens, etc
  if host_url:
    host_url = _remove_query_params(host_url)
  else:
    host_url = 'https://github.com/snarfed/granary'

  if request_url is None:
    request_url = host_url

  for activity in activities:
    _prepare_activity(activity, reader=reader)

  if actor is None:
    actor = {}

  template = jinja_env.get_template(FEED_TEMPLATE)
  context = {
    'actor': Defaulter(**actor),
    'host_url': host_url,
    'items': [Defaulter(**activity) for activity in activities],
    'mimetypes': mimetypes,
    'rels': rels or {},
    'request_url': request_url,
    'title': title or 'User feed for ' + source.Source.actor_name(actor),
    # the feed's updated time is taken from the first activity, if any
    'updated': (activities[0]['object'].get('published', '')
                if activities else ''),
    'VERBS_WITH_OBJECT': source.VERBS_WITH_OBJECT,
    'xml_base': xml_base,
  }
  return template.render(**context)
def activity_to_atom(activity, xml_base=None, reader=True):
  """Renders a single ActivityStreams 1 activity as an Atom entry.

  The activity is preprocessed, and so modified, in place before rendering.

  Args:
    activity: ActivityStreams 1 activity dict
    xml_base: the base URL, if any. Used in the top-level xml:base attribute.
    reader: boolean, whether the output will be rendered in a feed reader.
      Currently just includes location if True, not otherwise.

  Returns: unicode string with Atom XML
  """
  _prepare_activity(activity, reader=reader)

  template = jinja_env.get_template(ENTRY_TEMPLATE)
  context = {
    'activity': Defaulter(**activity),
    'mimetypes': mimetypes,
    'VERBS_WITH_OBJECT': source.VERBS_WITH_OBJECT,
    'xml_base': xml_base,
  }
  return template.render(**context)
def atom_to_activities(atom):
  """Parses an Atom feed document into ActivityStreams 1 activities.

  Only direct <entry> children of the root element are converted.

  Args:
    atom: unicode string, Atom document with top-level <feed> element

  Returns: list of ActivityStreams activity dicts

  Raises:
    ValueError: if the root element is not <feed>
  """
  assert isinstance(atom, unicode)

  # ElementTree is handed UTF-8 bytes and told so explicitly
  utf8_parser = ElementTree.XMLParser(encoding='UTF-8')
  root = ElementTree.XML(atom.encode('utf-8'), parser=utf8_parser)
  if _tag(root) != 'feed':
    raise ValueError('Expected root feed tag; got %s' % root.tag)

  activities = []
  for child in root:
    if _tag(child) == 'entry':
      activities.append(_atom_to_activity(child))
  return activities
def atom_to_activity(atom):
  """Parses a standalone Atom entry document into an ActivityStreams 1 activity.

  Args:
    atom: unicode string, Atom document with top-level <entry> element

  Returns: dict, ActivityStreams activity

  Raises:
    ValueError: if the root element is not <entry>
  """
  assert isinstance(atom, unicode)

  # ElementTree is handed UTF-8 bytes and told so explicitly
  utf8_parser = ElementTree.XMLParser(encoding='UTF-8')
  root = ElementTree.XML(atom.encode('utf-8'), parser=utf8_parser)
  if _tag(root) == 'entry':
    return _atom_to_activity(root)

  raise ValueError('Expected root entry tag; got %s' % root.tag)
def _atom_to_activity(entry):
  """Converts an internal Atom entry element to an ActivityStreams 1 activity.

  Object data comes from the <activity:object> child if there is one, otherwise
  from the entry itself. The entry's <content> and <georss:point>, if present,
  are then applied on top of that object.

  Args:
    entry: ElementTree <entry> element

  Returns: dict, ActivityStreams activity

  Raises:
    ValueError: if <georss:point> does not hold exactly two whitespace
      separated numbers
  """
  # default object data from entry. override with data inside activity:object.
  obj_elem = entry.find('activity:object', NAMESPACES)
  obj = _atom_to_object(obj_elem if obj_elem is not None else entry)

  content = entry.find('atom:content', NAMESPACES)
  if content is not None:
    # TODO: use 'html' instead of 'text' to include HTML tags. the problem is,
    # if there's an embedded XML namespace, it prefixes *every* tag with that
    # namespace. breaks on e.g. the <div xmlns="http://www.w3.org/1999/xhtml">
    # that our Atom templates wrap HTML content in.
    text = ElementTree.tostring(content, 'utf-8', 'text').decode('utf-8')
    # collapse all runs of whitespace into single spaces
    obj['content'] = re.sub(r'\s+', ' ', text.strip())

  point = _text(entry, 'georss:point')
  if point:
    # Split on any run of whitespace, not just a single space, so that values
    # like '45.5  -122.6' or ones separated by a newline still parse.
    lat, lng = point.split()
    obj['location'].update({
      'latitude': float(lat),
      'longitude': float(lng),
    })

  a = {
    'objectType': 'activity',
    'verb': _as1_value(entry, 'verb'),
    # only fall back to the object's id and url when the object data was read
    # from the entry itself, ie there was no separate activity:object
    'id': _text(entry, 'id') or (obj['id'] if obj_elem is None else None),
    'url': _text(entry, 'uri') or (obj['url'] if obj_elem is None else None),
    'object': obj,
    'actor': _author_to_actor(entry),
    'inReplyTo': obj.get('inReplyTo'),
  }
  return source.Source.postprocess_activity(a)


def _atom_to_object(elem):
  """Converts an Atom entry to an ActivityStreams 1 object.

  Args:
    elem: ElementTree.Element

  Returns: dict, ActivityStreams object
  """
  # fall back to the element's own text when it has no <uri> child
  uri = _text(elem, 'uri') or _text(elem)
  return {
    'objectType': _as1_value(elem, 'object-type'),
    'id': _text(elem, 'id') or uri,
    'url': uri,
    'title': _text(elem, 'title'),
    'published': _text(elem, 'published'),
    'updated': _text(elem, 'updated'),
    'inReplyTo': [{
      'id': r.attrib.get('ref') or _text(r),
      'url': r.attrib.get('href') or _text(r),
    } for r in elem.findall('thr:in-reply-to', NAMESPACES)],
    'location': {
      'displayName': _text(elem, 'georss:featureName'),
    }
  }


def _author_to_actor(elem):
  """Converts an Atom <author> element to an ActivityStreams 1 actor.

  Looks for <author> *inside* elem.

  Args:
    elem: ElementTree.Element

  Returns: dict, ActivityStreams actor object, or None if elem has no <author>
  """
  author = elem.find('atom:author', NAMESPACES)
  if author is not None:
    return {
      'objectType': _as1_value(author, 'object-type'),
      'id': _text(author, 'id'),
      'url': _text(author, 'uri'),
      'displayName': _text(author, 'name'),
      'email': _text(author, 'email'),
    }
def html_to_atom(html, url=None, fetch_author=False, reader=True):
  """Converts microformats2 HTML to an Atom feed.

  Args:
    html: string
    url: string URL html came from, optional
    fetch_author: boolean, whether to make HTTP request to fetch rel-author link
    reader: boolean, whether the output will be rendered in a feed reader.
      Currently just includes location if True, not otherwise.

  Returns: unicode string with Atom XML
  """
  if fetch_author:
    assert url, 'fetch_author=True requires url!'

  parsed = mf2py.parse(doc=html, url=url)
  actor = microformats2.find_author(
    parsed, fetch_mf2_func=lambda url: mf2py.parse(url=url))

  activities = microformats2.html_to_activities(html, url, actor)
  feed_title = mf2util.interpret_feed(parsed, url).get('name')
  base = util.base_url(url)

  return activities_to_atom(activities, actor, title=feed_title, xml_base=base,
                            host_url=url, reader=reader)
def _prepare_activity(a, reader=True):
  """Preprocesses an activity to prepare it to be rendered as Atom.

  Modifies a in place:

  * creates a['object'] if it is missing
  * sets object['rendered_content'] to the rendered HTML content
  * makes sure a['title'] is set, tag free and XML escaped
  * sets 'stream' on each attachment to its first stream
  * sets object['rendered_children'] to HTML for attached notes/articles and
    for images that aren't already in the rendered content
  * normalizes object['published'] and object['updated'] timestamps

  Args:
    a: ActivityStreams 1 activity dict
    reader: boolean, whether the output will be rendered in a feed reader.
      Currently just includes location if True, not otherwise.
  """
  act_type = source.object_type(a)
  if not act_type or act_type == 'post':
    primary = a.get('object', {})
  else:
    primary = a
  obj = a.setdefault('object', {})

  # Render content as HTML; escape &s
  obj['rendered_content'] = _encode_ampersands(microformats2.render_content(
    primary, include_location=reader, render_attachments=True))

  # Make sure every activity has the title field, since Atom <entry> requires
  # the title element.
  if not a.get('title'):
    a['title'] = util.ellipsize(_encode_ampersands(
      a.get('displayName') or a.get('content') or obj.get('title') or
      obj.get('displayName') or obj.get('content') or 'Untitled'))

  # strip HTML tags. the Atom spec says title is plain text:
  # http://atomenabled.org/developers/syndication/#requiredEntryElements
  a['title'] = xml.sax.saxutils.escape(BeautifulSoup(a['title']).get_text(''))

  children = []
  image_urls_seen = set()
  image_atts = []

  # normalize attachments, render attached notes/articles
  attachments = a.get('attachments') or obj.get('attachments') or []
  for att in attachments:
    att['stream'] = util.get_first(att, 'stream')
    att_type = att.get('objectType')

    if att_type == 'image':
      # image attachments are rendered below, along with the object's images
      image_atts.append(util.get_first(att, 'image'))
      continue

    image_urls_seen |= set(util.get_urls(att, 'image'))
    if att_type in ('note', 'article'):
      html = microformats2.render_content(att, include_location=reader,
                                          render_attachments=True)
      author = att.get('author')
      if author:
        name = microformats2.maybe_linked_name(
          microformats2.object_to_json(author).get('properties', []))
        html = '%s: %s' % (name.strip(), html)
      children.append(html)

  # render image(s) that we haven't already seen
  for image in image_atts + util.get_list(obj, 'image'):
    if not image:
      continue

    url = image.get('url')
    # Skip images with no URL *before* parsing it: urlparse can't handle None.
    if not url or url in image_urls_seen:
      continue

    # Match the image in the rendered content whether it's referenced with an
    # http(s) scheme, as a protocol-relative URL, or with just path etc.
    parsed = urlparse.urlparse(url)
    rest = urlparse.urlunparse(('', '') + parsed[2:])
    img_src_re = re.compile(r"""src *= *['"] *((https?:)?//%s)?%s *['"]""" %
                            (re.escape(parsed.netloc), re.escape(rest)))
    if not img_src_re.search(obj['rendered_content']):
      children.append(microformats2.img(url))
      image_urls_seen.add(url)

  obj['rendered_children'] = [_encode_ampersands(html) for html in children]

  # make sure published and updated are strict RFC 3339 timestamps
  for prop in 'published', 'updated':
    val = obj.get(prop)
    if val:
      obj[prop] = util.maybe_iso8601_to_rfc3339(val)
      # Atom timestamps are even stricter than RFC 3339: they can't be naive ie
      # time zone unaware. They must have either an offset or the Z suffix.
      # https://www.feedvalidator.org/docs/error/InvalidRFC3339Date.html
      if not util.TIMEZONE_OFFSET_RE.search(obj[prop]):
        obj[prop] += 'Z'


def _remove_query_params(url):
  """Returns url with its query string removed.

  Args:
    url: string

  Returns: string
  """
  parsed = list(urlparse.urlparse(url))
  # index 4 of the parsed URL tuple is the query component
  parsed[4] = ''
  return urlparse.urlunparse(parsed)