"""
Convenience class used when creating a new torrent.
The :class:`.TorrentCreate` class provides some convenience fields
that allow some common fields to be declared in a version-agnostic way
at the top level, rather than nested within the infodict.
"""
import multiprocessing as mp
from pathlib import Path
from typing import Any, Self, cast
from pydantic import Field, model_validator
from torrent_models import Torrent, TorrentVersion
from torrent_models.compat import get_size
from torrent_models.const import DEFAULT_TORRENT_CREATOR, EXCLUDE_FILES
from torrent_models.hashing import HybridHasher, V1Hasher, add_padfiles
from torrent_models.hashing.v1 import sort_v1
from torrent_models.info import InfoDictHybrid, InfoDictHybridCreate, InfoDictV1, InfoDictV2
from torrent_models.torrent import TorrentBase
from torrent_models.types import (
AbsPath,
ByteStr,
FileItem,
TrackerFields,
V1PieceLength,
V2PieceLength,
)
from torrent_models.types.v2 import FileTree, PieceLayers
[docs]
class TorrentCreate(TorrentBase):
    """
    A programmatically created torrent that may not have its hashes computed yet.

    Torrents may be created *either* by passing an info dict with all details,
    (with or without piece hashes), *or* by using a handful of convenience fields.

    E.g. rather than needing to pass a fully instantiated file tree,
    one can just pass a list of files to ``paths``
    """

    _EXCLUDE = {
        "paths": True,
        "path_root": True,
        "trackers": True,
        "piece_length": True,
        "info": {"meta_version", "files", "file_tree", "piece_length"},
        "piece_layers": True,
    }
    """
    Exclude from model dumps when creating internal model dumps when generating.
    ie. because they are transformed by creation
    """

    # make parent types optional
    announce: ByteStr | None = None
    created_by: ByteStr | None = Field(DEFAULT_TORRENT_CREATOR, alias="created by")

    # convenience fields
    info: InfoDictHybridCreate = Field(default_factory=InfoDictHybridCreate)  # type: ignore
    paths: list[Path] | None = Field(
        None,
        description="""
        Convenience field for creating torrents from lists of files.
        Can be either relative or absolute.
        Paths must be located beneath the path root, passed either explicitly or using
        cwd (default).
        If absolute, paths are made relative to the path root.
        """,
    )
    path_root: AbsPath = Field(
        default_factory=Path, description="Path to interpret paths relative to"
    )
    trackers: list[ByteStr] | list[list[ByteStr]] | None = Field(
        None,
        # adjacent literals are concatenated verbatim, so each one carries its own
        # trailing space to keep the sentences apart
        description="Convenience method for declaring tracker lists. "
        "If a flat list, put each tracker in a separate tier. "
        "Otherwise, sublists indicate tiers.",
    )
    piece_length: V1PieceLength | V2PieceLength | None = Field(
        None, description="Convenience method for passing piece length"
    )
    similar: list[bytes] | None = Field(
        None, description="Infohashes of other torrents that might contain overlapping files"
    )

    @model_validator(mode="after")
    def no_duplicated_params(self) -> Self:
        """
        Ensure that values that can be set from the top level convenience fields
        aren't doubly set.

        We don't set the accompanying values in the infodict on instantiation because
        this object is intended to be a programmatic constructor object,
        so we expect these values to change and don't want to have to worry about
        state consistency in it -
        all values are gathered and validated when the torrent is generated.

        Raises:
            ValueError: if a convenience field and the field it stands in for are
                both set. Raised explicitly rather than with ``assert`` so the
                check is not stripped when Python runs with ``-O``.
        """
        if self.paths:
            if self.info.files:
                raise ValueError("Can't pass both paths and info.files")
            if self.info.file_tree:
                raise ValueError("Can't pass both paths and info.file_tree")
        if self.trackers:
            if self.announce:
                raise ValueError("Can't pass both trackers and announce")
            if self.announce_list:
                raise ValueError("Can't pass both trackers and announce_list")
        if self.piece_length and self.info.piece_length:
            raise ValueError("Can't pass both piece_length and info.piece_length")
        return self

    @model_validator(mode="after")
    def name_from_path_root(self) -> Self:
        """If `name` is not provided, infer it from the path root"""
        if not self.info.name:
            self.info.name = self.path_root.name
        return self

    def generate(
        self, version: TorrentVersion | str, n_processes: int | None = 1, progress: bool = False
    ) -> Torrent:
        """
        Generate a torrent file, hashing its pieces and transforming convenience values
        to valid torrent values.

        Args:
            version: torrent version to generate. A string is looked up by name
                in ``TorrentVersion.__members__``.
            n_processes: number of processes handed to the hasher.
                ``None`` uses ``multiprocessing.cpu_count()``.
            progress: forwarded to the hasher.

        Raises:
            KeyError: if ``version`` is a string that is not a ``TorrentVersion`` member name.
            ValueError: if ``version`` is not one of v1, v2 or hybrid.
        """
        if isinstance(version, str):
            version = TorrentVersion.__members__[version]

        if n_processes is None:
            n_processes = mp.cpu_count()

        if version == TorrentVersion.v1:
            return self._generate_v1(n_processes, progress)
        elif version == TorrentVersion.v2:
            return self._generate_v2(n_processes, progress)
        elif version == TorrentVersion.hybrid:
            return self._generate_hybrid(n_processes, progress)
        else:
            raise ValueError(f"Unknown torrent version: {version}")

    def generate_libtorrent(
        self,
        version: TorrentVersion | str,
        output: Path | None = None,
        bencode: bool = False,
        progress: bool = False,
    ) -> dict | bytes:
        """
        Generate the torrent by delegating to
        ``torrent_models.libtorrent.create_from_model``.

        All arguments are forwarded unchanged as keyword arguments;
        their exact semantics are defined by ``create_from_model``.
        """
        # imported lazily - presumably so the libtorrent integration is only
        # required when this method is actually used (confirm against packaging)
        from torrent_models.libtorrent import create_from_model

        return create_from_model(
            self, version=version, progress=progress, output=output, bencode=bencode
        )

    def _generate_common(self) -> dict:
        """
        Dump the fields shared by every torrent version.

        The resolved piece length is written to ``info.piece_length``,
        ``similar`` is moved from the top level into ``info``,
        and the tracker fields from :meth:`.get_trackers` are merged in.
        """
        # dump just the fields we want to have in the final torrent,
        # excluding top-level convenience fields (set in the generate methods),
        # and hash values which are created during generation
        dumped = self.model_dump(
            exclude_none=True,
            exclude=self._EXCLUDE,  # type: ignore
            by_alias=False,
        )
        dumped["info"]["piece_length"] = self._get_piece_length()
        if "similar" in dumped:
            dumped["info"]["similar"] = dumped.pop("similar")
        dumped.update(self.get_trackers())
        return dumped

    def _generate_v1(self, n_processes: int, progress: bool = False, **kwargs: Any) -> Torrent:
        """
        Generate a v1 torrent, hashing pieces unless ``info.pieces`` was already dumped.

        Extra ``kwargs`` are forwarded to ``V1Hasher``.
        """
        dumped = self._generate_common()
        paths = self.get_paths(clean=True, v1_order=True)
        file_items = self._get_v1_file_items(paths)

        if self.info.files:
            # `files` is excluded from the dump (see `_EXCLUDE`), so an explicitly
            # provided file list has to be restored here, in multi-file form
            dumped["info"]["files"] = file_items
        elif len(file_items) == 1:
            # single-file torrent: the name is the file name, with a top-level length
            dumped["info"]["name"] = file_items[0].path[-1]
            dumped["info"]["length"] = file_items[0].length
        else:
            dumped["info"]["files"] = file_items

        if "pieces" not in dumped["info"]:
            piece_length = self._get_piece_length()
            hasher = V1Hasher(
                paths=paths,
                piece_length=piece_length,
                read_size=piece_length,
                path_root=self.path_root,
                n_processes=n_processes,
                progress=progress,
                **kwargs,
            )
            hashes = hasher.process()
            # hashes are ordered by their `idx` attribute before being stored
            dumped["info"]["pieces"] = [
                piece.hash for piece in sorted(hashes, key=lambda x: x.idx)
            ]

        info = InfoDictV1(**dumped["info"])
        del dumped["info"]
        return Torrent(info=info, **dumped)

    def _generate_v2(self, n_processes: int, progress: bool = False) -> Torrent:
        """
        Generate a v2 torrent, computing piece layers and the file tree
        with ``PieceLayers.from_paths`` when either is missing from the dump.
        """
        dumped = self._generate_common()
        paths = self.get_paths(clean=True, v1_order=False)

        if "piece_layers" not in dumped or "file_tree" not in dumped["info"]:
            piece_layers = PieceLayers.from_paths(
                paths=paths,
                piece_length=dumped["info"]["piece_length"],
                path_root=self.path_root,
                n_processes=n_processes,
                progress=progress,
            )
            dumped["piece_layers"] = piece_layers.piece_layers
            dumped["info"]["file_tree"] = piece_layers.file_tree.tree

        info = InfoDictV2(**dumped["info"])
        del dumped["info"]
        return Torrent(info=info, **dumped)

    def _generate_hybrid(self, n_processes: int, progress: bool = False) -> Torrent:
        """
        Generate a hybrid (v1 + v2) torrent.

        Raises:
            ValueError: if both v1 files and a v2 file tree are present
                but do not describe the same paths.
        """
        dumped = self._generate_common()
        piece_length = dumped["info"]["piece_length"]

        # Gather paths
        if (self.info.files or self.info.length) and self.info.file_tree:
            # check for inconsistent paths in v1 and v2 if both are present
            v1_paths = self._get_v1_paths()
            v2_paths = [Path(path) for path in FileTree.flatten_tree(self.info.file_tree)]
            # only the v2 ordering is used below, so compare order-independently:
            # a different count or any differing path is an inconsistency
            if sorted(v1_paths) != sorted(v2_paths):
                raise ValueError(
                    "Both v1 files and v2 file tree present, but have inconsistent paths!"
                )
            paths = v2_paths
        else:
            paths = self.get_paths(clean=True, v1_order=False)

        # v1 files
        v1_items = self._get_v1_file_items(paths)
        # add padding to the v1 files
        v1_items = add_padfiles(v1_items, piece_length)

        hasher = HybridHasher(
            paths=paths,
            path_root=self.path_root,
            # use the resolved piece length: the top-level `piece_length` field is
            # None when the value was supplied as `info.piece_length`
            piece_length=piece_length,
            read_size=piece_length,
            n_processes=n_processes,
            progress=progress,
        )
        hashes = hasher.process()
        piece_layers, v1_pieces = hasher.split_v1_v2(hashes)

        # NOTE(review): these two keys use the spaced spelling while `_generate_v2`
        # uses `piece_layers`/`file_tree` - presumably the models accept both; confirm
        dumped["piece layers"] = piece_layers.piece_layers
        dumped["info"]["file tree"] = piece_layers.file_tree.tree
        dumped["info"]["pieces"] = v1_pieces

        if len(v1_items) == 1:
            dumped["info"]["name"] = v1_items[0].path[-1]
            dumped["info"]["length"] = v1_items[0].length
        else:
            dumped["info"]["files"] = v1_items

        info = InfoDictHybrid(**dumped["info"])
        del dumped["info"]
        return Torrent(info=info, **dumped)

    def get_paths(self, clean: bool = True, v1_order: bool = False) -> list[Path]:
        """
        Get paths specified in one of potentially several ways

        In order (first match is returned):

        - paths set in top level `paths` field
        - v2 file tree, if present
        - v1 `files`, if present
        - v1 `name`, if present with `length` set
        - iterate the files beneath the :attr:`.path_root`

        Args:
            clean (bool): clean and sort the files
            v1_order (bool): sort files in v1 order -
                first top-level files, then files in directories
                in case-sensitive alphanumeric order within those categories.

        Raises:
            ValueError: if no paths were provided and nothing was found
                beneath the path root.
        """
        if self.paths:
            paths = self.paths.copy()
        elif self.info.file_tree is not None:
            tree = self.flat_files
            assert tree is not None
            paths = [Path(t) for t in tree]
        else:
            try:
                paths = self._get_v1_paths()
            except ValueError:
                # no V1 paths, get files beneath base-path
                paths = list(self.path_root.rglob("*"))

        if not paths:
            raise ValueError("No paths provided, and nothing found within path root!")

        if clean:
            paths = clean_files(paths, relative_to=self.path_root, v1=v1_order)
        return paths

    def _get_v1_paths(self, paths: list[Path] | None = None, v1_only: bool = False) -> list[Path]:
        """
        Resolve the v1 file paths and clean them relative to the path root.

        Sources, first match wins: the ``paths`` argument, the top-level ``paths``
        field, ``info.files``, then ``info.name`` when ``info.length`` is set.

        Args:
            paths: explicit paths that take priority over the model's fields.
            v1_only: passed to :func:`.clean_files` as ``v1`` to select v1 sort order.

        Raises:
            ValueError: if none of the sources is available.
        """
        if paths:
            files = paths
        elif self.paths:
            files = self.paths
        elif self.info.files:
            files = [Path(*f.path) for f in self.info.files]
        elif self.info.length and self.info.name is not None:
            files = [Path(self.info.name)]
        else:
            raise ValueError("paths not provided, and info.files and info.length are unset!")

        return clean_files(files, relative_to=self.path_root, v1=v1_only)

    def _get_v1_file_items(self, paths: list[Path]) -> list[FileItem]:
        """
        Build one ``FileItem`` per path, with the length taken from
        ``get_size`` on the path joined to the path root.
        """
        return [
            FileItem(path=list(f.parts), length=get_size(self.path_root / f)) for f in paths
        ]

    def get_trackers(
        self,
    ) -> TrackerFields:
        """
        Build the ``announce`` / ``announce-list`` fields.

        When the ``trackers`` convenience field is set:

        - a flat list puts each tracker in its own tier,
        - a list of lists is used as the tiers directly,
        - ``announce`` is always the first tracker, and ``announce-list`` is
          omitted when there is exactly one tracker in total.

        Otherwise the model's own ``announce`` and ``announce_list`` values
        are returned when they are not ``None``.
        """
        trackers = self.trackers
        if not trackers:
            fields: TrackerFields = {}
            if self.announce is not None:
                fields["announce"] = self.announce
            if self.announce_list is not None:
                fields["announce-list"] = self.announce_list
            return fields

        if isinstance(trackers[0], list):
            tiers = cast(list[list[str]], trackers)
            # a single tier holding a single tracker needs no announce-list
            if len(tiers) == 1 and len(tiers[0]) == 1:
                return {"announce": tiers[0][0]}
            return {"announce": tiers[0][0], "announce-list": tiers}

        flat = cast(list[str], trackers)
        if len(flat) == 1:
            return {"announce": flat[0]}
        return {
            "announce": flat[0],
            "announce-list": [[t] for t in flat],
        }

    def _get_piece_length(self) -> int:
        """
        Return the top-level ``piece_length`` if set, else ``info.piece_length``.

        Raises:
            ValueError: if neither is set.
        """
        piece_length = self.piece_length or self.info.piece_length
        if piece_length is None:
            raise ValueError("No piece length provided!")
        return piece_length
