9
\$\begingroup\$

This repository class is supposed to be used for saving information extracted from RAR, SFV archives or files that contain the hash sum in their filenames.

The abstract parent class can be found here. For now, only save/update and load are implemented.

As strategy, I took the union of all fields of those classes to be used as columns of one single table. This is simple, but obviously introduces the danger that the database will easily reach an inconsistent state if the save code has logical errors.

Now this is a rather generic issue, but SQLite has some strange quirks if one comes from a more full-featured SQL implementation. The foreign key issue was certainly surprising.

import collections.abc
import datetime
import pathlib
import sqlite3
from types import TracebackType
from typing import cast

from hoarder.hash_archive import Algo, FileEntry, HashArchive
from hoarder.hash_name_archive import HashEnclosure, HashNameArchive
from hoarder.rar_archive import RarArchive
from hoarder.rar_path import RarScheme
from hoarder.sfv_archive import SfvArchive


class Sqlite3FK:
    """
    Context-manager that turns ON foreign-key enforcement,
    and actually closes the connection.
    Does not suppress any encountered exceptions.
    """

    _db_path: str | pathlib.Path
    _conn: sqlite3.Connection | None

    def __init__(self, _db_path: str | pathlib.Path):
        self._db_path = pathlib.Path(_db_path)
        self._conn = None

    def __enter__(self) -> sqlite3.Connection:
        self._conn = sqlite3.connect(self._db_path)
        _ = self._conn.execute("PRAGMA foreign_keys = ON;")
        return self._conn

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ):
        assert self._conn is not None
        if exc_value is None:
            self._conn.commit()
            self._conn.close()
        else:
            self._conn.rollback()
            self._conn.close()


