rss_notifier/rss_notifier.py

264 lines
9 KiB
Python
Raw Normal View History

2023-07-03 10:35:32 -04:00
from requests import get
from sqlite3 import Connection as SQL, Cursor
from contextlib import contextmanager
from bs4 import BeautifulSoup as Soup
from sys import argv, stderr
from dataclasses import dataclass
from typing import *
from subprocess import Popen, PIPE, run
from os import execvp, environ
from json import dumps, loads
@dataclass
class Feed:
    """A subscribed feed: its database id, its URL and, once fetched, its parsed XML.

    Two feeds are considered equal (and hash alike) when their ``id`` matches;
    ``url`` and ``content`` do not take part in comparison.
    """

    id: int
    url: str
    # Parsed feed document; stays None until fetch() has run.
    # Forward reference so that defining the class does not need bs4 resolved.
    content: Optional["Soup"] = None

    def fetch(self, timeout=30):
        """Download the feed and parse it as XML, caching it on ``self.content``.

        ``timeout`` is the number of seconds handed to ``requests.get`` so that
        an unresponsive server cannot block the program indefinitely.

        Raises the ``requests`` exception for a failed request, including the
        one produced by ``raise_for_status()`` on an error response.
        Returns ``self`` so calls can be chained.
        """
        res = get(self.url, timeout=timeout)
        res.raise_for_status()
        self.content = Soup(res.text, features="xml")
        return self

    def entries(self):
        """Return every ``entry`` element of the feed, fetching it first if needed."""
        if self.content is None:
            self.fetch()
        return self.content.find_all('entry')

    @classmethod
    def from_record(cls, record):
        """Build a Feed from a row whose columns are the constructor arguments in order."""
        return cls(*record)

    def __eq__(self, other):
        # Returning NotImplemented (instead of touching other.id) keeps
        # comparisons with None or unrelated objects from raising.
        if not isinstance(other, Feed):
            return NotImplemented
        return self.id == other.id

    # __ne__ is derived from __eq__ by Python, so it is not spelled out.

    def __hash__(self):
        return hash(self.id)

    def to_dict(self):
        """Return the persistent fields (``id`` and ``url``) as a plain dict."""
        return {'id': self.id, 'url': self.url}

    def to_json(self) -> str:
        """Serialize the persistent fields to a JSON object string."""
        return dumps(self.to_dict())

    @classmethod
    def from_json(cls, text: str) -> 'Feed':
        """Rebuild a Feed from the output of ``to_json``; ``content`` is left unset."""
        data = loads(text)
        return cls(id=data['id'], url=data['url'])
# Command-line arguments handed to notify-send to attach the two action
# buttons. Each action is written as "<name>=<label>".
ACTIONS = [
    '--action', 'open=Open link',
    '--action', 'read=Mark read',
]
@dataclass
class FeedEntry:
    """One entry of a feed, as stored in the ``entries`` table.

    Entries compare equal (and hash alike) when their ``upstream_id`` matches,
    regardless of the other fields.
    """

    # Database row id; None for an entry that has not been inserted yet.
    id: Optional[int]
    feed: 'Feed'
    # Identifier taken from the feed's own <id> element.
    upstream_id: str
    title: str
    link: str
    read: bool

    @classmethod
    def select_all(cls, db_connection) -> Set['FeedEntry']:
        """Load every stored entry, each joined with its feed, as a set."""
        rows = db_connection.cursor().execute(
            "select entry.*, feed.url from entries entry join feeds feed on feed.id = entry.feed_id;"
        ).fetchall()
        return {
            cls(entry_id, Feed(feed_id, feed_url), upstream_id, title, link, read)
            for entry_id, feed_id, upstream_id, title, link, read, feed_url in rows
        }

    def __eq__(self, other):
        # NotImplemented keeps comparisons with None/unrelated objects from raising.
        if not isinstance(other, FeedEntry):
            return NotImplemented
        return self.upstream_id == other.upstream_id

    # __ne__ is derived from __eq__ by Python, so it is not spelled out.

    def __hash__(self):
        return hash(self.upstream_id)

    def to_json(self) -> str:
        """Serialize the entry (with its feed nested as a dict) to a JSON string."""
        return dumps({
            'id': self.id,
            'feed': self.feed.to_dict(),
            'upstream_id': self.upstream_id,
            'title': self.title,
            'link': self.link,
            'read': self.read,
        })

    @classmethod
    def from_json(cls, data) -> 'FeedEntry':
        """Rebuild an entry from the output of ``to_json``.

        Raises KeyError when a field is missing from the JSON object.
        """
        entry = loads(data)
        return cls(
            id=entry['id'],
            feed=Feed(**entry['feed']),
            upstream_id=entry['upstream_id'],
            title=entry['title'],
            link=entry['link'],
            read=entry['read'],
        )

    def mark_read(self, db_connection):
        """Flag this entry as read in the database (committing) and on the object."""
        db_connection.cursor().execute('update entries set read = true where id = ?', (self.id,))
        db_connection.commit()
        self.read = True

    @classmethod
    def from_rss(cls, soup: 'Soup', feed: 'Feed', mark_read=False) -> 'FeedEntry':
        """Build a not-yet-stored entry (``id`` is None) from one parsed feed element.

        The ``id`` and ``title`` child elements supply ``upstream_id`` and
        ``title``; the ``href`` attribute of the ``link`` child supplies ``link``.
        ``mark_read`` becomes the entry's initial ``read`` flag.

        Raises ValueError when the element has no ``link`` child.
        """
        upstream_id = soup.find('id').text
        if el := soup.find('link'):
            link = el['href']
        else:
            raise ValueError(f"no link tied to RSS feed {feed.url!r} entry {upstream_id}")
        title = soup.find('title').text
        return cls(id=None, title=title, feed=feed, upstream_id=upstream_id, link=link, read=mark_read)

    def insert(self, db_connection: Cursor):
        """Insert this entry through the given cursor and record the new row id.

        The caller is responsible for committing.
        """
        cursor = db_connection.execute('''
            insert into entries (
                feed_id, upstream_id, title, link, read
            ) values (
                ?, ?, ?, ?, ?
            );
            ''', (
                self.feed.id,
                self.upstream_id,
                self.title,
                self.link,
                # The stored flag is the ``read`` attribute; ``mark_read`` is a
                # method and cannot be bound as an SQL parameter.
                self.read,
            )
        )
        # Remember the generated primary key so mark_read() can target this row.
        self.id = cursor.lastrowid