[docs]
def list_files(path: Path | str) -> list[Path]:
"""
Recursively list files relative to path, sorting, excluding known system files
"""
path = Path(path)
if path.is_file():
return [path]
paths = list(path.rglob("*"))
return clean_files(paths, path)
[docs]
def clean_files(paths: list[Path], relative_to: Path, v1: bool = False) -> list[Path]:
    """
    Remove system files, and make paths relative to some directory root

    Entries that are not regular files, or whose name is in ``EXCLUDE_FILES``,
    are dropped. Absolute paths are rewritten relative to ``relative_to``;
    relative paths are kept as given and checked beneath ``relative_to``.

    Args:
        paths: paths to filter, absolute or relative to ``relative_to``.
        relative_to: directory the returned paths are relative to.
        v1: sort with ``sort_v1`` rather than by posix path string.

    Raises:
        FileNotFoundError: if any path does not exist.
    """
    kept = []
    for candidate in paths:
        if candidate.is_absolute():
            # no absolute paths in the torrent plz
            located, relative = candidate, candidate.relative_to(relative_to)
        else:
            located, relative = relative_to / candidate, candidate

        if not located.exists():
            raise FileNotFoundError(
                f"File {located} does not exist for path {candidate} relative to {relative_to}"
            )

        # directories and other non-files are skipped before the name is checked
        if not located.is_file():
            continue
        if candidate.name in EXCLUDE_FILES:
            continue
        kept.append(relative)

    if v1:
        return sort_v1(kept)
    return sorted(kept, key=lambda item: item.as_posix())