class HashArchiveRepository:
    """Repository for any HashArchive subclass.

    Persistence strategy: the union of all subclass fields becomes the
    columns of one single ``hash_archives`` table; columns that do not
    apply to a given archive type are stored as NULL.  FileEntry rows
    live in ``file_entries`` and are removed via ON DELETE CASCADE when
    their archive row is deleted.
    """

    _db_path: str | pathlib.Path

    _CREATE_HASH_ARCHIVES: str = """
    CREATE TABLE IF NOT EXISTS hash_archives (
        id             INTEGER  PRIMARY KEY AUTOINCREMENT,
        type           TEXT     NOT NULL,
        path           TEXT     NOT NULL UNIQUE,
        deleted        INTEGER,
        timestamp      TEXT     DEFAULT CURRENT_TIMESTAMP,
        -- HashNameArchive
        hash_enclosure TEXT,
        -- RarArchive
        password       TEXT,
        rar_scheme     INTEGER,
        rar_version    TEXT,
        n_volumes      INTEGER
    );
    """

    _CREATE_FILE_ENTRIES: str = """
    CREATE TABLE IF NOT EXISTS file_entries (
        id          INTEGER  PRIMARY KEY AUTOINCREMENT,
        path        TEXT     NOT NULL,
        size        INTEGER,
        is_dir      INTEGER  NOT NULL,
        hash_value  BLOB,
        algo        INTEGER,
        archive_id  INTEGER  NOT NULL,
        FOREIGN KEY (archive_id)
          REFERENCES hash_archives(id)
          ON DELETE CASCADE
    );
    """

    def __init__(self, db_path: str | pathlib.Path) -> None:
        """Remember the database location and ensure both tables exist."""
        self._db_path = db_path
        self._create_tables()

    def save(self, archive: HashArchive) -> None:
        """Insert or replace one archive and all its FileEntry rows.

        An existing row with the same path is deleted first; the
        ON DELETE CASCADE constraint drops its file entries along with it,
        so a save is effectively an atomic replace.
        """
        archive_row = self._build_archive_row(archive)

        with Sqlite3FK(self._db_path) as con:
            cur = con.cursor()
            # Explicit BEGIN so the DELETE and both INSERTs form a single
            # transaction regardless of the sqlite3 module's autocommit rules.
            cur.execute("BEGIN;")
            cur.execute(
                "DELETE FROM hash_archives WHERE path = ?;", (archive_row["path"],)
            )
            # Column list and named placeholders are generated from the same
            # dict, so they cannot drift apart.
            cur.execute(
                f"""
                INSERT INTO hash_archives ({', '.join(archive_row)})
                VALUES ({', '.join(':' + k for k in archive_row)});
                """,
                archive_row,
            )

            if archive.files:
                fe_rows = list(
                    self._build_fileentry_rows(archive.files, archive_path=archive.path)
                )
                # INSERT ... SELECT looks up the freshly inserted archive's id
                # by path, avoiding a separate lastrowid round-trip.
                cur.executemany(
                    """
                    INSERT INTO file_entries (path, size, is_dir, hash_value, algo, archive_id)
                    SELECT :path AS path, :size AS size, :is_dir AS is_dir, :hash_value AS hash_value,
                    :algo AS algo, id as archive_id FROM hash_archives WHERE path = :archive_path
                    """,
                    fe_rows,
                )
            # No explicit commit needed: Sqlite3FK commits on clean exit.

    def load(self, path: pathlib.Path | str) -> HashArchive:
        """Return the archive (plus its FileEntry set) previously stored.

        Raises:
            FileNotFoundError: if no archive with that path is stored.
        """
        with Sqlite3FK(self._db_path) as con:
            con.row_factory = sqlite3.Row
            cur = con.cursor()

            arc_row = cast(
                None | sqlite3.Row,
                cur.execute(
                    "SELECT * FROM hash_archives WHERE path = :path;",
                    {"path": str(path)},
                ).fetchone(),
            )

            if arc_row is None:
                raise FileNotFoundError(str(path))

            archive = self._fill_archive(arc_row)

            fe_rows = cast(
                list[sqlite3.Row],
                cur.execute(
                    "SELECT * FROM file_entries WHERE archive_id = ?;", (arc_row["id"],)
                ).fetchall(),
            )

            archive.files = {
                FileEntry(
                    path=pathlib.PurePath(cast(str, r["path"])),
                    size=cast(int, r["size"]),
                    is_dir=bool(cast(int, r["is_dir"])),
                    hash_value=cast(bytes, r["hash_value"]),
                    algo=Algo(r["algo"]) if r["algo"] is not None else None,
                )
                for r in fe_rows
            }
            return archive

    def _create_tables(self) -> None:
        """Create both tables if they do not exist yet (idempotent)."""
        with Sqlite3FK(self._db_path) as con:
            cur = con.cursor()
            cur.execute(HashArchiveRepository._CREATE_HASH_ARCHIVES)
            cur.execute(HashArchiveRepository._CREATE_FILE_ENTRIES)

    @staticmethod
    def _now() -> str:
        """Return the current local time as 'YYYY-MM-DD HH:MM:SS'.

        NOTE(review): naive local time without a zone offset; currently
        unused because the timestamp column relies on the database-side
        DEFAULT CURRENT_TIMESTAMP.
        """
        return datetime.datetime.strftime(datetime.datetime.now(), "%Y-%m-%d %H:%M:%S")

    def _build_archive_row(self, arch: HashArchive) -> dict[str, str | int | None]:
        """Return a dict used directly with named-parameter SQL.

        Starts from the common columns with all subclass-specific columns
        set to None, then fills in the fields the concrete type provides.

        Raises:
            TypeError: for an unknown HashArchive subclass.
        """
        base: dict[str, str | int | None] = {
            "type": type(arch).__name__,
            "path": str(arch.path),
            "deleted": int(arch.deleted),
            "hash_enclosure": None,
            "password": None,
            "rar_scheme": None,
            "rar_version": None,
            "n_volumes": None,
        }
        if isinstance(arch, HashNameArchive):
            base["hash_enclosure"] = arch.enc.value
        elif isinstance(arch, RarArchive):
            base.update(
                password=arch.password,
                rar_scheme=arch.scheme.value if arch.scheme else None,
                rar_version=arch.version,
                n_volumes=arch.n_volumes,
            )
        elif isinstance(arch, SfvArchive):
            pass  # SFV archives have no extra columns.
        else:
            raise TypeError(f"Unsupported HashArchive subclass: {type(arch).__name__}")
        return base

    @staticmethod
    def _build_fileentry_rows(
        entries: collections.abc.Iterable[FileEntry],
        archive_path: str | pathlib.Path | None = None,
    ) -> collections.abc.Iterable[dict[str, str | int | None | bytes]]:
        """Yield one parameter dict per FileEntry.

        When `archive_path` is given, it is added to each dict so the
        INSERT ... SELECT in save() can resolve the owning archive's id.
        """
        for fe in entries:
            ret_dict: dict[str, str | int | None | bytes] = {
                "path": str(fe.path),
                "size": fe.size,
                "is_dir": int(fe.is_dir),
                "hash_value": fe.hash_value,
                "algo": fe.algo.value if fe.algo is not None else None,
            }
            if archive_path:
                ret_dict.update(archive_path=str(archive_path))
            yield ret_dict

    @staticmethod
    def _fill_archive(row: sqlite3.Row) -> HashArchive:
        """Reconstruct the concrete HashArchive subclass from one table row.

        The `files` attribute is left for the caller (load()) to populate.

        Raises:
            ValueError: if the stored type name is not recognized.
        """
        archive_type = cast(str, row["type"])
        archive_path = cast(str, row["path"])
        arch: HashArchive | None = None
        if archive_type == "HashNameArchive":
            arch = HashNameArchive(
                pathlib.Path(archive_path),
                files=None,
                enc=HashEnclosure(row["hash_enclosure"]),
            )
        elif archive_type == "RarArchive":
            arch = RarArchive(
                pathlib.Path(archive_path),
                files=None,
                password=cast(str, row["password"]),
                version=cast(str, row["rar_version"]),
                scheme=(
                    RarScheme(cast(int, row["rar_scheme"]))
                    if row["rar_scheme"] is not None
                    else None
                ),
                n_volumes=cast(int, row["n_volumes"]),
            )
        elif archive_type == "SfvArchive":
            arch = SfvArchive(pathlib.Path(archive_path), files=set())
        else:
            raise ValueError(f"Unknown archive type in database: {archive_type}")

        arch.deleted = bool(cast(int, row["deleted"]))
        return arch

