Caching find command

Magnifying glass

I use find on a network filesystem and pipe the output to rofi for a nice menu, but as the list of files grew over time, getting the results took too long. I looked around, couldn't find a solution I liked, and ended up writing a small Python script.

It finds all files above a certain size on one or more paths and caches the output. After outputting the cached results, it scans the disk again for new files, updates the cache, and outputs them.

Raw
#!/usr/bin/env python
"""
Caching replacement for find util

I use it to search a network filesystem for files to pipe to rofi, so I don't
care about duplicate output.
"""
import argparse
import glob
import hashlib
import logging
import os
import sys

import argcomplete
from xdg import xdg_cache_home
from xdg import xdg_data_home


# Log to a file so stdout stays clean for the pipe consumer (e.g. rofi).
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# BUG FIX: the FileHandler is created at import time, before __main__ gets a
# chance to create the data directory — ensure it exists here or the very
# first run on a fresh machine dies before main even starts.
os.makedirs(xdg_data_home(), exist_ok=True)
filehandler = logging.FileHandler(
    os.path.join(xdg_data_home(), "nkfind.log"),
    mode="a",
)
filehandler.setLevel(logging.INFO)
formatter = logging.Formatter(
    fmt="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
filehandler.setFormatter(formatter)
logger.addHandler(filehandler)

# Default threshold; the --size default below is MINSIZE * 1000, i.e.
# 10,000,000 bytes (~10 MB) — hence the "10M" note.
MINSIZE = 10000  # 10M


class Finder:
    """Find files above a size threshold, caching results per (path, size).

    Matches are streamed to stdout as they are found; a cache file keyed by
    an md5 of path+size lets subsequent runs answer instantly.
    """

    def __init__(self, quiet=False, do_sort=False, sort_old=False):
        # quiet: suppress all stdout output (caches are still maintained)
        # do_sort: collect and sort results by mtime instead of streaming
        # sort_old: with do_sort, oldest first (default is newest first)
        self.quiet = quiet
        self.do_sort = do_sort
        self.sort_old = sort_old

    def get_cachebase(self):
        """Return the directory holding all cache files."""
        return os.path.join(xdg_cache_home(), "nkfind")

    def get_cachefile(self, path, size):
        """Return the cache file path for (path, size), creating the cache
        directory if needed."""
        # md5 is only a cache key here, not a security boundary.
        pathhash = hashlib.md5(f"{path}{size}".encode()).hexdigest()
        base = self.get_cachebase()
        # makedirs+exist_ok avoids the isdir/mkdir race of the original.
        os.makedirs(base, exist_ok=True)
        return os.path.join(base, pathhash)

    def stdout(self, value):
        """Write value to stdout unless quiet.

        A BrokenPipeError (consumer such as rofi exited early) is treated as
        a normal, clean exit.
        """
        if self.quiet:
            return
        try:
            sys.stdout.write(value)
        except BrokenPipeError:
            # BUG FIX: typo "Aboring" in the original message.
            sys.stderr.write("Aborting\n")
            sys.exit(0)
        except UnicodeEncodeError:
            # BUG FIX: the original string lacked the f-prefix and logged
            # the literal text "{repr(value)}".
            logger.error(f"Can't output {value!r}")

    def dump_cache(self, cachefile):
        """Print every cached line and return the stripped paths."""
        r = []
        with open(cachefile, "r") as fh:
            for line in fh:
                self.stdout(line)
                r.append(line.strip())
        return r

    def rfind(self, path, size):
        """Yield files under path whose size is >= size bytes.

        With do_sort, results are buffered and yielded sorted by mtime
        (newest first unless sort_old).
        """
        paths = []
        for dirpath, _dirnames, filenames in os.walk(path):
            for filename in filenames:
                # BUG FIX: os.walk already yields dirpath rooted at `path`;
                # the original joined `path` in again, duplicating the
                # prefix for relative paths.
                fullpath = os.path.join(dirpath, filename)
                try:
                    fullpath_size = os.path.getsize(fullpath)
                except FileNotFoundError:
                    # Broken symlink, or file removed mid-walk.
                    logger.error(f"{fullpath} does not exist")
                    continue
                if fullpath_size >= size:
                    if self.do_sort:
                        paths.append(fullpath)
                    else:
                        yield fullpath
        if self.do_sort:
            yield from sorted(
                paths,
                key=lambda p: os.stat(p).st_mtime,
                reverse=not self.sort_old,
            )

    def dump_disk(self, path, size, silent=False):
        """Walk the disk, optionally printing each hit.

        :returns: list of matching paths, so the caller can update/create
            the cache.
        """
        logger.info(f"Dumping files from disk {path} {size}")
        files = []
        for filename in self.rfind(path, size):
            files.append(filename)
            if not silent:
                # BUG FIX: the original printed the literal "(unknown)"
                # for every hit instead of the found path.
                self.stdout(f"{filename}\n")
        return files

    def write_cache(self, path, size, files=None):
        """Write the list of found paths to the cache for (path, size).

        :param path: str, search root the cache is keyed on
        :param size: int, size threshold the cache is keyed on
        :param files: list of str, paths to cache (None means empty)
        """
        cachefile = self.get_cachefile(path, size)
        # BUG FIX: files=None would have crashed on len(files) below.
        files = files if files is not None else []
        logger.info(f"Writing cache to disk for {path} {size}, {len(files)} files")
        with open(cachefile, "w") as fh:
            for filename in files:
                try:
                    # BUG FIX: the original wrote the literal "(unknown)"
                    # per file, which would make the cache useless when
                    # dump_cache later replays these lines as results.
                    fh.write(f"{filename}\n")
                except UnicodeEncodeError:
                    # Skip un-encodable paths rather than abort the dump.
                    pass

    def dump_from_cache_or_disk(self, path, size, flush=False, flushafter=False):
        """Dump results for one path, from cache when possible.

        :returns: tuple (bool, list) — whether the cache was hit, and the
            files found on disk (None on a cache hit).
        """
        cachefile = self.get_cachefile(path, size)
        if flush and os.path.isfile(cachefile):
            os.remove(cachefile)
        if os.path.isfile(cachefile):
            logger.info(f"Cache hit for {path} {size}:{cachefile}")
            self.dump_cache(cachefile)
            return True, None
        logger.info(f"Cache miss for {path} {size}:{cachefile}")
        return False, self.dump_disk(path, size)

    def nkfind(self, paths, size=4000, flush=False, flushafter=False):
        """Entry point: dump each path from cache or disk, then refresh caches."""
        rmap = {}
        for path in paths:
            if not os.path.isdir(path):
                logger.error(f"Ignore path {path}")
                continue
            cached, files = self.dump_from_cache_or_disk(
                path, size, flush=flush, flushafter=flushafter
            )
            if not cached:
                rmap[path] = files
            elif flushafter:
                rmap[path] = None
        for path, files in rmap.items():
            if files is None:
                # Cache was dumped; rescan silently to pick up new files.
                files = self.dump_disk(path, size, silent=True)
            self.write_cache(path, size, files=files)


if __name__ == "__main__":
    # BUG FIX: os.mkdir() has no `parents` kwarg (that's pathlib's mkdir);
    # the original raised TypeError whenever the directory was missing.
    # NOTE(review): the module-level FileHandler already needs this directory
    # at import time, so on a truly fresh machine the script fails earlier.
    os.makedirs(xdg_data_home(), exist_ok=True)
    parser = argparse.ArgumentParser(description="Cached find")
    default_path = os.getcwd()
    parser.add_argument("paths", type=str, nargs="*", default=[default_path])
    parser.add_argument(
        "--size",
        "-s",
        type=int,
        default=MINSIZE * 1000,
        # BUG FIX: the help string was missing its closing paren, and said
        # "kB" although the value is compared against os.path.getsize()
        # (bytes) unchanged.
        help=f"Minimum size in bytes (default {MINSIZE * 1000})",
    )
    parser.add_argument("--quiet", action="store_true")
    parser.add_argument(
        "--sort-new",
        "--sort",
        action="store_true",
        help="Newest first, you probably want --flush too",
    )
    parser.add_argument(
        "--sort-old",
        "--so",
        action="store_true",
        help="Oldest first, you probably want --flush too",
    )
    parser.add_argument(
        "--flush",
        action="store_true",
        help="Flush caches, force rebuild",
    )
    parser.add_argument(
        "--flushafter",
        action="store_true",
        help="Flush caches after dumping",
    )
    parser.add_argument(
        "--flushall",
        action="store_true",
        help="Flush all caches",
    )
    argcomplete.autocomplete(parser)
    args = parser.parse_args()
    f = Finder(quiet=args.quiet, do_sort=args.sort_new, sort_old=args.sort_old)
    if args.flushall:
        # BUG FIX: the old pattern f"{base}*" matched the cache directory
        # itself (os.remove would fail on it); glob inside the directory.
        for cachefile in glob.glob(os.path.join(f.get_cachebase(), "*")):
            os.remove(cachefile)
    if args.paths:
        f.nkfind(
            args.paths,
            size=args.size,
            flush=args.flush,
            flushafter=args.flushafter,
        )
0 comments

Reply

Cancel reply
Markdown. Syntax highlighting with <code lang="php"><?php echo "Hello, world!"; ?></code> etc.
DjangoPythonBitcoinTuxDebianHTML5 badgeSaltStackUpset confused bugMoneyHackerUpset confused bugX.OrggitFirefoxWindowMakerBashIs it worth the time?i3 window managerWagtailContainerIrssiNginxSilenceUse a maskWorldInternet securityPianoFontGnuPGThunderbirdJenkinshome-assistant-logo