292 lines
9.9 KiB
Python
Executable file
292 lines
9.9 KiB
Python
Executable file
#!/usr/bin/env nix-shell
|
|
#! nix-shell -i python -p python3 libnotify python3Packages.beautifulsoup4 python3Packages.lxml python3Packages.requests
|
|
from requests import get
|
|
from sqlite3 import Connection as SQL, Cursor
|
|
from contextlib import contextmanager
|
|
from bs4 import BeautifulSoup as Soup
|
|
from sys import argv, stderr
|
|
from dataclasses import dataclass
|
|
from typing import *
|
|
from subprocess import Popen, PIPE, run
|
|
from os import execvp, environ, makedirs
|
|
from json import dumps, loads
|
|
from pathlib import Path
|
|
|
|
|
|
@dataclass
|
|
class Feed:
|
|
id: int
|
|
url: str
|
|
title: Optional[str] = None
|
|
content: Optional[Soup] = None
|
|
|
|
def fetch_content(self):
|
|
res = get(self.url)
|
|
res.raise_for_status()
|
|
self.content = Soup(res.text, features="xml")
|
|
if title_tag := self.content.find('title'):
|
|
self.title = title_tag.text
|
|
else:
|
|
print(f'warning: no title for feed {self.url!r}')
|
|
return self
|
|
|
|
def entries(self):
|
|
if content := self.content:
|
|
return content.find_all('entry')
|
|
else:
|
|
return self.fetch_content().entries()
|
|
|
|
@classmethod
|
|
def from_record(cls, record):
|
|
return cls(*record)
|
|
|
|
def __eq__(self, other):
|
|
return self.id == other.id
|
|
|
|
def __ne__(self, other):
|
|
return self.id != other.id
|
|
|
|
def __hash__(self):
|
|
return hash(self.id)
|
|
|
|
def to_dict(self):
|
|
return {'id': self.id, 'url': self.url, 'title': self.title}
|
|
|
|
def to_json(self) -> str:
|
|
return dumps(self.to_dict())
|
|
|
|
@classmethod
|
|
def from_json(cls, text: str) -> 'Feed':
|
|
data = loads(text)
|
|
return cls(id=data['id'], url=data['url'], title=data['title'])
|
|
|
|
ACTIONS = ['--action', 'open=Open link', '--action', 'read=Mark read']
|
|
|
|
@dataclass
|
|
class FeedEntry:
|
|
id: int
|
|
feed: Feed
|
|
upstream_id: str
|
|
title: str
|
|
link: str
|
|
read: bool
|
|
|
|
@classmethod
|
|
def select_all(cls, db_connection) -> List['FeedEntry']:
|
|
query = '''
|
|
select entry.*, feed.url, feed.title
|
|
from entries entry
|
|
join feeds feed
|
|
on feed.id = entry.feed_id;
|
|
'''
|
|
feed_entries = db_connection.cursor().execute(query).fetchall()
|
|
return set(
|
|
FeedEntry(id, Feed(feed_id, feed_url, feed_title), upstream_id, title, link, read)
|
|
for id, feed_id, upstream_id, title, link, read, feed_url, feed_title
|
|
in feed_entries
|
|
)
|
|
|
|
def __eq__(self, other):
|
|
return self.upstream_id == other.upstream_id
|
|
|
|
def __ne__(self, other):
|
|
return self.upstream_id != other.upstream_id
|
|
|
|
def __hash__(self):
|
|
return hash(self.upstream_id)
|
|
|
|
def to_json(self) -> str:
|
|
assert list(self.__dict__.keys()) == "id feed upstream_id title link read".split()
|
|
return dumps({
|
|
k: v.to_dict() if k == 'feed' else v
|
|
for k, v in self.__dict__.items()
|
|
})
|
|
|
|
@classmethod
|
|
def from_json(cls, data) -> 'FeedEntry':
|
|
assert list(cls.__annotations__.keys()) == "id feed upstream_id title link read".split(), repr(list(cls.__annotations__.keys()))
|
|
entry = loads(data)
|
|
return cls(**{k: Feed(**entry[k]) if k == 'feed' else entry[k] for k in cls.__annotations__.keys()})
|
|
|
|
def mark_read(self, db_connection):
|
|
db_connection.cursor().execute('update entries set read = true where id = ?', (self.id,))
|
|
db_connection.commit()
|
|
self.read = True
|
|
|
|
@classmethod
|
|
def from_rss(cls, soup: Soup, feed: Feed, mark_read=False) -> 'FeedEntry':
|
|
upstream_id = soup.find('id').text
|
|
if el := soup.find('link'):
|
|
link = el['href']
|
|
else:
|
|
raise ValueError(f"no link tied to RSS feed {feed.url!r} entry {upstream_id}")
|
|
title = soup.find('title').text
|
|
return cls(id=None, title=title, feed=feed, upstream_id=upstream_id, link=link, read=mark_read)
|
|
|
|
def insert(self, db_connection: Cursor):
|
|
db_connection.execute('''
|
|
insert into entries (
|
|
feed_id, upstream_id, title, link, read
|
|
) values (
|
|
?, ?, ?, ?, ?
|
|
);
|
|
''', (
|
|
self.feed.id,
|
|
self.upstream_id,
|
|
self.title,
|
|
self.link,
|
|
self.read
|
|
)
|
|
)
|
|
|
|
|
|
class RSSNotifier:
|
|
def __init__(self, db_loc):
|
|
self.db_loc = db_loc
|
|
self.db_connection = SQL(db_loc)
|
|
self.create_tables()
|
|
|
|
self.processes = []
|
|
|
|
def create_tables(self):
|
|
db = self.db_connection.cursor()
|
|
db.execute('''
|
|
create table if not exists feeds (
|
|
id integer primary key autoincrement,
|
|
url text unique not null,
|
|
title text
|
|
);
|
|
''')
|
|
db.execute('''
|
|
create table if not exists entries (
|
|
id integer primary key autoincrement,
|
|
feed_id int references feeds(id),
|
|
upstream_id text unique,
|
|
title text,
|
|
link text,
|
|
read boolean
|
|
);
|
|
''')
|
|
|
|
def select_feeds(self):
|
|
"""Return all the feed urls mapped to their current content"""
|
|
db = self.db_connection.cursor()
|
|
feeds = db.execute("select * from feeds;").fetchall()
|
|
return map(Feed, feeds)
|
|
|
|
def add_feed(self, url: str, mark_read: bool):
|
|
feed = Feed(-1, url).fetch_content()
|
|
cursor = self.db_connection.cursor()
|
|
cursor.execute("insert into feeds (url, title) values (?, ?)", (url, feed.title))
|
|
feed.id = cursor.lastrowid
|
|
for entry in feed.entries():
|
|
FeedEntry.from_rss(entry, feed, mark_read).insert(cursor)
|
|
|
|
self.db_connection.commit()
|
|
|
|
def parse_args(self, args=None):
|
|
mark_read = True
|
|
if args is None:
|
|
args = argv[1:]
|
|
feeds_to_add = []
|
|
try:
|
|
while True:
|
|
match arg := argv.pop(0):
|
|
case "--no-mark-read":
|
|
mark_read = False
|
|
case "--add-feed":
|
|
try:
|
|
feed = argv.pop(0)
|
|
except IndexError:
|
|
print("must specify a feed to add", file=stderr)
|
|
exit(1)
|
|
feeds_to_add.append(feed)
|
|
case '--make-notification':
|
|
entry = FeedEntry.from_json(argv.pop(0))
|
|
self._launch_notification(entry)
|
|
return
|
|
case other:
|
|
print(f'unrecognized argument {other!r}', file=stderr)
|
|
except IndexError:
|
|
# done parsing the args
|
|
...
|
|
if feeds_to_add:
|
|
for feed in feeds_to_add:
|
|
self.add_feed(feed, mark_read)
|
|
exit(0)
|
|
self.check_for_new()
|
|
|
|
def check_for_new(self):
|
|
entries = FeedEntry.select_all(self.db_connection)
|
|
feeds: Set[Feed] = set(entry.feed for entry in entries)
|
|
for feed in feeds:
|
|
feeds_markup = set(feed.entries())
|
|
for markup in feeds_markup:
|
|
entry = FeedEntry.from_rss(markup, feed)
|
|
if fe := next(fe for fe in entries if fe == entry):
|
|
if not fe.read:
|
|
self.notify(entry)
|
|
else:
|
|
entry.insert(self.db_connection.cursor())
|
|
self.db_connection.commit()
|
|
self.notify(entry)
|
|
exit_code = 0
|
|
while self.processes:
|
|
for proc in self.processes:
|
|
match proc.poll():
|
|
case None:
|
|
continue
|
|
case 0:
|
|
...
|
|
case nonzero:
|
|
print(f"proc {proc.pid} returned nonzero exit code {nonzero}", file=stderr)
|
|
if stream := proc.stdout:
|
|
print(f'stdout: {stream.read()}', file=stderr)
|
|
if stream := proc.stderr:
|
|
print(f'stdout: {stream.read()}', file=stderr)
|
|
exit_code = nonzero
|
|
self.processes = [p for p in self.processes if proc.pid != p.pid]
|
|
exit(exit_code)
|
|
|
|
|
|
def notify(self, entry: FeedEntry):
|
|
self.processes.append(
|
|
Popen(executable="python", args=["python", __file__, '--make-notification', entry.to_json()])
|
|
)
|
|
|
|
def _launch_notification(self, entry: FeedEntry):
|
|
result = run(
|
|
executable='notify-send',
|
|
args=[
|
|
'notify-send',
|
|
'--expire-time', '0',
|
|
'--app-name', 'RSS Notifier',
|
|
*ACTIONS,
|
|
f'New RSS Story from {entry.feed.title}: {entry.title}'
|
|
],
|
|
stdout=PIPE,
|
|
stderr=PIPE
|
|
)
|
|
result.check_returncode()
|
|
match result.stdout:
|
|
case b'open\n':
|
|
entry.mark_read(self.db_connection)
|
|
print(f"opening {entry.link}")
|
|
execvp('xdg-open', ['xdg-open', entry.link])
|
|
case b'read\n':
|
|
entry.mark_read(self.db_connection)
|
|
case b'':
|
|
print(f'no response on stdout from notify-send. stderr read: {result.stderr!r}')
|
|
case other:
|
|
print(f'unrecognized response from notify-send: {other!r}\nstderr read: {result.stderr!r}', file=stderr)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
db_path = environ.get("RSS_NOTIFIER_DATABASE_LOCATION")
|
|
if db_path is None:
|
|
db_path = Path(environ.get("HOME")) / ".local/state/rss-notifier/db.sqlite3"
|
|
else:
|
|
db_path = Path(db_path)
|
|
if not db_path.parent.exists():
|
|
makedirs(db_path.parent)
|
|
RSSNotifier(db_path).parse_args() |