Source code for granary.atom

"""Convert between ActivityStreams 1 and Atom.

Atom spec: http://atomenabled.org/developers/syndication/
"""

import collections
import os
import re
import urlparse
from xml.etree import ElementTree
import xml.sax.saxutils

from bs4 import BeautifulSoup
import jinja2
import mf2py
import mf2util
from oauth_dropins.webutil import util

import microformats2
import source

# Jinja2 template file names, resolved by jinja_env below. FEED_TEMPLATE is
# rendered by activities_to_atom(), ENTRY_TEMPLATE by activity_to_atom().
FEED_TEMPLATE = 'user_feed.atom'
ENTRY_TEMPLATE = 'entry.atom'
# stolen from django.utils.html
# Matches an & that does not start an entity reference, ie is not followed by a
# name or a decimal number and then a semicolon. Hex character references like
# &#x27; are not recognized by this pattern, so their & counts as unencoded.
UNENCODED_AMPERSANDS_RE = re.compile(r'&(?!(\w+|#\d+);)')
# XML namespace prefix => URI. Passed to ElementTree find()/findall() so that
# lookups in this module can use prefixed names like 'atom:author'.
NAMESPACES = {
  'activity': 'http://activitystrea.ms/spec/1.0/',
  'atom': 'http://www.w3.org/2005/Atom',
  'georss': 'http://www.georss.org/georss',
  'thr': 'http://purl.org/syndication/thread/1.0',
}

# Shared Jinja2 environment. Templates are loaded from this package's
# 'templates' directory, with autoescaping turned on.
jinja_env = jinja2.Environment(
  loader=jinja2.PackageLoader(__package__, 'templates'), autoescape=True)


def _encode_ampersands(text):
  """Escapes bare ampersands in text as ``&amp;``.

  Ampersands that already start an entity reference matched by
  UNENCODED_AMPERSANDS_RE's lookahead (eg ``&amp;`` or ``&#38;``) are left
  alone, so text that's already escaped isn't double escaped.

  Args:
    text: string

  Returns: string
  """
  # The replacement must be the entity itself; substituting a literal '&'
  # would make this function a no-op.
  return UNENCODED_AMPERSANDS_RE.sub('&amp;', text)

def _text(elem, field=None):
  """Returns the text in an element or child element if it exists.

  For example, if field is 'name' and elem contains <name>Ryan</name>, returns
  'Ryan'.

  Args:
    elem: ElementTree.Element
    field: string, optional name of a child element to read instead of elem
      itself. Names without a namespace prefix are looked up in the atom
      namespace.

  Returns: string with surrounding whitespace stripped, or None if the element
    is missing or has no text

  """
  if field:
    if ':' not in field:
      field = 'atom:' + field
    elem = elem.find(field, NAMESPACES)

  if elem is None or not elem.text:
    return None

  text = elem.text
  # Only decode byte strings. ElementTree may hand back text that is already
  # unicode; calling decode() on that makes Python 2 implicitly ASCII-encode it
  # first, which raises UnicodeEncodeError on any non-ASCII character.
  if isinstance(text, bytes):
    text = text.decode('utf-8')
  return text.strip()


def _as1_value(elem, field):
  """Extracts the short name from a namespaced AS1 schema URL, if present.

  Reads the <activity:FIELD> child of elem and returns the part of its text
  after the last slash. For example, for field 'verb' and

    <activity:verb>http://activitystrea.ms/schema/1.0/like</activity:verb>

  this returns 'like'.

  Args:
    elem: ElementTree.Element
    field: string, element name inside the activity namespace

  Returns: string or None
  """
  schema_url = _text(elem, 'activity:%s' % field)
  if not schema_url:
    return None
  return schema_url.rsplit('/', 1)[-1]


