Source code for granary.rss

"""Convert between ActivityStreams and RSS 2.0.

RSS 2.0 spec: http://www.rssboard.org/rss-specification

Feedgen docs: https://feedgen.kiesow.be/

Apple iTunes Podcasts feed requirements:
https://help.apple.com/itc/podcasts_connect/#/itc1723472cb

Notably:

* Valid RSS 2.0.
* Each podcast item requires ``<guid>``.
* Images should be JPEG or PNG, 1400x1400 to 3000x3000.
* HTTP server that hosts assets and files should support range requests.
"""
from datetime import datetime, time, timezone
import logging
from itertools import zip_longest
import mimetypes

import dateutil.parser
from feedgen.feed import FeedGenerator
import feedparser
import mf2util
from oauth_dropins.webutil import util

from . import as1, microformats2
from .source import Source

logger = logging.getLogger(__name__)

# HTTP Content-Type header value to use when serving a generated RSS feed
CONTENT_TYPE = 'application/rss+xml; charset=utf-8'
# allowed ActivityStreams objectTypes for media enclosures
ENCLOSURE_TYPES = {'audio', 'video'}


def from_activities(activities, actor=None, title=None, feed_url=None,
                    home_page_url=None, hfeed=None):
  """Converts ActivityStreams activities to an RSS 2.0 feed.

  Args:
    activities (sequence): of ActivityStreams activity dicts
    actor (dict): ActivityStreams actor, author of the feed
    title (str): the feed title
    feed_url (str): the URL for this RSS feed. Required.
    home_page_url (str): the home page URL
    hfeed (dict): parsed mf2 ``h-feed``, if available

  Returns:
    str: RSS 2.0 XML

  Raises:
    TypeError: if activities is not a non-dict, non-string iterable
    ValueError: if feed_url is not provided
  """
  try:
    iter(activities)
  except TypeError:
    raise TypeError('activities must be iterable')

  if isinstance(activities, (dict, str)):
    raise TypeError('activities may not be a dict or string')

  # validate before use; an assert here would be stripped under python -O
  if not feed_url:
    raise ValueError('feed_url is required')

  fg = FeedGenerator()
  fg.id(feed_url)
  fg.link(href=feed_url, rel='self')
  if home_page_url:
    fg.link(href=home_page_url, rel='alternate')
  # TODO: parse language from lang attribute:
  # https://github.com/microformats/mf2py/issues/150
  fg.language('en')
  fg.generator('granary', uri='https://granary.io/')

  hfeed = hfeed or {}
  actor = actor or {}
  # feed image: prefer the h-feed's photo, fall back to the actor's image
  image = (util.get_url(hfeed.get('properties', {}), 'photo')
           or util.get_url(actor, 'image'))
  if image:
    fg.image(image)

  props = hfeed.get('properties') or {}
  content = microformats2.get_text(util.get_first(props, 'content', ''))
  summary = util.get_first(props, 'summary', '')
  desc = content or summary or '-'
  fg.description(desc)  # required by RSS 2.0
  fg.title(title or util.ellipsize(desc))  # required by RSS 2.0

  latest = None
  feed_has_enclosure = False
  for activity in activities:
    obj = activity
    if activity.get('verb') in ('create', 'post'):
      obj = as1.get_object(activity)
    if activity.get('objectType') == 'person':
      continue

    item = fg.add_entry(order='append')
    url = obj.get('url')
    item_id = obj.get('id') or url
    item.id(item_id)
    item.link(href=url)
    # <guid> is required for podcast items (see module docstring)
    item.guid(url, permalink=True)

    # item title (required); renamed local so we don't clobber the feed-level
    # title parameter
    item_title = obj.get('title') or obj.get('displayName')
    if item_title:
      # strip HTML tags
      item.title(util.parse_html(item_title).get_text('').strip())

    content = microformats2.render_content(
      obj, include_location=True, render_attachments=True, render_image=True)
    if not content:
      content = obj.get('summary') or ''
    item.content(content, type='CDATA')

    # map AS tags to RSS categories, skipping reactions/shares and mentions
    categories = [
      {'term': t['displayName']} for t in obj.get('tags', [])
      if t.get('displayName')
      and t.get('verb') not in ('like', 'react', 'share')
      and t.get('objectType') not in ('article', 'person', 'mention')]
    item.category(categories)

    author = as1.get_object(obj, 'author')
    author = {
      'name': (author.get('displayName') or author.get('username')
               or author.get('url') or author.get('id')),
      'uri': author.get('url') or author.get('id'),
      # feedgen requires a non-empty email
      'email': author.get('email') or '-',
    }
    item.author(author)

    published = obj.get('published') or obj.get('updated')
    if published and isinstance(published, str):
      try:
        dt = mf2util.parse_datetime(published)
        if not isinstance(dt, datetime):
          # a bare date; promote to midnight so feedgen accepts it
          dt = datetime.combine(dt, time.min)
        if not dt.tzinfo:
          dt = dt.replace(tzinfo=timezone.utc)
        item.published(dt)
        # track the newest timestamp for <lastBuildDate>
        if not latest or dt > latest:
          latest = dt
      except ValueError:  # bad datetime string
        pass

    # RSS 2.0 allows at most one enclosure per item, so only keep the first
    item_has_enclosure = False
    for att in obj.get('attachments', []):
      stream = util.get_first(att, 'stream') or att
      if not stream:
        continue
      url = stream.get('url') or ''
      mime = mimetypes.guess_type(url)[0] or ''
      if (att.get('objectType') in ENCLOSURE_TYPES
          or mime and mime.split('/')[0] in ENCLOSURE_TYPES):
        if item_has_enclosure:
          logger.info(f'Warning: item {item_id} already has an RSS enclosure, skipping additional enclosure {url}')
          continue
        item_has_enclosure = feed_has_enclosure = True
        item.enclosure(url=url, type=mime, length=str(stream.get('size', '')))
        item.load_extension('podcast')
        duration = stream.get('duration')
        if duration:
          item.podcast.itunes_duration(duration)

  if feed_has_enclosure:
    # feed_has_enclosure implies the loop ran at least once, so author (from
    # the last item) is defined here
    fg.load_extension('podcast')
    fg.podcast.itunes_author(actor.get('displayName') or actor.get('username'))
    if summary:
      fg.podcast.itunes_summary(summary)
    fg.podcast.itunes_explicit('no')
    fg.podcast.itunes_block(False)
    name = author.get('name')
    if name:
      fg.podcast.itunes_author(name)
    if image:
      fg.podcast.itunes_image(image)

  if latest:
    fg.lastBuildDate(latest)

  return fg.rss_str(pretty=True).decode('utf-8')
