#!/usr/bin/env python
"""
Caching replacement for find util

I use it to search a network filesystem for files to pipe to rofi,
so I don't care about duplicate output.
"""
import argparse
import glob
import hashlib
import logging
import os
import sys

import argcomplete
from xdg import xdg_cache_home
from xdg import xdg_data_home

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
filehandler = logging.FileHandler(
    os.path.join(xdg_data_home(), "nkfind.log"),
    mode="a",
)
filehandler.setLevel(logging.INFO)
formatter = logging.Formatter(
    fmt="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
filehandler.setFormatter(formatter)
logger.addHandler(filehandler)

# Default --size is MINSIZE * 1000 bytes, i.e. 10 MB.
# NOTE(review): --size is compared directly against os.path.getsize() (bytes),
# but the help text calls it kB — confirm intended unit with the author.
MINSIZE = 10000  # 10M


class Finder:
    """Recursive file finder with a per-(path, size) on-disk result cache."""

    def __init__(self, quiet=False, do_sort=False, sort_old=False):
        """
        :param quiet: suppress stdout output
        :param do_sort: sort results by mtime instead of streaming them
        :param sort_old: with do_sort, oldest first (default newest first)
        """
        self.quiet = quiet
        self.do_sort = do_sort
        self.sort_old = sort_old

    def get_cachebase(self):
        """Directory holding all cache files."""
        return os.path.join(xdg_cache_home(), "nkfind")

    def get_cachefile(self, path, size):
        """Return the cache file path for (path, size), creating the base dir.

        md5 is used only as a filename hash, not for security.
        """
        pathhash = hashlib.md5(f"{path}{size}".encode()).hexdigest()
        base = self.get_cachebase()
        # makedirs(exist_ok=True) avoids the isdir/mkdir race.
        os.makedirs(base, exist_ok=True)
        return os.path.join(base, pathhash)

    def stdout(self, value):
        """Write value to stdout unless quiet; tolerate broken pipes."""
        if self.quiet is True:
            return
        try:
            sys.stdout.write(value)
        except BrokenPipeError:
            # Downstream consumer (e.g. head/rofi) closed the pipe; exit cleanly.
            sys.stderr.write("Aborting\n")
            sys.exit(0)
        except UnicodeEncodeError:
            logger.error(f"Can't output {repr(value)}\n")

    def dump_cache(self, cachefile):
        """Emit cached lines to stdout and return them (stripped) as a list."""
        r = []
        with open(cachefile, "r") as fh:
            for line in fh.readlines():
                self.stdout(line)
                r.append(line.strip())
        return r

    def rfind(self, path, size):
        """Yield files under path whose size is >= size bytes.

        With do_sort, results are collected and yielded sorted by mtime
        (newest first, or oldest first with sort_old).
        """
        found = 0
        paths = []
        for directory in os.walk(path):
            for filename in directory[2]:
                # os.walk already yields dirpaths rooted at `path`;
                # joining `path` again would corrupt relative paths.
                fullpath = os.path.join(directory[0], filename)
                try:
                    fullpath_size = os.path.getsize(fullpath)
                except FileNotFoundError:
                    # File vanished between walk and stat (network fs).
                    logger.error(f"{fullpath} does not exist")
                    continue
                if fullpath_size >= size:
                    found += 1
                    if self.do_sort is False:
                        yield fullpath
                    else:
                        paths.append(fullpath)
        if self.do_sort is True:
            if self.sort_old is True:
                r = sorted(paths, key=lambda t: os.stat(t).st_mtime)
            else:
                r = sorted(paths, key=lambda t: -os.stat(t).st_mtime)
            for x in r:
                yield x

    def dump_disk(self, path, size, silent=False):
        """
        Walk the disk, optionally printing each hit.

        We return the files as we may want to update/create the cache.

        :param silent: collect without printing (used when rebuilding a cache)
        :returns: list of file paths found
        """
        logger.info(f"Dumping files from disk {path} {size}")
        files = []
        for filename in self.rfind(path, size):
            files.append(filename)
            if silent is False:
                self.stdout(f"{filename}\n")
        return files

    def write_cache(self, path, size, files=None):
        """
        Persist the found files, one path per line, to the cache file.

        :param path: searched directory (cache key component)
        :param size: minimum size (cache key component)
        :param files: list of str, file paths to store
        """
        cachefile = self.get_cachefile(path, size)
        logger.info(f"Writing cache to disk for {path} {size}, {len(files)} files")
        with open(cachefile, "w") as fh:
            for filename in files:
                try:
                    fh.write(f"{filename}\n")
                except UnicodeEncodeError:
                    # Best-effort: skip names the filesystem encoding can't round-trip.
                    pass

    def dump_from_cache_or_disk(self, path, size, flush=False, flushafter=False):
        """
        Print results from the cache if present, else from disk.

        :param flush: delete any existing cache first, forcing a disk walk
        :param flushafter: accepted for signature parity with nkfind();
            the flush-after logic itself lives in nkfind()
        :returns: tuple (bool, list), if the cache was hit, and files found
            on disk (None on a cache hit)
        """
        cachefile = self.get_cachefile(path, size)
        if flush is True:
            if os.path.isfile(cachefile):
                os.remove(cachefile)
        if os.path.isfile(cachefile):
            logger.info(f"Cache hit for {path} {size}:{cachefile}")
            self.dump_cache(cachefile)
            return True, None
        logger.info(f"Cache miss for {path} {size}:{cachefile}")
        return False, self.dump_disk(path, size)

    def nkfind(self, paths, size=4000, flush=False, flushafter=False):
        """
        Search each path, printing matches, then refresh caches as needed.

        Caches are (re)written for every cache miss; with flushafter, a
        hit cache is also rebuilt from disk after its contents were dumped.
        """
        rmap = {}
        for path in paths:
            # path = os.path.realpath(path)
            if not os.path.isdir(path):
                logger.error(f"Ignore path {path}")
                continue
            (cached, files) = self.dump_from_cache_or_disk(
                path, size, flush=flush, flushafter=flushafter
            )
            if cached is False:
                rmap[path] = files
            if cached is True and flushafter is True:
                # None marks "re-scan silently before rewriting the cache".
                rmap[path] = None
        for path, files in rmap.items():
            if files is None:
                # This means we dumped the cache, we should output new files
                files = self.dump_disk(path, size, silent=True)
            self.write_cache(path, size, files=files)


if __name__ == "__main__":
    if not os.path.isdir(xdg_data_home()):
        # os.mkdir has no `parents` kwarg; makedirs creates intermediates.
        os.makedirs(xdg_data_home(), exist_ok=True)
    parser = argparse.ArgumentParser(description="Cached find")
    default_path = os.getcwd()
    parser.add_argument("paths", type=str, nargs="*", default=[default_path])
    parser.add_argument(
        "--size",
        "-s",
        type=int,
        default=MINSIZE * 1000,
        help=f"Minimum size in kB (default {MINSIZE})",
    )
    parser.add_argument("--quiet", action="store_true")
    parser.add_argument(
        "--sort-new",
        "--sort",
        action="store_true",
        help="Newest first, you probably want --flush too",
    )
    parser.add_argument(
        "--sort-old",
        "--so",
        action="store_true",
        help="Oldest first, you probably want --flush too",
    )
    parser.add_argument(
        "--flush",
        action="store_true",
        help="Flush caches, force rebuild",
    )
    parser.add_argument(
        "--flushafter",
        action="store_true",
        help="Flush caches after dumping",
    )
    parser.add_argument(
        "--flushall",
        action="store_true",
        help="Flush all caches",
    )
    argcomplete.autocomplete(parser)
    args = parser.parse_args()
    f = Finder(quiet=args.quiet, do_sort=args.sort_new, sort_old=args.sort_old)
    if args.flushall is True:
        for cachefile in glob.glob(f"{f.get_cachebase()}*"):
            os.remove(cachefile)
    if args.paths:
        f.nkfind(
            args.paths,
            size=args.size,
            flush=args.flush,
            flushafter=args.flushafter,
        )