# Emulate Django template behavior that returns a special default value that
# can continue to be referenced when an attribute or item lookup fails. Helps
# avoid conditionals in the template itself.
# https://docs.djangoproject.com/en/1.8/ref/templates/language/#variables
class Defaulter(collections.defaultdict):
  """defaultdict whose missing keys resolve to new, empty Defaulters.

  Nested dict values passed to the constructor are converted to Defaulters
  recursively, so chained lookups like d['a']['b']['c'] never raise KeyError.
  Values of other types, including lists of dicts, are stored unchanged.
  """

  def __init__(self, **kwargs):
    super(Defaulter, self).__init__(Defaulter, **{
      k: (Defaulter(**v) if isinstance(v, dict) else v)
      for k, v in kwargs.items()})

  def __unicode__(self):
    """Returns the empty string if empty, otherwise the repr as text."""
    if not self:
      return u''
    # dict and defaultdict don't define __unicode__, so delegating to
    # super().__unicode__() raised AttributeError for every non-empty instance.
    # Fall back to the repr, which is what unicode() uses for plain dicts.
    return u'%s' % super(Defaulter, self).__repr__()

  def __hash__(self):
    # An empty Defaulter hashes like None. dicts are unhashable, so for a
    # non-empty Defaulter the inherited __hash__ is None and calling it raises
    # TypeError.
    return super(Defaulter, self).__hash__() if self else None.__hash__()


def activities_to_atom(activities, actor, title=None, request_url=None,
                       host_url=None, xml_base=None, rels=None, reader=True):
  """Renders ActivityStreams 1 activities as an Atom feed.

  Every activity dict is passed through _prepare_activity() first, which
  modifies it in place.

  Args:
    activities: list of ActivityStreams activity dicts
    actor: ActivityStreams actor dict, the author of the feed. May be None.
    title: string, the feed <title> element. Defaults to
      'User feed for [NAME]'
    request_url: the URL of this Atom feed, if any. Used in a link rel="self".
      Defaults to host_url.
    host_url: the home URL for this Atom feed, if any. Used in the top-level
      feed <id> element. Its query string is removed.
    xml_base: the base URL, if any. Used in the top-level xml:base attribute.
    rels: rel links to include. dict mapping string rel value to string URL.
    reader: boolean, whether the output will be rendered in a feed reader.
      Currently just includes location if True, not otherwise.

  Returns: unicode string with Atom XML
  """
  # Strip query params from the host URL so that we don't include access
  # tokens, etc
  if host_url:
    host_url = _remove_query_params(host_url)
  else:
    host_url = 'https://github.com/snarfed/granary'

  if request_url is None:
    request_url = host_url

  for activity in activities:
    _prepare_activity(activity, reader=reader)

  if actor is None:
    actor = {}

  if not title:
    title = 'User feed for ' + source.Source.actor_name(actor)

  # The feed's updated time comes from the first activity's object.
  if activities:
    updated = activities[0]['object'].get('published', '')
  else:
    updated = ''

  template = jinja_env.get_template(FEED_TEMPLATE)
  return template.render(
    items=[Defaulter(**activity) for activity in activities],
    host_url=host_url,
    request_url=request_url,
    xml_base=xml_base,
    title=title,
    updated=updated,
    actor=Defaulter(**actor),
    rels=rels or {},
    VERBS_WITH_OBJECT=source.VERBS_WITH_OBJECT,
  )


def activity_to_atom(activity, xml_base=None, reader=True):
  """Renders a single ActivityStreams 1 activity as a standalone Atom entry.

  The activity dict is passed through _prepare_activity() first, which modifies
  it in place.

  Args:
    activity: ActivityStreams activity dict
    xml_base: the base URL, if any. Used in the top-level xml:base attribute.
    reader: boolean, whether the output will be rendered in a feed reader.
      Currently just includes location if True, not otherwise.

  Returns: unicode string with Atom XML
  """
  _prepare_activity(activity, reader=reader)

  entry_template = jinja_env.get_template(ENTRY_TEMPLATE)
  context = {
    'activity': Defaulter(**activity),
    'xml_base': xml_base,
    'VERBS_WITH_OBJECT': source.VERBS_WITH_OBJECT,
  }
  return entry_template.render(**context)