def to_activities(rss):
  """Converts an RSS feed to ActivityStreams 1 activities.

  Args:
    rss (str): RSS document with top-level ``<rss>`` element

  Returns:
    list of dict: ActivityStreams activity
  """
  parsed = feedparser.parse(rss)
  activities = []

  feed = parsed.get('feed', {})
  actor = {
    'displayName': feed.get('title'),
    'url': feed.get('link'),
    'summary': feed.get('info') or feed.get('description'),
    'image': [{'url': feed.get('image', {}).get('href') or feed.get('logo')}],
  }

  def iso_datetime(field):
    # NOTE: closes over the loop variable `entry` below; only call it from
    # inside the entries loop.
    # check for existence because feedparser returns 'published' for 'updated'
    # when you [] or .get() it
    if field in entry:
      try:
        return dateutil.parser.parse(entry[field]).isoformat()
      except (TypeError, dateutil.parser.ParserError):
        return None

  def as_int(val):
    # best effort: leave non-numeric values (eg '1:23:45' durations) as is
    return int(val) if util.is_int(val) else val

  for entry in parsed.get('entries', []):
    entry_id = entry.get('id')
    uri = entry.get('uri') or entry.get('link')

    # map RSS enclosures to AS attachments with media streams
    attachments = []
    for e in entry.get('enclosures', []):
      url = e.get('href')
      if url:
        mime = e.get('type') or mimetypes.guess_type(url)[0] or ''
        media_type = mime.split('/')[0]
        attachments.append({
          'stream': {
            'url': url,
            'size': as_int(e.get('length')),
            'duration': as_int(entry.get('itunes_duration')),
          },
          'objectType': media_type if media_type in ENCLOSURE_TYPES else None,
        })

    detail = entry.get('author_detail', {})
    author = util.trim_nulls({
      'displayName': detail.get('name') or entry.get('author'),
      'url': detail.get('href'),
      'email': detail.get('email'),
    })
    if not author:
      # no per-entry author; fall back to the feed-level actor
      author = actor

    object_type = 'note'
    content = (entry.get('summary')
               or entry.get('content', [{}])[0].get('value')
               or entry.get('description'))
    title = entry.get('title')
    if content and title:
      # if the title is just a (possibly ellipsized) prefix of the content,
      # drop it and keep this a note; otherwise it's a real article title
      if content.startswith(title.removesuffix('…').removesuffix('...')):
        title = None
      else:
        object_type = 'article'

    # pair each media:content with the corresponding content element so we
    # can pull its text as alt text
    images = []
    for media, alt in zip_longest(util.get_list(entry, 'media_content'),
                                  util.get_list(entry, 'content'),
                                  fillvalue={}):
      if url := media.get('url'):
        filesize = media.get('filesize')
        images.append({
          'url': url,
          'mimeType': media.get('type'),
          'length': int(filesize) if util.is_int(filesize) else None,
          'displayName': alt.get('value'),
        })

    activities.append(Source.postprocess_activity({
      'objectType': 'activity',
      'verb': 'post',
      'id': entry_id,
      'url': uri,
      'actor': author,
      'object': {
        'objectType': object_type,
        'id': entry_id or uri,
        'url': uri,
        'displayName': title,
        'content': content,
        'published': iso_datetime('published'),
        'updated': iso_datetime('updated'),
        'author': author,
        'image': images,
        # one tag object per RSS category. (previously this was a dict
        # comprehension inside a list, which collapsed all tags into a
        # single dict holding only the last term.)
        'tags': [{'displayName': tag.get('term')}
                 for tag in entry.get('tags', [])],
        'attachments': attachments,
        'stream': [a['stream'] for a in attachments],
      },
    }, mentions=True))

  return util.trim_nulls(activities)