You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
180 lines
7.5 KiB
Python
180 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
from enum import Enum
|
|
from getpass import getpass
|
|
from html.parser import HTMLParser
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from typing import Sequence
|
|
|
|
import xdg.BaseDirectory
|
|
|
|
from tinytinypy import Connection, UpdateMode, UpdateField
|
|
|
|
APP_NAME = "tinytinypy" # For default config directories
|
|
|
|
|
|
UPDATE_MODES = {
|
|
"false": UpdateMode.SET_TO_FALSE,
|
|
"true": UpdateMode.SET_TO_TRUE,
|
|
"toggle": UpdateMode.TOGGLE,
|
|
}
|
|
|
|
UPDATE_FIELDS = {
|
|
"published": UpdateField.PUBLISHED,
|
|
"starred": UpdateField.STARRED,
|
|
"unread": UpdateField.UNREAD,
|
|
}
|
|
|
|
def comma_int_list(text: str) -> Sequence[int]:
|
|
return [int(i) for i in text.split(",")]
|
|
|
|
|
|
class OutputMode(Enum):
|
|
# enum values
|
|
JSON = ('json', False)
|
|
TTS_READY = ('tts-ready', True)
|
|
ONLY_URL = ('only-url', False)
|
|
HEADLINES = ('headlines', False)
|
|
# helpers
|
|
@classmethod
|
|
def get_mode_names(cls):
|
|
return [e.mode_name for e in cls]
|
|
@classmethod
|
|
def parse_mode(cls, mode_name):
|
|
for e in cls:
|
|
if e.mode_name == mode_name:
|
|
return e
|
|
def __init__(self, mode_name, requires_content):
|
|
self.mode_name = mode_name
|
|
self.requires_content = requires_content
|
|
def __str__(self):
|
|
return self.mode_name
|
|
|
|
class ContentTTSParser(HTMLParser):
|
|
END_SENTENCE_CHARS = [".", ";", ":", "!", "?"]
|
|
ONLY_SEPARATION_CHARS = [","]
|
|
PUNCTATION_CHARS = END_SENTENCE_CHARS + ONLY_SEPARATION_CHARS
|
|
SENTENCE_TAG = ["p", "div"]
|
|
def __init__(self):
|
|
super().__init__()
|
|
self.extracted = ""
|
|
def append(self, text):
|
|
if self.extracted:
|
|
self.extracted += " "
|
|
self.extracted += ' '.join(text.split())
|
|
def close_sentence(self):
|
|
if self.extracted:
|
|
last_char = self.extracted[-1]
|
|
if last_char not in self.END_SENTENCE_CHARS:
|
|
if last_char in self.ONLY_SEPARATION_CHARS:
|
|
self.extracted = self.extracted[:-1] + self.END_SENTENCE_CHARS[0]
|
|
else:
|
|
self.extracted += self.END_SENTENCE_CHARS[0]
|
|
def append_sentence(self, text):
|
|
self.close_sentence()
|
|
self.append(text)
|
|
self.close_sentence()
|
|
def handle_starttag(self, tag, attrs):
|
|
if tag in self.SENTENCE_TAG:
|
|
self.close_sentence()
|
|
def handle_endtag(self, tag):
|
|
if tag in self.SENTENCE_TAG:
|
|
self.close_sentence()
|
|
def handle_data(self, data):
|
|
self.append(data)
|
|
|
|
def func_articles(server: Connection, args: argparse.Namespace):
|
|
filtered = {key: value for key, value in args.__dict__.items() if key in ["feed_id", "cat_id", "limit", "skip", "view_mode", "since_id", "include_nested", "order_by"]}
|
|
headlines = server.getHeadlines(show_content=args.output_mode.requires_content, **filtered)
|
|
if args.output_mode == OutputMode.ONLY_URL:
|
|
for h in headlines:
|
|
print(h.url)
|
|
elif args.output_mode == OutputMode.TTS_READY:
|
|
for h in headlines:
|
|
if h.content:
|
|
parser = ContentTTSParser()
|
|
parser.append_sentence(h.title)
|
|
parser.feed(h.content)
|
|
print(parser.extracted)
|
|
else:
|
|
print(h.title)
|
|
elif args.output_mode == OutputMode.HEADLINES:
|
|
for h in headlines:
|
|
print(h.title)
|
|
elif args.output_mode == OutputMode.JSON:
|
|
print(json.dumps([h.toJson() for h in headlines]))
|
|
else:
|
|
raise Exception(f'Not implemented output mode "{args.output_mode}"')
|
|
|
|
def func_update(server: Connection, args: argparse.Namespace):
|
|
# argpase should check that mode and field are valid
|
|
updated = server.updateArticle(
|
|
article_ids=args.article_ids,
|
|
mode=UPDATE_MODES[args.mode],
|
|
field=UPDATE_FIELDS[args.field],
|
|
)
|
|
print(f"Updated {updated} article(s)")
|
|
|
|
def configure_parser():
|
|
parser = argparse.ArgumentParser(description="Allows access to feeds and articles of a Tiny Tiny RSS instance using the API.")
|
|
#parser.add_argument('--proto', '--protocol', dest='proto', default='https', choices=['http', 'https'], help="The protocol used to access the api, defaults to https")
|
|
parser.add_argument('-H', '--host', '--url', dest='url', required=True, help="URL of the TT-RSS instace to access, examples: rss.example.com, example.com/tt-rss")
|
|
parser.add_argument('-u', '--user', '--username', dest='user', required=True, help="Name of the user used to access the instance")
|
|
parser.add_argument('-p', '--pass', '--password', dest='passwd', help="Password of the user used to access the instance")
|
|
parser.add_argument('-P', '--pass-stdin', '--password-stdin', action='store_true', dest='passStdin', help="Read the password for the user used to access the instance from stdin")
|
|
sub = parser.add_subparsers(help="Operations", dest='op', description="Operations available to send to server")
|
|
#= Get Articles
|
|
p = sub.add_parser('articles', help="Get articles")
|
|
p.set_defaults(func=func_articles)
|
|
p.add_argument('--feed-id', dest='feed_id', help="Only output articles for this feed, cannot be used with --cat-id")
|
|
p.add_argument('--cat-id', dest='cat_id', help="Only output articles for feeds of this category, cannot be used with --feed-id")
|
|
p.add_argument('--limit', dest='limit', type=int, help="Maximum count of articles")
|
|
p.add_argument('--skip', '--offset', dest='skip', type=int, help="Skip this amount of feeds first")
|
|
p.add_argument('--view-mode', '--filter', dest='view_mode', choices=['all_articles', 'unread', 'adaptive', 'marked', 'updated'], default='all_articles', help='Only show articles of certain type')
|
|
p.add_argument('--order', '--order-by', dest='order_by', choices=['feed_dates', 'date_reverse'], default='date_reverse')
|
|
p.add_argument('--output-mode', dest='output_mode', choices=OutputMode, type=OutputMode.parse_mode, default='json', help='Define how the received articles should be outputed, in most modes except json and *-full modes, one line equals a single article')
|
|
#= Update Articles
|
|
p = sub.add_parser("update", help="Update articles")
|
|
p.set_defaults(func=func_update)
|
|
p.add_argument("-i", "--ids", "--article-ids", dest="article_ids", type=comma_int_list, help="Comma separated list of article ids to update")
|
|
p.add_argument("-m", "--mode", dest="mode", choices=UPDATE_MODES.keys())
|
|
p.add_argument("-f", "--field", dest="field", choices=UPDATE_FIELDS.keys())
|
|
return parser
|
|
|
|
def parse(args):
|
|
return configure_parser().parse_args(args=args)
|
|
|
|
def main():
|
|
URL_REGEX = re.compile(r'^(?P<proto>[a-z]+://)?(?P<host>[^:/ ]+)(:(?P<port>\d+))?(?P<path>.*)$')
|
|
# Retrieve args from config and shell
|
|
storedArgs = sys.argv.copy()
|
|
storedArgs.pop(0)
|
|
for d in xdg.BaseDirectory.load_config_paths(APP_NAME):
|
|
with open(os.path.join(d, "config.txt")) as f:
|
|
storedArgs = [line.strip() for line in f if line.strip()[0] != '#'] + storedArgs
|
|
# Parse args
|
|
args = parse(storedArgs)
|
|
# Read Pass if stdin
|
|
passwd = args.passwd
|
|
if args.passStdin:
|
|
passwd = getpass()
|
|
# Check for pass
|
|
if passwd is None:
|
|
raise ValueError("Password is missing!") # TODO make more beautiful for user
|
|
# Parse url
|
|
urlM = URL_REGEX.match(args.url).groupdict()
|
|
proto = urlM.get('proto') or 'https'
|
|
host = urlM['host']
|
|
# Call api
|
|
# TODO Support port
|
|
with Connection(proto=proto, host=host) as server:
|
|
server.login(args.user, passwd)
|
|
args.func(server, args)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|