#!/usr/bin/env python3 import argparse from dataclasses import dataclass from datetime import datetime, timedelta from functools import cached_property, partial import locale from pathlib import Path import re import readline import shlex import shutil import subprocess import sys import tempfile from typing import Callable, Iterable, List, Mapping warn = partial(print, file=sys.stderr) # === Configuration # name of directory to hold index INDEX_DIR = ".index" # name of directory where new pages will be stored DEFAULT_CATEGORY = "_toSort" # languages to apply OCR in OCR_LANGS = [ "deu", "eng", ] # Declare which scan source to use by default USE_ADF_BY_DEFAULT = True # Scan sources to use, depend on scanner, find out with "scanimge -L" ADF_SCAN_SOURCE = "ADF Duplex" FLATBED_SCAN_SOURCE = "Flatbed" # Minimum width / length of IDs in index direcory MIN_NUM_WIDTH = 6 # only used for INDEX_DIR files # How many pages are displayed as context ID_AROUND_RANGE = 10 # The PDF Viewer you want to open (as list of arguments) PDF_VIEWER_ARGS = ["zathura", "--mode=fullscreen"] # === Patterns SCAN_SUFFIXES = [ # Regexes "jpe?g", "pdf", "png", ] ID_REGEX = re.compile(r"""^ ( (?Pd(igital)?) # no physical original | ( (?P\d+) # simple id | (?P\d+)\+ # id and following id | (?P\d+)\+\+ # id and following 3 ids (this and following document with each 2 sides) | (?P\d+)-(?P\d+) # id range )(?P\#)? ) $""", re.VERBOSE) SCAN_REGEX = re.compile(r"""^ ( # Date (?P\d{4}-\d{2}-\d{2})_ )? # automatic prefix of scanimage (out)? # scan id (?P """ + ID_REGEX.pattern[1:-1] + r""" ) ( # Description (optional) _(?P.*) )? # Suffix \.(""" + "|".join(SCAN_SUFFIXES) + r""") $""", re.VERBOSE) SCAN_WARN_REGEX = re.compile(r"\.(" + "|".join(SCAN_SUFFIXES) + r")$") NUMBER_REGEX = re.compile(r"^\d+$") CONTENT_SPLIT_REGEX = re.compile(r"[\W]") DATE_REGEX = re.compile(r"(\d{2,4}-\d{1,2}-\d{1,2}|\d{1,2}\.\d{1,2}\.\d{2,4}|\d{1,2}\.\s+[a-zA-Z]+\s+\d{2,4})") DATE_FORMATS = [ # date.strptime compatible "%Y-%m-%d", "%y-%m-%d", "%d.%m.%Y", "%d.%m.%y", "%d. %B %Y", "%d. %B %y", "%d. %b %Y", "%d. %b %y", ] # === Code def build_args(args: Iterable) -> str: return " ".join((shlex.quote(str(e)) for e in args)) def build_ocr_args(in_file: str, out_file: str, ocr_langs: Iterable[str] = OCR_LANGS, additional_args: Iterable = []) -> str: return build_args([ "ocrmypdf", "--skip-text", "--pdfa-image-compression", "jpeg", # usable as only applied once "--jpeg-quality", "100", # ensure highest quality "-l", "+".join(ocr_langs), *additional_args, in_file, out_file, ]) def rlinput(prompt, prefill=None, suggestions=[]): if suggestions and prefill is None: prefill = suggestions.pop(0) readline.clear_history() for sug in reversed(suggestions): readline.add_history(sug) readline.set_startup_hook(lambda: readline.insert_text(prefill or "")) try: return input(prompt) finally: readline.set_startup_hook() @dataclass(eq=True, order=True, frozen=True) class IdRange: first: int last: int @classmethod def from_match(cls, m: re.Match): if not m: return None r = None if m.group("digital"): r = (-1, -1) elif m.group("id_simple"): id_first = int(m.group("id_simple")) r = (id_first, id_first) elif m.group("id_following"): id_first = int(m.group("id_following")) r = (id_first, id_first + 1) elif m.group("id_following_twice"): id_first = int(m.group("id_following_twice")) r = (id_first, id_first + 3) elif m.group("id_range_begin"): begin_str, end_str = m.group("id_range_begin"), m.group("id_range_end") common_prefix_len = len(begin_str) - len(end_str) different_suffix_len = len(begin_str) - common_prefix_len if common_prefix_len > 0: end_str = begin_str[0:common_prefix_len] + end_str begin_int, end_int = int(begin_str), int(end_str) if common_prefix_len > 0 and begin_int > end_int: end_int += 10 ** different_suffix_len r = (begin_int, end_int) else: return None if r[1] < r[0]: raise Exception(f"IdRange invalid, last < first, {r[1]} < {r[0]}, range: {r}") if m.group("around"): r = (r[0] - ID_AROUND_RANGE, r[1] + ID_AROUND_RANGE) return cls(*r) @classmethod def from_str(cls, s: str): return cls.from_match(ID_REGEX.match(s)) @classmethod def from_scans(cls, scans): return cls(scans[0].id_range.first, scans[-1].id_range.last) @property def is_digital(self): return self.last < 0 @property def fancy(self): return self.to_fancy() def to_fancy(self, width: int = 0): if self.first == self.last: return f"{self.first:0{width}}" if self.first == self.last - 1: return f"{self.first:0{width}}+" return f"{self.first:0{width}}-{self.last:0{width}}" def align(self): first = self.first if first % 2 == 0: first -= 1 last = self.last if last % 2 == 1: last += 1 return IdRange(first, last) def __format__(self, format_spec): return self.fancy.__format__(format_spec) def __iter__(self): return iter(range(self.first, self.last + 1)) def __len__(self): return self.last - self.first + 1 def __str__(self): return self.fancy def interpret_date(text: str) -> datetime: for date_format in DATE_FORMATS: try: return datetime.strptime(text, date_format) except ValueError: continue return None def format_date(date: datetime) -> str: return date.strftime(DATE_FORMATS[0]) def avg(dates: list[datetime]) -> datetime: m = min(dates) s = sum((date - m for date in dates), start=timedelta()) return m + (s / len(dates)) @dataclass class ScanFile: path: Path date: str id_range: IdRange description: str @classmethod def from_path(cls, path: Path): m = SCAN_REGEX.match(path.name) if not m: if SCAN_WARN_REGEX.search(path.name): warn(f"{path}: Seems like a scanned document, but name is invalid") return None date = m.group("date") id_range = IdRange.from_match(m) if not id_range: raise Exception(f"IdRange could not be found while SCAN_REGEX matched, SCAN_REGEX must be invalid!") desc = m.group("description") return ScanFile(path, date, id_range, desc) @property def first_id(self): return self.id_range.first @property def last_id(self): return self.id_range.last @property def is_digital(self): return self.id_range.is_digital @property def title(self): if self.description: return self.description return self.path.with_suffix("").name @property def title_or_content(self): if self.description: return self.description return ",".join(self.most_common_words[:6]) @property def has_already_ocr(self) -> bool: return self.path.suffix == ".pdf" @cached_property def text_content(self) -> str: if self.has_already_ocr: cmd = [ "pdftotext", ] else: cmd = [ "tesseract", "-l", "+".join(OCR_LANGS), ] cmd += [ str(self.path.resolve()), "-", ] proc = subprocess.run(cmd, shell=False, check=True, capture_output=True, text=True) return proc.stdout @property def autocomplete_content(self) -> list[str]: return [e for e in CONTENT_SPLIT_REGEX.split(self.text_content) if len(e) >= 3] @cached_property def most_common_words(self) -> list[str]: word_counter = dict() for word in self.autocomplete_content: if word in word_counter: word_counter[word] += 1 else: word_counter[word] = 1 return [item[0] for item in sorted(word_counter.items(), key=lambda item: item[1])] @cached_property def all_dates_from_content(self) -> list[datetime]: # TODO date https://stackoverflow.com/questions/7821661/how-to-code-autocompletion-in-python dates = set() for probable_date in DATE_REGEX.finditer(self.text_content): probable_date_filtered = re.sub(r"\s", " ", probable_date.group(0)) date = interpret_date(probable_date_filtered) if date and date not in dates: dates.add(date) if len(dates) <= 1: return list(dates) older_dates = set() min_date = min(dates) max_date = max(dates) date_subset = dates - {min_date,} while len(date_subset) > 0 and max_date - min_date >= 2 * (max_date - min(date_subset)): dates = date_subset date_subset = dates - {min_date,} older_dates.add(min_date) min_date = min(dates) avg_date = avg(dates) + (max_date - min_date) * .2 return sorted(dates, key=lambda date: abs(avg_date - date)) + sorted(older_dates, reverse=True) @property def date_from_content(self) -> str: dates = self.all_dates_from_content if dates: return format_date(dates[0]) return None def gen_small_summary_entry(self): return [ str(self.id_range), self.description or "", ] def gen_small_summary(self): return " ".join(self.gen_small_summary_entry()) def __hash__(self): return hash(self.path) def __eq__(self, other): return self.path == other.path SCAN_FORMATS: dict[Callable[[ScanFile], str]] = { "content": lambda scan: scan.text_content, "date": lambda scan: scan.date_from_content, "id": lambda scan: str(scan.id_range), "id-date-title": lambda scan: f"{scan.id_range:>12} {str(scan.date):<10} {scan.title_or_content}", "id-path": lambda scan: f"{scan.id_range:>12} {scan.path}", "id-title": lambda scan: f"{scan.id_range:>12} {scan.title_or_content}", "path": lambda scan: scan.path, "title": lambda scan: scan.title_or_content, } def iter_files(path) -> Iterable[Path]: for child in Path(path).iterdir(): if child.is_dir() and not child.name.startswith("."): for child_child in iter_files(child): yield child_child elif child.is_file(): yield child def iter_scans(path) -> Iterable[Path]: for scan_path in iter_files(path): scan_file = ScanFile.from_path(scan_path) if scan_file: yield scan_file def iter_categories(path) -> Iterable[str]: for child in Path(path).iterdir(): if child.is_dir() and not child.name.startswith(".") and not child.name.startswith("_"): yield child.name for child_child_name in iter_categories(child): yield f"{child.name}/{child_child_name}" def sorted_by_id(scans) -> Iterable[ScanFile]: return sorted(scans, key=lambda scan: scan.first_id) def highest_id(scans) -> int: return max(scans, key=lambda scan: scan.last_id).last_id def resolve_per_id(scans): scans = list(scans) ids = [set() for i in range(highest_id(scans) + 1)] for scan in scans: if scan.is_digital: ids[0].add(scan) else: for i in scan.id_range: ids[i].add(scan) return ids def next_id(scans) -> int: next_one = highest_id(scans) next_one += 1 if next_one % 2 == 0: next_one += 1 # next id should be odd return next_one def lookup_scans(scans, *id_ranges): scan_ids = resolve_per_id(scans) return {scan for id_r in id_ranges for i in id_r if i < len(scan_ids) for scan in scan_ids[i]} def extract_dates(scans: List[ScanFile]) -> List[str]: # used dict instead of set to gurantee input order dates: Mapping[str, None] = dict() for scan in scans: if scan.date: dates[scan.date] = None for scan in scans: for date in scan.all_dates_from_content: dates[format_date(date)] = None return list(dates) # args dependent def read_single_id(args): if not args.id: warn("--id missing") sys.exit(2) id_r = IdRange.from_str(args.id) if id_r is None: warn(f'id "{args.id}" is invalid') sys.exit(2) return id_r def read_ids(args): if not args.id: warn("--id missing") sys.exit(2) ids_str = args.id.split(",") for id_str in ids_str: id_r = IdRange.from_str(id_str) if id_r is None: warn(f'id "{id_str}" is invalid') sys.exit(2) yield id_r def print_scans(args, scans, do_view=True): f = SCAN_FORMATS[args.format] for scan in scans: print(f(scan)) if do_view and args.view: proc = subprocess.Popen(["imv-x11", *(str(scan.path) for scan in scans)], stdin=None, stdout=None, stderr=None, close_fds=True) return lambda: proc.terminate() return lambda: None def cmd_by_id(args, scans): id_ranges = read_ids(args) print_scans(args, sorted_by_id(lookup_scans(scans, *id_ranges))) def cmd_check_duplicates(args, scans): print_anything = False ids = resolve_per_id(scans) ids.pop(0) # remove digital only for id_scans in ids: if 1 < len(id_scans): if print_anything: print("---") print_anything = True print_scans(args, id_scans) if print_anything: sys.exit(1) def cmd_convert(args, scans: Iterable[ScanFile]): cmd_list = list[str]() for scan in scans: if not scan.has_already_ocr: cmd_list.append("&&".join([ build_ocr_args(scan.path, out_file=scan.path.with_suffix(".pdf"), additional_args=["--jobs", "1"]), build_args([ "rm", scan.path, ]), ])) if args.output_commands: for cmd in cmd_list: print(cmd) else: for cmd in cmd_list: subprocess.run(cmd, check=True, shell=True) def cmd_list(args, scans): print_scans(args, sorted_by_id(scans)) def cmd_list_categories(args, scans): for category in iter_categories("."): print(category) def cmd_merge(args, scans): # search for scans id_r = read_single_id(args).align() found = sorted_by_id(lookup_scans(scans, id_r)) if len(found) <= 0: warn(f"No scan with id {id_r} found") sys.exit(3) elif len(found) == 1 and found[0].path.suffix == ".pdf" and found[0].date is not None: warn(f"Only one scan with {id_r} found which is already a PDF and has a date, so no merge required") sys.exit(4) id_r = IdRange.from_scans(found) if len(id_r) > 2: id_r = id_r.align() print("will merge following scans:") print_scans(args, found, do_view=False) print("") # combine before for better displayment def build_cmd(output_file: Path): combine_args = build_args([ "pdfunite", *(scan.path for scan in found), "/dev/stdout", ]) ocr_args = build_ocr_args("-", output_file) return f"{combine_args} | {ocr_args}" with tempfile.NamedTemporaryFile() as fp: subprocess.run(build_cmd(fp.name), check=True, shell=True) if args.view: pdf_viewer = subprocess.Popen(PDF_VIEWER_ARGS + [fp.name], stdin=subprocess.PIPE, stdout=subprocess.PIPE) # get existing parameters doc_dates = [format_date(datetime.now())] + extract_dates(found) doc_dates = [] + doc_dates doc_title = "" for scan in found: if scan.description: doc_title = scan.description break doc_categories = sorted(iter_categories("."), key=locale.strxfrm) # ask for doc date, description, category for output_file doc_date = rlinput("Document date: ", suggestions=doc_dates) doc_title = rlinput("Document title: ", doc_title) doc_category = rlinput("Document category: ", prefill="", suggestions=doc_categories) or DEFAULT_CATEGORY output_file = str(id_r) if doc_date: output_file = f"{doc_date}_{output_file}" if doc_title: output_file += f"_{doc_title}" output_file = f"{doc_category}/{output_file}.pdf" if args.view: pdf_viewer.terminate() # execute command if args.dry_run: print(build_cmd(output_file)) return cat_dir = Path(doc_category) if not cat_dir.is_dir(): cat_dir.mkdir(parents=True) Path(fp.name).rename(output_file) Path(fp.name).touch() if not args.keep: for scan in found: scan.path.unlink() def cmd_missing_ids(args, scans): ids = resolve_per_id(scans) ids.pop(0) # remove digital only for i, id_scans in enumerate(ids): if (i % 2) == 0: # odd ids when i is even due to .pop(0) if len(id_scans) <= 0: print(f"{i + 1}+") # due to .pop(0) def cmd_next_id(args, scans): print(args.force_next_id or next_id(scans)) def cmd_rebuild_index(args, scans: Iterable[ScanFile]): index_dir = Path(INDEX_DIR) if index_dir.exists(): if not index_dir.is_dir(): raise Exception(f"Expected '{index_dir}' to be a directory or to not exist") shutil.rmtree(index_dir) index_dir.mkdir() scans = list(scans) num_width = max(len(str(highest_id(scans))), MIN_NUM_WIDTH) for scan in scans: if not scan.is_digital: (index_dir / f"{scan.id_range.to_fancy(width=num_width)}_{scan.title}{scan.path.suffix}").symlink_to(".." / scan.path.relative_to(index_dir.parent)) def cmd_scan(args, scans): scans = list(scans) scan_source = FLATBED_SCAN_SOURCE if args.flatbed \ else ADF_SCAN_SOURCE if args.adf or USE_ADF_BY_DEFAULT \ else FLATBED_SCAN_SOURCE cmd = [ "scanimage", "--source", scan_source, "--batch", "--batch-start", str(args.force_next_id or next_id(scans)), "--batch-print", "--format", "png", "--resolution", "600", ] if args.flatbed: cmd.append("--batch-prompt") subprocess.run(cmd, check=True, cwd=Path(DEFAULT_CATEGORY).resolve()) if not args.skip_convert: cmd_convert(args, scans) def cmd_test_id_align(args, scans): tests = [ (IdRange(1, 2), IdRange(1, 2)), (IdRange(1, 3), IdRange(1, 4)), (IdRange(4, 4), IdRange(3, 4)), (IdRange(4, 8), IdRange(3, 8)), (IdRange(4, 7), IdRange(3, 8)), ] for test in tests: aligned = test[0].align() if aligned != test[1]: warn(f"{test[0]} aligned to {aligned} does not equal to {test[1]}") sys.exit(1) COMMANDS = { "by-id": cmd_by_id, "check-duplicates": cmd_check_duplicates, "convert": cmd_convert, "list-categories": cmd_list_categories, "merge": cmd_merge, "missing-ids": cmd_missing_ids, "next-id": cmd_next_id, "rebuild-index": cmd_rebuild_index, "scan": cmd_scan, "test-id-align": cmd_test_id_align, } def main(): parser = argparse.ArgumentParser() parser.add_argument("-s", "--dry-run", "--simulate", action="store_true") parser.add_argument("--force-next-id", required=False) parser.add_argument("-k", "--keep", action="store_true") parser.add_argument("-a", "--adf", action="store_true") parser.add_argument("-F", "--flatbed", action="store_true") parser.add_argument("-f", "--format", choices=list(SCAN_FORMATS), default="id-date-title") parser.add_argument("--id", "--ids", required=False) parser.add_argument("--view", action="store_true") parser.add_argument("--output-commands", action="store_true") parser.add_argument("--skip-convert", action="store_true") parser.add_argument("action", choices=list(COMMANDS)) args = parser.parse_args() scans = iter_scans(".") try: COMMANDS[args.action](args, scans) except subprocess.CalledProcessError as e: warn(f"Failed to run command, exited with exit code {e.returncode}: " + " ".join(e.cmd) if type(e.cmd) == list else e.cmd) sys.exit(2) except KeyboardInterrupt: print("Aborted by user") sys.exit(1) if __name__ == "__main__": main()