class RSSNotifier:
def __init__(self, db_loc):
self.db_loc = db_loc
self.db_connection = SQL(db_loc)
self.create_tables()
self.processes = []
def create_tables(self):
db = self.db_connection.cursor()
db.execute(
'create table if not exists feeds (id integer primary key autoincrement, url text)'
)
db.execute('''
create table if not exists entries (
id integer primary key autoincrement,
feed_id int references feeds(id),
upstream_id text unique,
title text,
link text,
read boolean
);
''')
def select_feeds(self):
"""Return all the feed urls mapped to their current content"""
db = self.db_connection.cursor()
feeds = db.execute("select * from feeds;").fetchall()
return map(Feed, feeds)
def add_feed(self, url: str, mark_read: bool):
feed = Feed(-1, url).fetch()
cursor = self.db_connection.cursor()
cursor.execute("insert into feeds (url) values (?)", (url,))
feed.id = cursor.lastrowid
for entry in feed.entries():
FeedEntry.from_rss(entry, feed, mark_read).insert(cursor)
self.db_connection.commit()
def parse_args(self, args=None):
mark_read = True
if args is None:
args = argv
feeds_to_add = []
try:
while True:
match arg := argv.pop(0):
case "--no-mark-read":
mark_read = False
case "--add-feed":
try:
feed = argv.pop(0)
except IndexError:
print("must specify a feed to add", file=stderr)
exit(1)
feeds_to_add.append(feed)
case '--make-notification':
entry = FeedEntry.from_json(argv.pop(0))
self._launch_notification(entry)
return
case other:
print(f'unrecognized argument {other!r}', file=stderr)
except IndexError:
# done parsing the args
...
if feeds_to_add:
for feed in feeds_to_add:
self.add_feed(feed, mark_read)
exit(0)
self.check_for_new()
def check_for_new(self):
entries = FeedEntry.select_all(self.db_connection)
feeds: Set[Feed] = set(entry.feed for entry in entries)
for feed in feeds:
feeds_markup = set(feed.entries())
for markup in feeds_markup:
entry = FeedEntry.from_rss(markup, feed)
if fe := next(fe for fe in entries if fe == entry):
if not fe.read:
self.notify(entry)
else:
entry.insert(self.db_connection.cursor())
self.db_connection.commit()
self.notify(entry)
exit_code = 0
while self.processes:
for proc in self.processes:
match proc.poll():
case None:
continue
case 0:
...
case nonzero:
print(f"proc {proc.pid} returned nonzero exit code {nonzero}", file=stderr)
if stream := proc.stdout:
print(f'stdout: {stream.read()}', file=stderr)
if stream := proc.stderr:
print(f'stdout: {stream.read()}', file=stderr)
exit_code = nonzero
self.processes = [p for p in self.processes if proc.pid != p.pid]
exit(exit_code)
def notify(self, entry: FeedEntry):
self.processes.append(
Popen(executable="python", args=["python", __file__, '--make-notification', entry.to_json()])
)
def _launch_notification(self, entry: FeedEntry):
result = run(
executable='notify-send',
args=[
'notify-send',
'--expire-time', '0',
'--app-name', 'RSS Notifier',
*ACTIONS,
f'New RSS Story: {entry.title}'
],
stdout=PIPE,
stderr=PIPE
)
result.check_returncode()
match result.stdout:
case b'open\n':
entry.mark_read(self.db_connection)
print(f"opening {entry.link}")
execvp('xdg-open', ['xdg-open', entry.link])
case b'read\n':
entry.mark_read(self.db_connection)
case b'':
print(f'no response on stdout from notify-send. stderr read: {result.stderr!r}')
case other:
print(f'unrecognized response from notify-send: {other!r}\nstderr read: {result.stderr!r}', file=stderr)
if __name__ == "__main__":
    # Imported here because the module-level imports do not provide Path.
    from pathlib import Path

    # An explicit location from the environment wins; otherwise fall back to
    # a per-user state file under the home directory.
    db_location = environ.get("RSS_NOTIFIER_DATABASE_LOCATION")
    if not db_location:
        db_location = Path.home() / ".local/state/rss-notifier/db.sqlite3"
        # SQLite creates the file but not its directory, so create that first.
        db_location.parent.mkdir(parents=True, exist_ok=True)
    RSSNotifier(db_location).parse_args()