def atom_to_activity(atom):
  """Converts an Atom entry to an ActivityStreams 1 activity.

  Object data is read from the <activity:object> child if there is one,
  otherwise from the entry itself. The entry's <content> and <georss:point>
  are then applied on top of it.

  Args:
    atom: unicode string, Atom document

  Returns: dict, ActivityStreams activity, after being passed through
    source.Source.postprocess_activity()

  Raises:
    ValueError: if the root element isn't an entry, or if a georss:point value
      isn't exactly two whitespace separated numbers
  """
  assert isinstance(atom, unicode)
  parser = ElementTree.XMLParser(encoding='UTF-8')
  entry = ElementTree.XML(atom.encode('utf-8'), parser=parser)
  # Tags are '{namespace}name'; compare just the local name.
  if entry.tag.split('}')[-1] != 'entry':
    raise ValueError('Expected root entry tag; got %s' % entry.tag)

  # default object data from entry. override with data inside activity:object.
  obj_elem = entry.find('activity:object', NAMESPACES)
  obj = _atom_to_object(obj_elem if obj_elem is not None else entry)

  content = entry.find('atom:content', NAMESPACES)
  if content is not None:
    # TODO: use 'html' instead of 'text' to include HTML tags. the problem is,
    # if there's an embedded XML namespace, it prefixes *every* tag with that
    # namespace. breaks on e.g. the <div xmlns="http://www.w3.org/1999/xhtml">
    # that our Atom templates wrap HTML content in.
    text = ElementTree.tostring(content, 'utf-8', 'text').decode('utf-8')
    obj['content'] = re.sub(r'\s+', ' ', text.strip())

  point = _text(entry, 'georss:point')
  if point:
    # Split on any run of whitespace, not just a single space, so that values
    # separated by multiple spaces, tabs or newlines still parse.
    lat, lng = point.split()
    obj['location'].update({
      'latitude': float(lat),
      'longitude': float(lng),
    })

  a = {
    'objectType': 'activity',
    'verb': _as1_value(entry, 'verb'),
    # Only fall back to the object's id and url when the object was built from
    # the entry itself, ie there was no separate activity:object.
    'id': _text(entry, 'id') or (obj['id'] if obj_elem is None else None),
    'url': _text(entry, 'uri') or (obj['url'] if obj_elem is None else None),
    'object': obj,
    'actor': _author_to_actor(entry),
    'inReplyTo': obj.get('inReplyTo'),
  }

  return source.Source.postprocess_activity(a)


def _atom_to_object(elem):
  """Builds an ActivityStreams 1 object from an Atom element.

  Args:
    elem: ElementTree.Element, eg an <entry> or <activity:object>

  Returns: dict, ActivityStreams object. Fields whose source elements are
    missing are None.
  """
  # Fall back to the element's own text when it has no <uri> child.
  uri = _text(elem, 'uri') or _text(elem)

  # Each <thr:in-reply-to> prefers its ref and href attributes and falls back
  # to its text.
  in_reply_to = []
  for reply in elem.findall('thr:in-reply-to', NAMESPACES):
    in_reply_to.append({
      'id': reply.attrib.get('ref') or _text(reply),
      'url': reply.attrib.get('href') or _text(reply),
    })

  return {
    'objectType': _as1_value(elem, 'object-type'),
    'id': _text(elem, 'id') or uri,
    'url': uri,
    'title': _text(elem, 'title'),
    'published': _text(elem, 'published'),
    'updated': _text(elem, 'updated'),
    'inReplyTo': in_reply_to,
    'location': {
      'displayName': _text(elem, 'georss:featureName'),
    },
  }


def _author_to_actor(elem):
  """Builds an ActivityStreams 1 actor from the <author> child of elem.

  Note that this looks for <author> *inside* elem; elem itself should be the
  containing element, not the author.

  Args:
    elem: ElementTree.Element

  Returns: dict, ActivityStreams actor object, or None if elem has no <author>
  """
  author = elem.find('atom:author', NAMESPACES)
  if author is None:
    return None

  return {
    'objectType': _as1_value(author, 'object-type'),
    'id': _text(author, 'id'),
    'url': _text(author, 'uri'),
    'displayName': _text(author, 'name'),
    'email': _text(author, 'email'),
  }