Some Pytest tests can be found here.

\$\endgroup\$

3 Answers 3

10
\$\begingroup\$

We're using black, isort, ruff, and type checkers -- terrific! Lots of best practices in there. It makes my life as maintainer / reviewer much easier.

middleware

We've gone pretty far down the "batteries included" import sqlite3 path already. But if you start a new project, consider accessing sqlite via SqlAlchemy. That's how I always do it, to preserve flexibility for swapping in a postgres backend as a project matures.

The sqlite quirk about not enforcing FK by default is something my code simply never has to worry about. It turns out that is due to SqlAlchemy issuing PRAGMA foreign_keys = ON; behind the scenes, so I can get on with standard SQL semantics in the usual way.

Additionally your IDE can offer better support for column renames and indenting of queries when we inherit from Base and use the ORM to create queries. And declaring / verifying types such as INTEGER or BOOL is much more pleasant in that setting.

Your Sqlite3FK context manager is nice enough. No need to implement that functionality if you have a SqlAlchemy session at your disposal, which already attends to those same tasks. No need to document or test a new class, when a well documented and tested library offering the same features is already available.

Also, prefer to shorten each call site by doing the usual from pathlib import Path. No need to see that verbose "pathlib" mentioned by each call. Similarly for strftime() and now().

Kudos for making the ctor set the connection to None, so we have an easily read "table of contents" of the valid attributes.

maintenance chores

I feel one of the more predictable maintenance tasks that will crop up over this project's lifetime is "add column".

There's a bunch of places in the OP codebase where column names and types are listed. They're not all consolidated in one place, within a screenful or two of code. It seems like, with the current structure, {add, rename, delete} column tasks will be less pleasant than they might have been, and the existing unit tests might not reliably expose remaining "to-do" methods that should be updated with a new column name.

constructor conveniences

This is very nice, as it supports concise calls from the app layer:

    def __init__(self, db_path: str | pathlib.Path) -> None:

However, I feel this is a regrettable type:

    _db_path: str | pathlib.Path

Better to assign self._db_path = Path(db_path). Note that if p is already a Path, then Path(p) simply returns p, with no harm done.

Also, having a _db_path attribute at the class level and the instance level seems bad. Prefer to keep track of it strictly at the instance level.

