import posixpath
from math import ceil
from pathlib import Path
from typing import Any, BinaryIO, Self, cast
from typing import Literal as L
import bencode_rs
import humanize
from pydantic import Field, model_validator
from rich import print
from rich.console import Group
from rich.pretty import Pretty
from rich.table import Table
from torrent_models.base import ConfiguredBase
from torrent_models.info import (
InfoDictHybrid,
InfodictUnionType,
InfoDictV1,
InfoDictV1Base,
InfoDictV2,
InfoDictV2Base,
)
from torrent_models.types import (
ByteStr,
FileItem,
FileTreeItem,
GenericFileItem,
ListOrValue,
PieceLayersType,
TorrentVersion,
UnixDatetime,
str_keys,
)
from torrent_models.types.v1 import FileItemRange, V1PieceRange
from torrent_models.types.v2 import FileTree, V2PieceRange
class TorrentBase(ConfiguredBase):
    # Top-level torrent metadata plus read-only helpers that work across
    # v1, v2 and hybrid infodicts.
    #
    # Field aliases are the key names used in the bencoded file
    # (e.g. ``announce-list``, ``created by``, ``piece layers``).

    announce: ByteStr | None = None
    announce_list: list[list[ByteStr]] | None = Field(default=None, alias="announce-list")
    comment: ByteStr | None = None
    created_by: ByteStr | None = Field(None, alias="created by")
    creation_date: UnixDatetime | None = Field(default=None, alias="creation date")
    info: InfodictUnionType
    piece_layers: PieceLayersType | None = Field(None, alias="piece layers")
    url_list: ListOrValue[ByteStr] | None = Field(
        None, alias="url-list", description="List of webseeds"
    )

    # Lazily filled caches behind the ``flat_files`` and ``files`` properties.
    _flat_files: dict[str, FileTreeItem] | None = None
    _files: list[GenericFileItem] | None = None

    @property
    def webseeds(self) -> list[str] | None:
        """alias to url_list"""
        return self.url_list

    @classmethod
    def read_stream(cls, stream: BinaryIO, context: dict | None = None) -> Self:
        """
        Read a torrent from an open binary stream.

        The whole stream is read, bdecoded, and passed to :meth:`from_decoded`.

        Args:
            stream: Binary stream containing bencoded torrent data.
            context: Optional validation context, forwarded to :meth:`from_decoded`.
        """
        decoded = bencode_rs.bdecode(stream.read())
        return cls.from_decoded(decoded=decoded, context=context)

    @classmethod
    def read(cls, path: Path | str, context: dict | None = None) -> Self:
        """
        Read a torrent from a file on disk.

        Args:
            path: Location of the torrent file; opened in binary mode.
            context: Optional validation context, forwarded to :meth:`read_stream`.
        """
        with open(path, "rb") as tfile:
            return cls.read_stream(tfile, context=context)

    @classmethod
    def from_decoded(
        cls, decoded: dict[str | bytes, Any], context: dict | None = None, **data: Any
    ) -> Self:
        """
        Create from bdecoded dict

        Args:
            decoded: The bdecoded torrent dictionary. It is not modified:
                a shallow copy is merged with ``data`` before validation.
            context: Optional validation context; an empty dict is used when omitted.
            **data: Extra top-level values. On a key collision these take
                precedence over the entry in ``decoded``.
        """
        if decoded is not None:
            # Merge into a new dict so the caller's ``decoded`` is left untouched
            # (``dict.update`` would have written the extra values into it).
            # we fix these incompatible types in str_keys
            data = {**decoded, **data}  # type: ignore
        if any(isinstance(key, bytes) for key in data):
            data = str_keys(data)  # type: ignore
        if context is None:
            context = {}
        return cls.model_validate(data, context=context)

    @property
    def torrent_version(self) -> TorrentVersion:
        """
        Which torrent flavour the infodict represents.

        Decided by which infodict base classes ``info`` is an instance of:
        only the v1 base gives ``v1``, only the v2 base gives ``v2``,
        anything else is reported as ``hybrid``.
        """
        is_v1 = isinstance(self.info, InfoDictV1Base)
        is_v2 = isinstance(self.info, InfoDictV2Base)
        if is_v1 and not is_v2:
            return TorrentVersion.v1
        if is_v2 and not is_v1:
            return TorrentVersion.v2
        return TorrentVersion.hybrid

    @property
    def v1_infohash(self) -> str | None:
        """hex-encoded SHA1 of the infodict"""
        return self.info.v1_infohash

    @property
    def v2_infohash(self) -> str | None:
        """hex-encoded SHA256 of the infodict"""
        return self.info.v2_infohash

    @property
    def n_files(self) -> int:
        """
        Total number of files described by the torrent, excluding padfiles
        """
        if self.torrent_version in (TorrentVersion.v1, TorrentVersion.hybrid):
            # Narrow the type through a local name; re-assigning ``self.info``
            # would write to the model from inside a read-only property.
            info = cast(InfoDictV1 | InfoDictHybrid, self.info)
            if info.files is None:
                # no ``files`` list: single-file torrent
                return 1
            return sum(1 for f in info.files if f.attr not in (b"p", "p"))

        v2_info = cast(InfoDictV2, self.info)
        return len(FileTree.flatten_tree(v2_info.file_tree))

    @property
    def total_size(self) -> int:
        """
        Total size of the torrent, excluding padfiles, in bytes
        """
        if self.torrent_version in (TorrentVersion.v1, TorrentVersion.hybrid):
            # Local cast only, the model itself is not touched.
            info = cast(InfoDictV1 | InfoDictHybrid, self.info)
            if info.files is None:
                # single-file torrent: the size is the top-level ``length``
                return cast(int, info.length)
            return sum(f.length for f in info.files if f.attr not in (b"p", "p"))

        v2_info = cast(InfoDictV2, self.info)
        tree = FileTree.flatten_tree(v2_info.file_tree)
        return sum(item["length"] for item in tree.values())

    @property
    def flat_files(self) -> dict[str, FileTreeItem] | None:
        """
        A flattened version of the v2 file tree

        Computed on first access and cached. Stays ``None`` for v1-only torrents.
        """
        if self._flat_files is None and self.torrent_version != TorrentVersion.v1:
            info = cast(InfoDictV2, self.info)
            self._flat_files = FileTree.flatten_tree(info.file_tree)
        return self._flat_files

    @property
    def files(self) -> list[GenericFileItem]:
        """
        Common access to file information from both v1 and v2 torrents

        Padfiles are skipped. For hybrid torrents each v1 entry is merged with
        the v2 entry found under the same joined path. The list is built on
        first access and cached.
        """
        if self._files is not None:
            return self._files

        # v1 and v2 reps already confirmed to be equivalent during validation
        version = self.torrent_version
        files: list[GenericFileItem] = []
        if version in (TorrentVersion.v1, TorrentVersion.hybrid):
            info = cast(InfoDictV1 | InfoDictHybrid, self.info)
            if info.files is None:
                # single-file torrent: synthesize the one entry from name/length
                v1_files = [FileItem(length=info.length, path=[info.name])]
            else:
                v1_files = info.files

            for f in v1_files:
                if f.is_padfile:
                    continue
                v1_repr = f.model_dump()
                # path components -> one posix-style string
                joined = posixpath.join(*v1_repr["path"])
                if isinstance(joined, bytes):
                    joined = joined.decode("utf-8")
                v1_repr["path"] = joined

                if version == TorrentVersion.hybrid:
                    v2_repr = self.flat_files[joined]  # type: ignore
                else:
                    v2_repr = {}
                # v2 values win where both representations define a key
                files.append(GenericFileItem(**{**v1_repr, **v2_repr}))
        else:
            files = [
                GenericFileItem(path=k, **v) for k, v in self.flat_files.items()  # type: ignore
            ]

        self._files = files
        return self._files

    @property
    def flat_trackers(self) -> list[list[str]]:
        """
        All trackers as a list of tiers.

        ``announce`` (when set) comes first as its own single-item tier,
        followed by the tiers from ``announce_list`` (when set).
        """
        trackers = []
        if self.announce:
            trackers.append([self.announce])
        if self.announce_list:
            trackers.extend(self.announce_list)
        return trackers

    def model_dump_torrent(self, mode: L["str", "binary"] = "str", **kwargs: Any) -> dict:
        """
        Dump the model into a dictionary that can be bencoded into a torrent

        Args:
            mode ("str", "binary"): ``str`` returns as a 'python' version of the torrent,
                with string keys and serializers applied.
                ``binary`` roundtrips to and from bencoding.
            kwargs: forwarded to :meth:`pydantic.BaseModel.model_dump`
        """
        dumped = self.model_dump(exclude_none=True, by_alias=True, **kwargs)
        if mode == "binary":
            dumped = bencode_rs.bdecode(bencode_rs.bencode(dumped))
        return dumped

    def pprint(self, verbose: int = 0) -> None:
        """
        Pretty print the torrent.

        See :func:`.pprint`
        """
        # resolves to the module-level ``pprint`` function, not this method
        pprint(self, verbose=verbose)