def html_to_atom(html, url=None, fetch_author=False, reader=True):
  """Converts microformats2 HTML to an Atom feed.

  Args:
    html: string
    url: string URL html came from, optional. Required if fetch_author is True.
    fetch_author: boolean, whether to make HTTP request to fetch rel-author link
    reader: boolean, whether the output will be rendered in a feed reader.
      Currently just includes location if True, not otherwise.

  Returns: unicode string with Atom XML
  """
  if fetch_author:
    assert url, 'fetch_author=True requires url!'

  parsed = mf2py.parse(doc=html, url=url)

  # Only provide a fetch function when the caller opted in. Passing one
  # unconditionally allowed HTTP requests even with fetch_author=False.
  # NOTE(review): assumes find_author accepts fetch_mf2_func=None to mean
  # "don't fetch" - confirm against microformats2.find_author.
  fetch_mf2_func = (lambda url: mf2py.parse(url=url)) if fetch_author else None
  actor = microformats2.find_author(parsed, fetch_mf2_func=fetch_mf2_func)

  return activities_to_atom(
    microformats2.html_to_activities(html, url, actor),
    actor,
    title=mf2util.interpret_feed(parsed, url).get('name'),
    xml_base=util.base_url(url),
    host_url=url,
    reader=reader)


def _prepare_activity(a, reader=True):
  """Preprocesses an activity to prepare it to be rendered as Atom.

  Modifies a in place:

  * makes sure a['object'] exists
  * sets a['object']['rendered_content'] to the rendered HTML content
  * sets a['title'] to an escaped, plain text title
  * sets a['object']['rendered_children'] to HTML for attached notes and
    articles, plus any of the object's images that aren't already embedded in
    its content or included in an attachment

  Args:
    a: ActivityStreams 1 activity dict
    reader: boolean, whether the output will be rendered in a feed reader.
      Currently just includes location if True, not otherwise.
  """
  act_type = source.object_type(a)
  if not act_type or act_type == 'post':
    primary = a.get('object', {})
  else:
    primary = a
  obj = a.setdefault('object', {})

  # Render content as HTML; escape &s
  obj['rendered_content'] = _encode_ampersands(microformats2.render_content(
    primary, include_location=reader))

  # Make sure every activity has the title field, since Atom <entry> requires
  # the title element.
  if not a.get('title'):
    a['title'] = util.ellipsize(_encode_ampersands(
      a.get('displayName') or a.get('content') or obj.get('title') or
      obj.get('displayName') or obj.get('content') or 'Untitled'))

  # strip HTML tags. the Atom spec says title is plain text:
  # http://atomenabled.org/developers/syndication/#requiredEntryElements
  a['title'] = xml.sax.saxutils.escape(BeautifulSoup(a['title']).get_text(''))

  children = []
  image_urls = set()

  # render attached notes/articles
  attachments = a.get('attachments') or obj.get('attachments') or []
  for att in attachments:
    image_urls |= set(img.get('url') for img in util.get_list(att, 'image'))
    if att.get('objectType') in ('note', 'article'):
      html = microformats2.render_content(att, include_location=reader)
      author = att.get('author')
      if author:
        name = microformats2.maybe_linked_name(
          microformats2.object_to_json(author).get('properties', []))
        html = '%s: %s' % (name.strip(), html)
      children.append(html)

  # render image(s) that we haven't already seen. content may be present but
  # None, which re can't search.
  content = obj.get('content') or ''
  for image in util.get_list(obj, 'image'):
    if not image:
      continue
    url = image.get('url')
    # Check the URL before parsing it: urlparse() can't handle None, and images
    # already covered by an attachment need no further work.
    if not url or url in image_urls:
      continue

    parsed = urlparse.urlparse(url)
    # Everything after the scheme and host: path, params, query, fragment.
    rest = urlparse.urlunparse(('', '') + parsed[2:])
    # Matches an src attribute that points at this image as an absolute URL
    # (scheme://host/path), a protocol relative URL (//host/path), or just the
    # path. The scheme needs its trailing colon, otherwise absolute URLs never
    # match and the image gets rendered a second time.
    img_src_re = re.compile(
      r"""src *= *['"] *((%s:)?//%s)?%s *['"]""" % (
        re.escape(parsed.scheme), re.escape(parsed.netloc), re.escape(rest)))
    if not img_src_re.search(content):
      children.append(microformats2.img(url, 'u-photo'))

  obj['rendered_children'] = [_encode_ampersands(html) for html in children]


def _remove_query_params(url):
  """Returns url with its query string removed.

  Args:
    url: string

  Returns: string
  """
  parsed = list(urlparse.urlparse(url))
  parsed[4] = ''  # index 4 of the parsed tuple is the query string
  return urlparse.urlunparse(parsed)