Source code for torrent_models.hashing.hybrid

"""
Hybrid v1/v2 torrent creation

This is not a straightforward combination of v1 and v2 hashing
since each version of torrent has different optimization requirements.

Since v1 is just a linear set of hashes, and the pieces are much larger units,
we can read a larger buffer and feed the whole thing into a hashing process at once.
v2 works on 16KiB chunks always, so the tradeoff of reading and processing time is a bit different.

Hybrid torrents require us to do both, as well as generate padfiles,
so we reuse routines from the v1 and v2 hashers and build on top of them.
"""

from functools import cached_property
from itertools import count
from multiprocessing.pool import AsyncResult
from multiprocessing.pool import Pool as PoolType
from pathlib import Path
from typing import cast, overload

from pydantic import PrivateAttr, field_validator

from torrent_models.const import BLOCK_SIZE
from torrent_models.hashing.base import Chunk, Hash
from torrent_models.hashing.v1 import V1Hasher
from torrent_models.hashing.v2 import V2Hasher, sort_v2
from torrent_models.types.v1 import FileItem
from torrent_models.types.v2 import PieceLayers, V2PieceLength


def _is_padfile(attr: bytes | str | None) -> bool:
    """Whether a file ``attr`` value carries the padding-file flag ``p``."""
    # BEP 47 defines ``attr`` as a string of single-character flags, so the
    # padding flag may appear combined with others (e.g. ``b"hp"``).
    if isinstance(attr, bytes):
        return b"p" in attr
    if isinstance(attr, str):
        return "p" in attr
    # Any other type (including None): fall back to a plain equality check.
    return attr in (b"p", "p")


def add_padfiles(files: list[FileItem], piece_length: int) -> list[FileItem]:
    """
    Return a copy of a v1 file list with ``.pad`` files interspersed.

    Every non-padding file whose length is not a multiple of ``piece_length``
    is followed by a padfile sized to fill the remainder of its last piece.
    Items that are already padfiles are kept as-is and never padded themselves.

    Args:
        files: v1 file items, in torrent order. The input list is not modified.
        piece_length: piece length in bytes.

    Returns:
        A new list holding the original items plus the generated padfiles.
        Each generated padfile has path ``[".pad", "<pad length>"]``
        and attr ``b"p"``.
    """
    padded: list[FileItem] = []
    for f in files:
        padded.append(f)
        if _is_padfile(f.attr):
            continue
        if (remainder := f.length % piece_length) != 0:
            pad_length = piece_length - remainder
            pad = FileItem(length=pad_length, path=[".pad", str(pad_length)], attr=b"p")
            padded.append(pad)
    return padded
class HybridHasher(V1Hasher, V2Hasher):
    """
    Hasher for hybrid v1/v2 torrents.

    Every chunk passed to :meth:`update` is fed to both the v2 and the v1
    update routines inherited from the parent hashers.
    When a file ends, the partially filled v1 buffer is zero-padded to a full
    piece and hashed (see :meth:`_on_file_end`).
    """

    piece_length: V2PieceLength
    read_size: V2PieceLength | None = None

    # NOTE(review): not referenced in this class; presumably used by a parent hasher.
    _v1_chunks: list[Chunk] = PrivateAttr(default_factory=list)
    # Path attached to the padded v1 chunk built in `_on_file_end`.
    # NOTE(review): assigned outside this class (presumably by the v1 update routine).
    _last_path: Path | None = None
    # Supplies the `idx` of the v1 chunks created in `_on_file_end`.
    _v1_counter: count = PrivateAttr(default_factory=count)

    @field_validator("paths", mode="after")
    @classmethod
    def sort_paths(cls, value: list[Path]) -> list[Path]:
        """
        v1 torrents have arbitrary file sorting,
        but we mimic libtorrent/qbittorrent's sort order for consistency's sake
        """
        return sort_v2(value)

    @cached_property
    def blocks_per_piece(self) -> int:
        """Number of ``BLOCK_SIZE`` blocks in one piece."""
        # Integer floor division: no round trip through float.
        return self.piece_length // BLOCK_SIZE

    @cached_property
    def total_hashes(self) -> int:
        """Sum of the v2 hash count and the hybrid v1 hash count."""
        return self._v2_total_hashes() + self._v1_total_hashes_hybrid()

    @overload
    def update(self, chunk: Chunk, pool: PoolType) -> list[AsyncResult]: ...

    @overload
    def update(self, chunk: Chunk, pool: None = None) -> list[Hash]: ...

    def update(self, chunk: Chunk, pool: PoolType | None = None) -> list[AsyncResult] | list[Hash]:
        """
        Feed one chunk to both the v2 and the v1 update routines.

        Args:
            chunk: the chunk to hash.
            pool: forwarded unchanged to both update routines.

        Returns:
            The v2 results followed by the v1 results, in a single list.
        """
        res = self._update_v2(chunk, pool)
        res.extend(self._update_v1(chunk, pool))  # type: ignore
        return res

    @overload
    def _on_file_end(self, pool: PoolType) -> list[AsyncResult]: ...

    @overload
    def _on_file_end(self, pool: None) -> list[Hash]: ...

    def _on_file_end(self, pool: PoolType | None) -> list[AsyncResult] | list[Hash]:
        """
        Pad and submit buffer

        Returns an empty list when the buffer is empty. Otherwise the buffer is
        zero-padded up to ``piece_length``, wrapped in a chunk and hashed with
        ``_hash_v1`` - asynchronously via ``pool`` when one is given,
        synchronously otherwise - and the buffer is reset.
        """
        if not self._buffer:
            return []

        # bytes(n) is n zero bytes: fill the rest of the piece with zeros
        self._buffer.extend(bytes(self.piece_length - len(self._buffer)))
        # cast only narrows the type for checkers; it does nothing at runtime
        last_path = cast(Path, self._last_path)
        chunk = Chunk.model_construct(
            idx=next(self._v1_counter), path=last_path, chunk=bytes(self._buffer)
        )
        self._buffer = bytearray()

        if pool is not None:
            return [pool.apply_async(self._hash_v1, args=(chunk, self.path_root))]
        return [self._hash_v1(chunk, self.path_root)]

    @overload
    def _after_read(self, pool: PoolType) -> list[AsyncResult]: ...

    @overload
    def _after_read(self, pool: None) -> list[Hash]: ...

    def _after_read(self, pool: PoolType | None) -> list[AsyncResult] | list[Hash]:
        """Submit any remaining v1 pieces from the last file"""
        return self._on_file_end(pool)

    def split_v1_v2(
        self,
        hashes: list[Hash],
    ) -> tuple[PieceLayers, list[bytes]]:
        """
        Split a mixed list of hashes into v2 piece layers and v1 pieces.

        Args:
            hashes: hashes of type ``"v1_piece"`` and ``"block"``;
                any other type is ignored.

        Returns:
            ``(piece_layers, v1_pieces)``: the piece layers built from the
            ``"block"`` hashes, and the raw ``"v1_piece"`` hash values
            ordered by their ``idx``.
        """
        v1_hashes = sorted(
            (h for h in hashes if h.type == "v1_piece"), key=lambda h: h.idx
        )
        v1_pieces = [h.hash for h in v1_hashes]

        v2_leaf_hashes = [h for h in hashes if h.type == "block"]
        trees = self.finish_trees(v2_leaf_hashes)
        layers = PieceLayers.from_trees(trees, self.path_root)
        return layers, v1_pieces