class Torrent(TorrentBase):
    """
    A valid torrent file, including hashes.
    """

    @property
    def file_size(self) -> int:
        """Size of the generated torrent file, in bytes"""
        return len(self.bencode())

    def bencode(self) -> bytes:
        """Serialize the torrent to bencoded bytes."""
        dumped = self.model_dump_torrent(mode="str")
        return bencode_rs.bencode(dumped)

    def write(self, path: Path) -> None:
        """Write the torrent to disk"""
        with open(path, "wb") as f:
            f.write(self.bencode())

    def v1_piece_range(self, piece_idx: int) -> V1PieceRange:
        """
        Get a v1 piece range from the piece index

        Args:
            piece_idx: Zero-based index into the v1 ``pieces`` list.

        Returns:
            The piece hash together with the byte range(s) the piece covers:
            one range for a single-file torrent, otherwise one range per file
            the piece overlaps, in file order.

        Raises:
            AssertionError: if this is a v2-only torrent.
            IndexError: if ``piece_idx`` is negative or not smaller than the
                number of pieces.
        """
        assert self.torrent_version in (
            TorrentVersion.v1,
            TorrentVersion.hybrid,
        ), "Cannot get v1 piece ranges for v2-only torrents"
        # Narrow the type through a local name, without re-assigning the field.
        info = cast(InfoDictV1 | InfoDictHybrid, self.info)

        n_pieces = len(info.pieces)
        # A negative index would wrap around when looking up the hash and give
        # negative byte offsets, so it is rejected like any other bad index.
        if not 0 <= piece_idx < n_pieces:
            raise IndexError(
                f"Cannot get piece index {piece_idx} for torrent with " f"{n_pieces} pieces"
            )

        piece_length = info.piece_length
        piece_hash = info.pieces[piece_idx]
        start_range = piece_idx * piece_length
        end_range = (piece_idx + 1) * piece_length

        if info.files is None:
            # single file torrent
            length = cast(int, info.length)
            return V1PieceRange(
                piece_idx=piece_idx,
                piece_hash=piece_hash,
                ranges=[
                    FileItemRange(
                        path=[info.name],
                        length=length,
                        range_start=start_range,
                        # the last piece may be shorter than piece_length
                        range_end=min(length, end_range),
                        full_path=info.name,
                    )
                ],
            )

        def _range_in(file: Any, begin: int, end: int) -> FileItemRange:
            # Range ``[begin, end)`` measured from the start of ``file``.
            return FileItemRange(
                path=file.path,
                attr=file.attr,
                length=file.length,
                range_start=begin,
                range_end=end,
                full_path="/".join([info.name, *file.path]),
            )

        files = info.files
        ranges = []
        found_len = 0  # bytes of the piece accounted for so far
        size_idx = 0  # offset of the current file within the concatenated files
        file_idx = 0  # index of the next file to continue with

        # first, find the file where the range starts
        for i, file in enumerate(files):
            if file.length + size_idx > start_range:
                # range starts in this file
                file_range_start = start_range - size_idx
                file_range_end = min(file.length, file_range_start + piece_length)
                found_len += file_range_end - file_range_start
                ranges.append(_range_in(file, file_range_start, file_range_end))
                # index additional files starting at the next file
                file_idx = i + 1
                break
            size_idx += file.length

        # then, iterate through files until the range or files are exhausted
        while found_len < piece_length and file_idx < len(files):
            file = files[file_idx]
            # later files always contribute from their first byte
            file_range_end = min(file.length, piece_length - found_len)
            ranges.append(_range_in(file, 0, file_range_end))
            found_len += file_range_end
            file_idx += 1

        return V1PieceRange(piece_idx=piece_idx, ranges=ranges, piece_hash=piece_hash)

    def v2_piece_range(self, file: str, piece_idx: int = 0) -> V2PieceRange:
        """
        Get a v2 piece range from a file path and optional piece index.

        If `piece_idx` is not provided (default to 0)...

        - If the file is larger than the piece length, gets the 0th piece.
        - If the file is smaller than the piece length,
          the range corresponds to the whole file, the hash is the root hash,
          and piece_idx is ignored.

        Args:
            file: Path of the file, as keyed in the flattened file tree.
            piece_idx: Zero-based index of the piece within the file.

        Raises:
            AssertionError: if this is a v1-only torrent.
            ValueError: if ``file`` is not in the torrent.
            IndexError: if the file has a piece layer and ``piece_idx`` is
                negative or not smaller than the number of hashes in it.
        """
        assert self.torrent_version in (
            TorrentVersion.v2,
            TorrentVersion.hybrid,
        ), "Cannot get v2 piece ranges from a v1-only torrent"
        # satisfy mypy via local names, without re-assigning model fields
        info = cast(InfoDictV2 | InfoDictHybrid, self.info)
        flat_files = cast(dict[str, FileTreeItem], self.flat_files)
        piece_layers = cast(PieceLayersType, self.piece_layers)

        if file not in flat_files:
            raise ValueError(f"file {file} not found in torrent!")

        entry = flat_files[file]
        root = entry["pieces root"]
        file_size = entry["length"]
        piece_length = info.piece_length
        full_path = file if len(flat_files) == 1 else "/".join([info.name, file])

        if root not in piece_layers:
            # smaller then piece_length, piece range is whole file
            return V2PieceRange(
                piece_idx=0,
                path=file,
                range_start=0,
                range_end=file_size,
                piece_length=piece_length,
                file_size=file_size,
                root_hash=root,
                full_path=full_path,
            )

        layer = piece_layers[root]
        # Reject negative indices too: they would wrap around in the layer
        # lookup and produce a negative ``range_start``.
        if not 0 <= piece_idx < len(layer):
            raise IndexError(
                f"piece index {piece_idx} is out of range for file with " f"{len(layer)} pieces"
            )
        return V2PieceRange(
            piece_idx=piece_idx,
            path=file,
            range_start=piece_idx * piece_length,
            # the last piece may be shorter than piece_length
            range_end=min(file_size, (piece_idx + 1) * piece_length),
            piece_length=piece_length,
            file_size=file_size,
            piece_hash=layer[piece_idx],
            root_hash=root,
            full_path=full_path,
        )

    @model_validator(mode="after")
    def piece_layers_if_v2(self) -> Self:
        """If we are a v2 or hybrid torrent, we should have piece layers"""
        if self.torrent_version in (TorrentVersion.v2, TorrentVersion.hybrid):
            assert self.piece_layers is not None, "Hybrid and v2 torrents must have piece layers"
        return self

    @model_validator(mode="after")
    def pieces_layers_correct(self) -> Self:
        """
        All files with a length longer than the piece length should be in piece layers,
        Piece layers should have the correct number of hashes
        """
        if self.torrent_version == TorrentVersion.v1:
            return self

        # Local casts only; nothing is assigned back onto the model
        # from inside its own validator.
        piece_layers = cast(PieceLayersType, self.piece_layers)
        info = cast(InfoDictV2 | InfoDictHybrid, self.info)
        piece_length = info.piece_length

        for path, file_info in info.flat_tree.items():
            length = file_info["length"]
            if length <= piece_length:
                # not longer than one piece: no piece layer entry is required
                continue
            root = file_info["pieces root"]
            assert root in piece_layers, (
                f"file {path} does not have a matching piece root in the piece layers dict. "
                f"Expected to find: {root}"  # type: ignore
            )
            expected_pieces = ceil(length / piece_length)
            n_hashes = len(piece_layers[root])
            assert n_hashes == expected_pieces, (
                f"File {path} does not have the correct number of piece hashes. "
                f"Expected {expected_pieces} hashes from file length {length} "
                f"and piece length {piece_length}. "
                f"Got {n_hashes}"
            )
        return self
