"""Data model using :obj:`"Real" GUID's <.types.REAL_GUID>` as identifiers
This may eventually replace the :mod:`granicus_archiver.legistar.model` module
since its storage method is more robust.
For now however, the two exist in parallel.
"""
from __future__ import annotations
from typing import Literal, Iterator, NamedTuple, Self, Any, overload, TYPE_CHECKING
from os import PathLike
from pathlib import Path
import json
from dataclasses import dataclass, field
if TYPE_CHECKING:
from ..config import Config
from ..types import Serializable, FileMeta
from ..clips.model import CLIP_ID
from .types import (
REAL_GUID, LegistarFileKey, AttachmentName, LegistarFileUID,
NoClipT, NoClip,
)
from .model import (
LegistarFile as _LegistarFile,
LegistarFiles,
AttachmentFile as _AttachmentFile,
DetailPageResult,
AbstractLegistarModel,
is_attachment_uid,
uid_to_attachment_name,
attachment_name_to_uid,
uid_to_file_key,
file_key_to_uid,
)
from .rss_parser import FeedItem, GuidCompare
from ..utils import HashMismatchError, get_file_hash
[docs]
@dataclass
class LegistarFile(_LegistarFile):
"""Information for a downloaded file within :attr:`RGuidLegistarFiles.files`
"""
@property
def uid(self) -> LegistarFileUID:
"""Unique id for the file
"""
return file_key_to_uid(self.name)
[docs]
@classmethod
def from_uid(
cls,
uid: LegistarFileUID,
filename: Path,
metadata: FileMeta,
pdf_links_removed: bool
) -> Self:
"""Create an instance from a :obj:`uid <.types.LegistarFileUID>`
"""
key = uid_to_file_key(uid)
return cls(
name=key, filename=filename, metadata=metadata,
pdf_links_removed=pdf_links_removed,
)
[docs]
@dataclass
class AttachmentFile(_AttachmentFile):
"""Information for a downloaded attachment within :attr:`RGuidLegistarFiles.files`
"""
@property
def uid(self) -> LegistarFileUID:
"""Unique id for the file
"""
return attachment_name_to_uid(self.name)
[docs]
@classmethod
def from_uid(
cls,
uid: LegistarFileUID,
filename: Path,
metadata: FileMeta,
pdf_links_removed: bool
) -> Self:
"""Create an instance from a :obj:`uid <.types.LegistarFileUID>`
"""
assert is_attachment_uid(uid)
key = uid_to_attachment_name(uid)
return cls(
name=key, filename=filename, metadata=metadata,
pdf_links_removed=pdf_links_removed,
)
def get_file_cls(uid: LegistarFileUID) -> type[LegistarFile|AttachmentFile]:
if is_attachment_uid(uid):
return AttachmentFile
return LegistarFile
[docs]
class RGuidUpdateResult(NamedTuple):
"""
"""
changed: bool
"""Whether any changes were made"""
link_keys: list[LegistarFileKey]
"""Any URL attributes from :class:`DetailPageLinks` that changed"""
attachment_keys: list[AttachmentName]
"""Any keys in :attr:`DetailPageLinks.attachments` that changed"""
files: dict[LegistarFileUID, LegistarFile|AttachmentFile]
attributes: dict[str, Any]
"""Attributes of :class:`DetailPageResult` that changed"""
[docs]
@dataclass
class RGuidLegistarFiles(Serializable):
"""Collection of files within a :class:`RGuidDetailResult`
"""
parent: RGuidDetailResult = field(init=False, repr=False, compare=False)
base_dir: Path
"""Base directory (relative to :attr:`RGuidLegistarData.root_dir`)"""
files: dict[LegistarFileUID, LegistarFile|AttachmentFile] = field(default_factory=dict)
"""Mapping of :class:`LegistarFile` or :class:`AttachmentFile` objects
using their :attr:`~LegistarFile.uid` as keys
"""
@classmethod
def build_filename(cls, uid: LegistarFileUID) -> Path:
if is_attachment_uid(uid):
name = uid_to_attachment_name(uid)
return cls.build_attachment_filename(name)
# return Path('attachments') / name
key = uid_to_file_key(uid)
return LegistarFiles.build_filename(key)
@classmethod
def build_attachment_filename(cls, name: AttachmentName) -> Path:
return Path('attachments') / f'{name}.pdf'
@property
def full_base_dir(self) -> Path:
"""The complete file directory (:attr:`base_dir` prefixed with
:attr:`RGuidLegistarData.root_dir`)
"""
return self.parent.parent.root_dir / self.base_dir
[docs]
def get_file_path(self, uid: LegistarFileUID, absolute: bool) -> Path:
"""Get the filename for the given *uid*
Arguments:
uid: A :obj:`LegistarFileUID`
absolute: If ``True`` the path will be within the :attr:`full_base_dir`,
otherwise :attr:`base_dir` is used
"""
if uid in self:
filename = self[uid].filename
else:
filename = self.build_filename(uid)
if absolute:
return self.full_base_dir / filename
return self.base_dir / filename
[docs]
def add_file(
self,
uid: LegistarFileUID,
meta: FileMeta,
pdf_links_removed: bool
) -> LegistarFile|AttachmentFile:
"""Add a file with the given *uid*
"""
if uid in self:
raise KeyError(f'uid "{uid}" already exists')
filename = self.build_filename(uid)
f_cls = get_file_cls(uid)
f_obj = f_cls.from_uid(uid, filename, meta, pdf_links_removed)
self.files[uid] = f_obj
return f_obj
[docs]
def ensure_local_hashes(self, check_existing: bool = False) -> bool:
"""Ensure that all local files have an :attr:`~.types.FileMeta.sha1` hash
stored in their :attr:`~AbstractFile.metadata`
Arguments:
check_existing: If ``True``, the hash of the local file will be
checked against the stored hash
Returns:
``True`` if any hashes were generated or updated
"""
changed = False
for f in self:
full_p = self.get_file_path(f.uid, absolute=True)
if f.metadata.sha1 is None:
f.metadata.sha1 = get_file_hash('sha1', full_p)
changed = True
elif check_existing:
if f.metadata.sha1 != get_file_hash('sha1', full_p):
raise HashMismatchError(full_p)
return changed
def iter_legistar_files(self) -> Iterator[LegistarFile]:
for obj in self:
if isinstance(obj, LegistarFile):
yield obj
def iter_attachments(self) -> Iterator[AttachmentFile]:
for obj in self:
if isinstance(obj, AttachmentFile):
yield obj
def keys(self) -> Iterator[LegistarFileUID]:
yield from self.files.keys()
def items(self) -> Iterator[tuple[LegistarFileUID, LegistarFile|AttachmentFile]]:
yield from self.files.items()
def __iter__(self) -> Iterator[LegistarFile|AttachmentFile]:
yield from self.files.values()
def __contains__(self, key: LegistarFileUID) -> bool:
return key in self.files
def get(self, key: LegistarFileUID) -> LegistarFile|AttachmentFile|None:
return self.files.get(key)
def __getitem__(self, key: LegistarFileUID) -> LegistarFile|AttachmentFile:
return self.files[key]
def __setitem__(self, key: LegistarFileUID, item: LegistarFile|AttachmentFile) -> None:
self.files[key] = item
def __delitem__(self, key: LegistarFileUID) -> None:
del self.files[key]
@classmethod
def deserialize(cls, data: dict[str, Any], parent: RGuidDetailResult|None = None) -> Self:
obj = cls(
base_dir=Path(data['base_dir']),
files={
k:get_file_cls(k).deserialize(v) for k,v in data['files'].items()
},
)
if parent is not None:
obj.parent = parent
return obj
def serialize(self) -> dict[str, Any]:
return dict(
base_dir=str(self.base_dir),
files={k:v.serialize() for k,v in self.files.items()},
)
[docs]
@dataclass
class RGuidDetailResult(DetailPageResult):
"""Subclass of :class:`.model.DetailPageResult` for this module
"""
parent: RGuidLegistarData = field(init=False, repr=False)
files: RGuidLegistarFiles = field(init=False)
"""Instance of :class:`RGuidLegistarFiles`"""
[docs]
@classmethod
def from_html(cls, html_str: str|bytes, feed_item: FeedItem, parent: RGuidLegistarData) -> Self:
"""Create an instance from the raw html from
:attr:`~.model.DetailPageResult.page_url`
"""
obj = super().from_html(html_str, feed_item)
obj.parent = parent
obj.files = RGuidLegistarFiles(base_dir=obj.get_unique_folder())
obj.files.parent = obj
return obj
@property
def guid_compare(self) -> GuidCompare:
"""A helper to compare :obj:`GUID's <.types.GUID>`
"""
return GuidCompare(self.feed_guid)
[docs]
def get_unique_folder(self) -> Path:
"""Get a local path to store files for this item
The folder structure will be:
``<category>/<year>/<real_guid>``
Where
``<category>``
Is the :attr:`~.rss_parser.FeedItem.category` of the :attr:`feed_item`
``<year>``
Is the 4-digit year of the :attr:`~.rss_parser.FeedItem.meeting_date`
``<real_guid>``
Is the :attr:`~.model.DetailPageResult.real_guid`
This makes it much less complex to ensure uniqueness compared to
:meth:`.model.DetailPageResult.get_unique_folder`, but has a downside
of being less user-friendly in a file browser (without the metadata).
"""
p = super().get_unique_folder()
return p.parent / self.real_guid
[docs]
def copy(self) -> Self:
"""Create a deep copy of the instance
"""
return self.deserialize(self.serialize(), parent=self.parent)
[docs]
def update(self, other: Self) -> RGuidUpdateResult:
"""Update *self* with changed attributes in *other*
"""
assert self.real_guid == other.real_guid
assert other.guid_compare >= self.guid_compare
r = super().update(other)
new_uids = set(other.files.keys()) - set(self.files.keys())
files_changed: dict[LegistarFileUID, LegistarFile|AttachmentFile] = {}
files_changed.update({uid:other.files[uid] for uid in new_uids})
changed = r.changed
if len(files_changed):
changed = True
return RGuidUpdateResult(
changed=changed,
link_keys=r.link_keys,
attachment_keys=r.attachment_keys,
attributes=r.attributes if r.attributes else {},
files=files_changed,
)
@classmethod
def deserialize(cls, data: dict[str, Any], parent: RGuidLegistarData|None = None) -> Self:
data = data.copy()
file_data = data.pop('files')
obj = super().deserialize(data)
obj.files = RGuidLegistarFiles.deserialize(file_data, parent=obj)
if parent is not None:
obj.parent = parent
return obj
def serialize(self) -> dict[str, Any]:
d = super().serialize()
assert self.files is not None
d['files'] = self.files.serialize()
return d
[docs]
@dataclass
class RGuidLegistarData(AbstractLegistarModel[REAL_GUID, RGuidDetailResult]):
"""Container for data gathered from Legistar, using :obj:`real GUID's <.types.REAL_GUID>`
as keys
"""
root_dir: Path
"""Root filesystem path for downloading assets"""
detail_results: dict[REAL_GUID, RGuidDetailResult] = field(default_factory=dict)
"""Mapping of parsed :class:`RGuidDetailResult` items with their
:attr:`~.model.DetailPageResult.feed_guid` as keys
"""
matched_guids: dict[CLIP_ID, REAL_GUID] = field(default_factory=dict)
""":attr:`Clips <.model.Clip.id>` that have been matched to
:attr:`FeedItems <.rss_parser.FeedItem.guid>`
"""
items_by_clip_id: dict[CLIP_ID, RGuidDetailResult] = field(default_factory=dict)
"""Mapping of items in :attr:`detail_results` with a valid
:attr:`~.model.DetailPageResult.clip_id`
"""
clip_id_overrides: dict[REAL_GUID, CLIP_ID|NoClipT] = field(default_factory=dict)
"""Mapping of items manually-linked to :class:`Clips <.model.Clip>`
"""
clip_id_overrides_rev: dict[CLIP_ID, REAL_GUID] = field(init=False)
def __post_init__(self) -> None:
for item in self.detail_results.values():
item.parent = self
self.clip_id_overrides_rev = {
v: k for k, v in self.clip_id_overrides.items() if v is not NoClip
}
for item in self:
clip_id = item.clip_id
if clip_id is None:
continue
assert clip_id not in self.items_by_clip_id
self.items_by_clip_id[clip_id] = item
@classmethod
def _get_root_dir(cls, config: Config) -> Path:
# TODO: This should be stored in Config
return config.legistar.out_dir.parent / 'legistar-rguid'
@classmethod
def _get_data_file(cls, config: Config) -> Path:
# TODO: This should be stored in Config
return cls._get_root_dir(config) / 'data.json'
def get_guid_for_detail_result(self, item: RGuidDetailResult) -> REAL_GUID:
return item.real_guid
[docs]
def get_clip_id_for_guid(
self,
guid: REAL_GUID,
use_overrides: bool = True
) -> CLIP_ID|None|NoClipT:
"""Get the clip :attr:`~.model.Clip.id` linked to the given *guid*
Arguments:
guid: The item :attr:`~.model.DetailPageResult.feed_guid`
use_overrides: Whether to use items in :attr:`clip_id_overrides`
(default is ``True``)
Returns one of:
- clip_id (:obj:`~.model.CLIP_ID`)
The matched :attr:`Clip.id <.model.Clip.id>` (if one was found)
- :obj:`~.types.NoClip`
If the item has been explicitly set to have no :class:`~.model.Clip`
associated with it
- :obj:`None`
If no match was found
"""
item = self[guid]
if use_overrides and item.real_guid in self.clip_id_overrides:
return self.clip_id_overrides[item.real_guid]
if item.clip_id is not None:
return item.clip_id
matched_rev = {v:k for k,v in self.matched_guids.items()}
assert len(matched_rev) == len(self.matched_guids)
return matched_rev.get(guid)
def ensure_no_future_items(self) -> None:
for item in self:
if item.is_future:
raise ValueError(f'item is in the future: {item.feed_guid=}')
def ensure_unique_item_folders(self) -> None:
item_paths = [
self.get_folder_for_item(item) for item in self if item.can_download
]
s = set()
for p in item_paths:
if p in s:
raise ValueError(f'folder collision for "{p}"')
s.add(p)
assert len(item_paths) == len(set(item_paths))
def _build_item(self, html_str: str|bytes, feed_item: FeedItem) -> RGuidDetailResult:
return RGuidDetailResult.from_html(html_str, feed_item, parent=self)
@overload
def create_item(
self,
html_str: str|bytes,
feed_item: FeedItem,
allow_update: Literal[True]
) -> tuple[RGuidDetailResult, RGuidUpdateResult|None]: ...
@overload
def create_item(
self,
html_str: str|bytes,
feed_item: FeedItem,
allow_update: Literal[False]
) -> RGuidDetailResult: ...
[docs]
def create_item(
self,
html_str: str|bytes,
feed_item: FeedItem,
allow_update: bool
) -> tuple[RGuidDetailResult, RGuidUpdateResult|None]|RGuidDetailResult:
"""Create and add an item from html
Arguments:
html_str: The raw html to pass to :meth:`RGuidDetailResult.from_html`
feed_item: The :class:`.rss_parser.FeedItem`
allow_update: If an item exists and this is ``True``, its
:meth:`~RGuidDetailResult.update` method will be called.
Otherwise, a :class:`KeyError` will be raised.
:Returns:
- If *allow_update* is ``False`` this returns
- **item** (:class:`RGuidDetailResult`): The parsed item
- If *allow_update* is ``True`` this returns a :class:`tuple` of
- **item** (:class:`RGuidDetailResult`): The parsed item
- **update_result**: The :class:`RGuidUpdateResult` if an item
was updated (or ``None`` if no update was performed).
"""
new_item = self._build_item(html_str, feed_item)
update = None
if new_item in self:
item = self[new_item.real_guid]
if allow_update:
update = item.update(item)
return item, update
elif item.guid_compare >= new_item.guid_compare:
return item
raise KeyError(f'Item exists: {new_item.real_guid}')
else:
item = new_item
self.add_item(item)
if allow_update:
return item, None
return item
[docs]
def add_item(self, item: RGuidDetailResult) -> None:
"""Add an existing :class:`RGuidDetailResult` object
"""
if item in self:
if item is self[item.real_guid]:
return
raise KeyError(f'Item exists: {item.real_guid}')
assert item.parent is self
self.detail_results[item.real_guid] = item
clip_id = item.links.get_clip_id_from_video()
if clip_id is not None:
self.items_by_clip_id[clip_id] = item
[docs]
def add_detail_result(self, item: RGuidDetailResult) -> None:
"""Add a parsed :class:`RGuidDetailResult` to :attr:`detail_results`
"""
return self.add_item(item)
[docs]
def find_match_for_clip_id(self, clip_id: CLIP_ID) -> RGuidDetailResult|None|NoClipT:
"""Find a :class:`RGuidDetailResult` match for the given *clip_id*
"""
if clip_id in self.clip_id_overrides_rev:
guid = self.clip_id_overrides_rev[clip_id]
return self.get(guid)
item = self.items_by_clip_id.get(clip_id)
if item is not None and item.real_guid in self.clip_id_overrides:
_clip_id = self.clip_id_overrides[item.real_guid]
if _clip_id is NoClip:
return NoClip
return item
[docs]
def is_clip_id_available(self, clip_id: CLIP_ID) -> bool:
"""Check whether the given clip id is linked to an item (returns ``True``
if there is no link)
"""
if clip_id in self.clip_id_overrides_rev:
return False
if clip_id in self.matched_guids:
return False
return True
[docs]
def is_guid_matched(self, guid: REAL_GUID) -> bool:
"""Check whether the item matching *guid* has a :class:`~.model.Clip`
associated with it
"""
item = self[guid]
override = self.clip_id_overrides.get(item.real_guid)
if override is NoClip:
return True
if override is not None:
return True
if guid in self.matched_guids.values():
return True
return False
[docs]
def add_guid_match(self, clip_id: CLIP_ID, guid: REAL_GUID) -> None:
"""Add a ``Clip.id -> FeedItem`` match to :attr:`matched_guids`
This may seem redunant considering the :meth:`find_match_for_clip_id`
method, but is intended for adding matches for items without a
:attr:`~.model.DetailPageLinks.video` url to parse.
"""
assert guid not in self.matched_guids.values()
if clip_id in self.matched_guids:
assert self.matched_guids[clip_id] == guid
return
self.matched_guids[clip_id] = guid
[docs]
def add_clip_match_override(
self,
real_guid: REAL_GUID,
clip_id: CLIP_ID|None|NoClipT
) -> None:
"""Add a manual override for the given *real_guid*
Arguments:
real_guid: The :attr:`~.model.DetailPageResult.real_guid` of the
legistar item
clip_id: The clip :attr:`.model.Clip.id` matching the item. If
:obj:`~.types.NoClip` is given, this signifies that
the item should not have a :class:`~.model.Clip` associated
with it. If :obj:`None` is given, any previously added overrides
for *real_guid* will be removed.
"""
if clip_id is None:
if real_guid not in self.clip_id_overrides:
return
real_clip_id = self.clip_id_overrides[real_guid]
if real_clip_id is not NoClip:
assert self.clip_id_overrides_rev[real_clip_id] == real_guid
del self.clip_id_overrides_rev[real_clip_id]
del self.clip_id_overrides[real_guid]
return
if clip_id is not NoClip:
assert clip_id not in self.clip_id_overrides_rev
self.clip_id_overrides_rev[clip_id] = real_guid
self.clip_id_overrides[real_guid] = clip_id
[docs]
def iter_guid_matches(self) -> Iterator[tuple[CLIP_ID, RGuidDetailResult]]:
"""Iterate over items added by the :meth:`add_guid_match`,
:meth:`add_guid_match` and :meth:`add_clip_match_override` methods
Results are tuples of :obj:`CLIP_ID` and :class:`RGuidDetailResult`
"""
real_guids = set[REAL_GUID]()
for clip_id, guid in self.matched_guids.items():
item = self[guid]
if item.real_guid in self.clip_id_overrides:
continue
real_guids.add(item.real_guid)
yield clip_id, item
for real_guid, clip_id in self.clip_id_overrides.items():
if clip_id is NoClip:
continue
assert real_guid not in real_guids
item = self[real_guid]
assert item is not None
yield clip_id, item
[docs]
def get_folder_for_item(self, item: REAL_GUID|RGuidDetailResult) -> Path:
"""Get a local path to store files for a :class:`RGuidDetailResult`
This is the result of :attr:`RGuidLegistarFiles.full_base_dir` from
:attr:`RGuidDetailResult.files`
"""
if not isinstance(item, RGuidDetailResult):
item = self[item]
return item.files.full_base_dir
def get_path_for_uid(self, guid: REAL_GUID, uid: LegistarFileUID) -> tuple[Path, FileMeta|None]:
item = self.detail_results[guid]
p = item.files.get_file_path(uid, absolute=True)
fobj = item.files.get(uid)
meta = None if fobj is None else fobj.metadata
return p, meta
def iter_files_for_upload(self, guid: REAL_GUID) -> Iterator[tuple[LegistarFileUID, Path, FileMeta, bool]]:
item = self.detail_results[guid]
for fobj in item.files:
p = item.files.get_file_path(fobj.uid, absolute=True)
is_attachment = isinstance(fobj, AttachmentFile)
yield fobj.uid, p, fobj.metadata, is_attachment
def get(self, key: REAL_GUID) -> RGuidDetailResult|None:
return self.detail_results.get(key)
def __getitem__(self, key: REAL_GUID) -> RGuidDetailResult:
return self.detail_results[key]
def __contains__(self, key: REAL_GUID|RGuidDetailResult) -> bool:
if isinstance(key, RGuidDetailResult):
key = key.real_guid
return key in self.detail_results
def __iter__(self) -> Iterator[RGuidDetailResult]:
yield from self.detail_results.values()
def __len__(self) -> int:
return len(self.detail_results)
def keys(self) -> Iterator[REAL_GUID]:
yield from self.detail_results.keys()
def items(self) -> Iterator[tuple[REAL_GUID, RGuidDetailResult]]:
yield from self.detail_results.items()
def item_dict(self) -> dict[REAL_GUID, RGuidDetailResult]:
return self.detail_results
[docs]
@classmethod
def load(
cls,
filename: PathLike,
root_dir: Path|None = None,
) -> Self:
"""Loads an instance from previously saved data
"""
if not isinstance(filename, Path):
filename = Path(filename)
data = json.loads(filename.read_text())
if root_dir is not None:
assert not root_dir.is_absolute()
data_root = data.get('root_dir')
if data_root is not None:
assert Path(data_root) == root_dir
else:
data['root_dir'] = str(root_dir)
return cls.deserialize(data)
[docs]
def save(self, filename: PathLike, indent: int|None = 2) -> None:
"""Saves all clip data as JSON to the given filename
"""
if not isinstance(filename, Path):
filename = Path(filename)
data = self.serialize()
filename.write_text(json.dumps(data, indent=indent))
@classmethod
def deserialize(cls, data: dict[str, Any]) -> Self:
d: dict[REAL_GUID, CLIP_ID|Literal['NoClip']] = data.get('clip_id_overrides', {})
overrides: dict[REAL_GUID, CLIP_ID|NoClipT] = {}
for key, val in d.items():
if val == 'NoClip':
val = NoClip
overrides[key] = val
return cls(
root_dir=Path(data['root_dir']),
detail_results={
k:RGuidDetailResult.deserialize(v) for
k,v in data['detail_results'].items()
},
matched_guids=data.get('matched_guids', {}),
clip_id_overrides=overrides,
)
def serialize(self) -> dict[str, Any]:
overrides: dict[REAL_GUID, CLIP_ID|Literal['NoClip']] = {}
for key, val in self.clip_id_overrides.items():
if val is NoClip:
val = 'NoClip'
overrides[key] = val
return dict(
root_dir=str(self.root_dir),
detail_results={k:v.serialize() for k,v in self.detail_results.items()},
matched_guids=self.matched_guids,
clip_id_overrides=overrides,
)