This is fine: _CREATE_HASH_ARCHIVES: str = """CREATE TABLE ...
But I can't imagine there's a need to annotate that constant. I mean, surely pyright and mypy infer str on their own, right, without needing a hint?

boolean column

Consider renaming to is_deleted, for consistency with is_dir. And consider requiring NOT NULL.

Also, if you ever need to pass a security audit I predict that password column will be trouble.

many COMMITs

.save() does a tiny amount of work, on one row, and immediately issues a COMMIT.

It appears to me that an app might plausibly loop over many entries, doing a .save() on each of them. In the interest of speed, consider making caller (or caller's context manager) responsible for issuing a single COMMIT. Consider offering a vector based Public API, where you accept a list of entries that are all committed together as one big transaction.

debugs

Now that the code is mature enough for review, consider deleting the various _ = ... assignments. They usually begin with
_ = cur.execute( ..., and seem to be there to support single-step inspection within a debugger.

timezone

The _now() helper returns a "%Y-%m-%d %H:%M:%S" string which omits timezone. Consider tacking on a 4-digit " +ZZZZ" suffix. Alternatively, document that it returns a UTC timestamp. The OP code uses the local timezone setting when producing a timestamp, but there's no docstring that warns about the result's ambiguity.

If we serialize such a timestamp, say to an RDBMS, we should also serialize the zone offset, or document that the offset is zero. That way some future reader can deserialize it and know which instant in time it corresponds to.

"optional" helper

Expressions like fe.algo.value if fe.algo is not None else None crop up in a few places. Consider introducing a trivial helper that allows you to simplify each call site, making it more concise.

possible None

_fill_archive() does this:

        arch.deleted = bool(cast(int, row["deleted"]))

Recommend you precede that with assert arch. The pyright type checker seems to believe the archive record can be None at that point.

automated tests

def test_repository() is very nice. But it's a little on the big side. Its three loops suggest that perhaps we'd like three smaller test functions which can individually signal Red or Green status.

If you inherit from unittest.TestCase you can take advantage of def setUp(self). And pytest offers its own approach to consolidating shared setup logic in a single place.

\$\endgroup\$
1
  • \$\begingroup\$ Two syntax peculiarities, annotating the obvious strings or _ = cur.execute were to please basedpyright (couldn't get pyright to work with neovim) - should've mentioned that. The save method also adds the file entry rows so it does a bit more. And while vectorizing this method is a good idea, in practice, since getting the CRC32 of entries in encrypted RAR v5 archives is a very slow operation, committing immediately is probably advisable. I agree with the rest of the suggestions. Thank you so much for this comprehensive review. \$\endgroup\$
    – viuser
    Commented Jul 22 at 20:29
6
\$\begingroup\$

Since I use SqlAlchemy with SQLite, here is my personal fix using SqlAlchemy events, basically these are hooks.

My models.py looks like this:

from datetime import UTC, date, datetime
from sqlite3 import Connection as SQLite3Connection

import argon2
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import Column, ForeignKey, Index, event, text
from sqlalchemy.engine import Engine
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship


db = SQLAlchemy()


class User(db.Model):
    __tablename__ = "user"
    __table_args__ = (Index("idx_user_name", text("LOWER(name)"), unique=True),)
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), nullable=False)
    ...


@event.listens_for(Engine, "connect")
def _set_sqlite_pragma(dbapi_connection, connection_record):
    """Enforce foreign keys on SQLite only.

    Registered for SQLAlchemy's Engine "connect" event, so the pragma is
    issued once per new DB-API connection; non-SQLite backends are skipped.
    """
    # The pragma is per-connection, hence it must run on every connect.
    if isinstance(dbapi_connection, SQLite3Connection):
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON;")
        cursor.close()

The idea is to let the DB abstraction layer take care of these peculiarities transparently. I also use the same technique to hash passwords on the fly.

This keeps flexibility, you are always free to switch to another DBMS like Postgres for example.

\$\endgroup\$
6
\$\begingroup\$

Your code can benefit from a TypedDict or two.

Consider this assignment:

base: dict[str, str | int | None] = {
    "type": type(arch).__name__,
    "path": str(arch.path),
    "deleted": int(arch.deleted),
    "hash_enclosure": None,
    "password": None,
    "rar_scheme": None,
    "rar_version": None,
    "n_volumes": None,
}

For a dictionary with statically known keys, dict[str, str | int | None] is an extremely imprecise type. So imprecise, in fact, you had to cast() to get the right types:

archive_type = cast(str, row["type"])
archive_path = cast(str, row["path"])

Instead, prefer:

class _ArchiveRow(TypedDict):
    type: str
    path: str
    deleted: int
    hash_enclosure: str | None
    password: str | None
    rar_scheme: int | None
    rar_version: str | None
    n_volumes: int | None

def _build_archive_row(self, arch: HashArchive) -> _ArchiveRow:
    base: _ArchiveRow = ...
    ...
    base["password"] = arch.password
    ...

Same for ret_dict:

class _FileentryRow(TypedDict):
    path: str
    size: int | None
    is_dir: int
    hash_value: bytes | None
    algo: int | None
    archive_path: NotRequired[str]

def _build_fileentry_rows(
    entries: collections.abc.Iterable[FileEntry],
    archive_path: str | pathlib.Path | None = None,
) -> collections.abc.Iterable[_FileentryRow]:
    for fe in entries:
        ret_dict: _FileentryRow = ...
        if archive_path:
            ret_dict["archive_path"] = str(archive_path)
        ...
\$\endgroup\$

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.