def pprint(t: TorrentBase, verbose: int = 0) -> None:
    """
    Print the contents of a torrent file.

    By default, prints only the top-level metadata in a way that should always be
    smaller than one screen.
    Increase verbosity to show more of the torrent.

    Hashes are printed as hexadecimal numbers and split into individual pieces,
    but they are properly encoded in the torrent.

    Args:
        t (:class:`.Torrent`): The torrent to print.
        verbose (int): Level of detail to print.

            * ``1`` show files in separate table
            * ``2`` show truncated v1 piece hashes
            * ``3`` show everything as-is
    """
    # summary stats
    summary = {
        "# Files": humanize.number.intcomma(t.n_files),
        "Total Size": humanize.naturalsize(t.total_size, binary=True),
        "Piece Size": humanize.naturalsize(t.info.piece_length, binary=True),
    }
    # ``file_size`` is only defined on some torrent classes; read it once so
    # the property is not evaluated twice.
    file_size = getattr(t, "file_size", None)
    if file_size is not None:
        summary["Torrent Size"] = humanize.naturalsize(file_size, binary=True)

    v1_infohash = t.v1_infohash
    v2_infohash = t.v2_infohash
    if v1_infohash:
        summary["V1 Infohash"] = v1_infohash
    if v2_infohash:
        summary["V2 Infohash"] = v2_infohash

    table = Table(title=t.info.name, show_header=False)
    table.add_column("", justify="left", style="magenta bold", no_wrap=True)
    table.add_column("")
    for label, value in summary.items():
        table.add_row(label, value)

    # Decide what to leave out of the raw dump for this verbosity level.
    exclude = {}
    context = {"mode": "print", "hash_truncate": True}
    if verbose <= 1:
        exclude = {"info": {"pieces", "file tree", "file_tree", "files"}, "piece_layers": True}
    elif verbose <= 2:
        exclude = {"info": {"file tree", "file_tree", "files"}, "piece_layers": True}
    else:
        context["hash_truncate"] = False

    # make file table
    file_table = None
    if 1 <= verbose <= 2:
        file_table = Table(title="Files")
        file_table.add_column("Path", no_wrap=True)
        file_table.add_column("Size")
        if t.torrent_version == TorrentVersion.v1:
            # Narrow the type through a local name; the torrent is not modified.
            v1_info = cast(InfoDictV1, t.info)
            if v1_info.files is not None:
                tfiles = v1_info.files
            else:
                # Single-file torrent: ``path`` is a list of components, so the
                # name is wrapped in a list before being joined below.
                tfiles = [FileItem(path=[v1_info.name], length=v1_info.length)]
            # Two cells per row, matching the two columns of the v1 table.
            rows = [
                ("/".join(f.path), humanize.naturalsize(f.length, binary=True))
                for f in tfiles
                if f.attr not in (b"p", "p")
            ]
        else:
            file_table.add_column("Hash")
            tree = t.flat_files
            assert tree is not None
            rows = [
                (
                    str(path),
                    humanize.naturalsize(item["length"], binary=True),
                    # first 8 hex characters of the pieces root
                    item["pieces root"].hex()[0:8],
                )
                for path, item in tree.items()
            ]
        for row in rows:
            file_table.add_row(*row)

    dumped = t.model_dump(
        by_alias=True, exclude=exclude, exclude_none=True, context=context  # type: ignore
    )

    # The file table only exists for verbosity levels 1 and 2.
    if file_table is None:
        group = Group(table, Pretty(dumped))
    else:
        group = Group(table, file_table, Pretty(dumped))
    print(group)