Source code for granicus_archiver.web.views

from __future__ import annotations
from typing import (
    TypeVar, Generic, Literal, ClassVar, TypedDict, TypeIs,
    Iterable, Sequence, cast, get_args,
)

from abc import ABC, abstractmethod
import datetime
import dataclasses
from functools import wraps

from loguru import logger
from aiohttp import web
from multidict import MultiDict
import aiohttp_jinja2
from yarl import URL

from ..config import Config
from ..clips.model import CLIP_ID, Location, Clip, ClipCollection
from ..legistar.model import (
    LegistarData, DetailPageResult, LegistarFiles,
    AbstractLegistarModel, AbstractFile, LegistarFile, AttachmentFile,
    file_key_to_uid, attachment_name_to_uid,
)
from ..legistar.rss_parser import is_guid, is_real_guid
from ..legistar.guid_model import RGuidLegistarData, RGuidDetailResult
from ..legistar.search_indexing import (
    search_contents as legistar_search_contents,
    SearchResult as LegistarSearchResult,
)
from ..legistar.types import (
    Category, GUID, REAL_GUID, NoClip, NoClipT, LegistarFileUID,
    AgendaStatus, MinutesStatus, AgendaStatusItems, MinutesStatusItems,
)
from .types import (
    ConfigKey, ClipsKey, LegistarDataKey, RGuidLegistarDataKey,
    NavLinksKey, DataFileLockKey, SortOrder, NavLink, StaticRootName,
)
from .config import APP_CONF_KEY, ListFilterField
from .s3client import S3ClientKey
from .pagination import Paginator


[docs] class GlobalContext(TypedDict): """Context data for all templated views """ nav_links: Sequence[NavLink] """Navigation links""" page_title: str """Title of the page"""
ContextT = TypeVar('ContextT', bound=GlobalContext) """Type variable for template context data""" _ID_Type = TypeVar('_ID_Type', GUID, REAL_GUID, CLIP_ID) ClipIdOrNoneStr = CLIP_ID|Literal['None']|Literal['NoClip'] """Type variable for a clip id""" GuidOrNoneStr = GUID|Literal['None']|Literal['NoClip'] """Type variable for a guid""" RealGuidOrNoneStr = REAL_GUID|Literal['None']|Literal['NoClip'] routes = web.RouteTableDef()
[docs] def read_only_guard(func): """Decorator to prevent modification of data when in :attr:`~.config.AppConfig.read_only` mode """ @wraps(func) async def wrapper(*args, **kwargs): _self_or_request: web.View|web.Request = args[0] if isinstance(_self_or_request, web.View): request = _self_or_request.request elif isinstance(_self_or_request, web.Request): request = _self_or_request else: raise TypeError(f'invalid type: {type(_self_or_request)}') app_conf = request.app[APP_CONF_KEY] if app_conf.read_only: raise web.HTTPUnauthorized() return await func(*args, **kwargs) return wrapper
[docs] def with_data_file_lock(func): """Decorator to hold the :obj:`DataFileLock <.types.DataFileLockKey>` within wrapped the function """ @wraps(func) async def wrapper(*args, **kwargs): _self_or_request: web.View|web.Request = args[0] if isinstance(_self_or_request, web.View): request = _self_or_request.request elif isinstance(_self_or_request, web.Request): request = _self_or_request else: raise TypeError(f'invalid type: {type(_self_or_request)}') data_file_lock = request.app[DataFileLockKey] logger.debug(f'acquiring data file lock for {func}') async with data_file_lock: try: return await func(*args, **kwargs) finally: logger.debug(f'released data file lock for {func}') return wrapper
[docs] def is_truthy(value: str|bool|None) -> bool: """Check if the value is truthy """ if value is None: return False if isinstance(value, bool): return value value = value.lower() return value in ('true', '1', 'yes', 'on')
[docs] def parse_date(value: str|None) -> datetime.date|None: """Parse a date from a string """ if value is None: return None try: return datetime.datetime.strptime(value, '%Y-%m-%d').date() except ValueError: logger.debug(f'Invalid date format: {value}') return None
def clip_id_to_str(clip_id: CLIP_ID|Clip|None|NoClipT) -> ClipIdOrNoneStr: if clip_id is None: return 'None' if clip_id is NoClip: return 'NoClip' if isinstance(clip_id, Clip): clip_id = clip_id.id assert isinstance(clip_id, str) return clip_id def clip_id_from_str(value: ClipIdOrNoneStr) -> CLIP_ID|None|NoClipT: if value == 'None': return None if value == 'NoClip': return NoClip return CLIP_ID(value) def guid_to_str(guid: GUID|DetailPageResult|None|NoClipT) -> GuidOrNoneStr: if guid is None: return 'None' if guid is NoClip: return 'NoClip' if isinstance(guid, DetailPageResult): guid = guid.feed_guid return guid def guid_from_str(value: GuidOrNoneStr) -> GUID|None|NoClipT: if value == 'None': return None if value == 'NoClip': return NoClip return GUID(value) def real_guid_to_str(guid: REAL_GUID|RGuidDetailResult|None|NoClipT) -> RealGuidOrNoneStr: if guid is None: return 'None' if guid is NoClip: return 'NoClip' if isinstance(guid, RGuidDetailResult): guid = guid.real_guid return guid def real_guid_from_str(value: RealGuidOrNoneStr) -> REAL_GUID|None|NoClipT: if value == 'None': return None if value == 'NoClip': return NoClip return REAL_GUID(value) def id_equal(a: _ID_Type|None|NoClipT, b: _ID_Type|None|NoClipT) -> bool: if a is None or b is None: return a is None and b is None if a is NoClip or b is NoClip: return a is NoClip and b is NoClip return a == b
[docs] def is_clip_hidden(request: web.Request, clip_or_id: Clip|CLIP_ID|None|NoClipT) -> bool: """Check if a clip is hidden based on the app config """ if clip_or_id is None or clip_or_id is NoClip: return False app_conf = request.app[APP_CONF_KEY] if not isinstance(clip_or_id, Clip): clips = request.app[ClipsKey] try: clip = clips[clip_or_id] except KeyError: return False else: clip = clip_or_id return clip.location in app_conf.hidden_clip_categories
[docs] def check_clip_hidden( request: web.Request, clip_or_id: Clip|CLIP_ID ) -> None: """Check if a clip is hidden and raise a 404 if it is """ if is_clip_hidden(request, clip_or_id): raise web.HTTPNotFound()
[docs] @routes.get('/healthcheck/', name='healthcheck') async def healthcheck(request: web.Request) -> web.Response: """Health check endpoint """ return web.Response(text='OK')
[docs] @routes.get('/clips/webvtt/{clip_id}/', name='clip_webvtt') async def clip_webvtt(request: web.Request) -> web.Response|web.StreamResponse: """View to display a webvtt file for a clip """ app_conf = request.app[APP_CONF_KEY] clip_id = CLIP_ID(request.match_info['clip_id']) check_clip_hidden(request, clip_id) clips = request.app[ClipsKey] clip = clips[clip_id] if app_conf.use_s3: chunk_size = 4096 s3_client = request.app[S3ClientKey] vtt_filename = clip.get_file_path('chapters', absolute=False) s3_prefix = s3_client.data_dirs['clips'] s3_path = s3_prefix / vtt_filename # if not await s3_client.object_exists(str(s3_path)): # raise web.HTTPNotFound() obj = await s3_client.s3_client.get_object( Bucket=s3_client.bucket.name, Key=str(s3_path), ) obj_info = obj['ResponseMetadata']['HTTPHeaders'] resp = web.StreamResponse( headers=MultiDict( { "CONTENT-DISPOSITION": ( f"attachment; filename='{vtt_filename}'" ), "Content-Type": obj_info["content-type"], } ) ) resp.content_type = obj_info['content-type'] resp.content_length = int(obj_info['content-length']) resp.etag = obj_info['etag'].strip('"') resp.last_modified = obj_info['last-modified'] await resp.prepare(request) stream = obj['Body'] async for chunk in stream.iter_chunks(chunk_size): await resp.write(chunk) else: vtt_filename = clip.get_file_path('chapters', absolute=True) webvtt = vtt_filename.read_text() resp = web.Response( text=webvtt, content_type='text/vtt', ) return resp
[docs] class AbstractView(ABC, Generic[ContextT]): """Abstract base class for views that render templates """
[docs] @classmethod @abstractmethod def get_template_name(cls) -> str: """Get the template name for this view """ ...
[docs] @abstractmethod async def get_context_data(self) -> ContextT: """Get the context data for this view """ ...
[docs] @abstractmethod async def render_to_response(self, context: ContextT) -> web.Response: """Render the response for this view """ ...
[docs] class TemplatedView(web.View, Generic[ContextT], AbstractView[ContextT]): """Base class for views that render templates """ template_name: ClassVar[str]
[docs] def get_config(self) -> Config: """Get the :class:`granicus_archiver.config.Config` object """ return self.request.app[ConfigKey]
[docs] @classmethod def get_template_name(cls) -> str: return cls.template_name
[docs] async def render_to_response(self, context: ContextT) -> web.Response: """Render the templated response """ tmpl = self.get_template_name() return aiohttp_jinja2.render_template(tmpl, self.request, context)
[docs] @with_data_file_lock async def get(self) -> web.Response: """Handler for GET requests """ context = await self.get_context_data() return await self.render_to_response(context)
[docs] class ListFilterContext[CatT: (Location, Category)](TypedDict): """Context data for list views with filters """ view_unassigned: bool """Whether to view unassigned items""" all_categories: list[CatT] """All possible categories""" current_category: CatT|None """The current category""" filter_by_date: bool """Whether to filter by date""" start_date: datetime.date|None """The start date for filtering""" end_date: datetime.date|None """The end date for filtering""" hidden_clip_list_filters: Sequence[ListFilterField] """List of list filter fields to hide in the UI"""
[docs] class ClipListContext(GlobalContext): """Template context for :class:`ClipListView` """ clip_dict: dict[CLIP_ID, Clip] """Mapping of :class:`~.model.Clip` instances by their :attr:`~.model.Clip.id` """ clips: Iterable[Clip] """Iterable of :class:`~.model.Clip` instances""" clip_guids: dict[CLIP_ID, GuidOrNoneStr] """Mapping of clip ids to their associated legistar item guids. A value of ``None`` or ``NoClip`` indicates no associated legistar item. """ paginator: Paginator[tuple[CLIP_ID, Clip]] """:class:`~.pagination.Paginator` instance for :attr:`clip_dict`""" filter_context: ListFilterContext[Location] """Filter context data"""
[docs] @routes.view('/clips/list/', name='clip_list') class ClipListView(TemplatedView[ClipListContext]): """List view for :class:`granicus_archiver.model.Clip` objects """ template_name = 'clips/clip-list.jinja2' def __init__(self, request: web.Request) -> None: super().__init__(request) self.filter_context = self.parse_filter_context() items = list(self.clips.clips.values()) items = self.get_filtered_items(items) self.item_dict: dict[CLIP_ID, Clip] = {clip.id: clip for clip in items} self.items: list[Clip] = list(items) item_tuples = [(item.id, item) for item in self.items] self.paginator = Paginator[tuple[CLIP_ID, Clip]](self.request, item_tuples) @property def clips(self) -> ClipCollection: return self.request.app[ClipsKey] @property def legistar_data(self) -> LegistarData: return self.request.app[LegistarDataKey]
[docs] def parse_filter_context(self) -> ListFilterContext[Location]: """Parse the filter context from the request's query parameters """ view_unassigned = bool(self.request.query.get('unassigned')) filter_by_date = self.request.query.get('filter_by_date') is not None category = self.request.query.get('category') if not category: category = None if category is not None: category = Location(category) start_date = self.request.query.get('start_date') if not start_date: start_date = None if start_date is not None: start_date = datetime.datetime.strptime(start_date, '%Y-%m-%d').date() end_date = self.request.query.get('end_date') if not end_date: end_date = None if end_date is not None: end_date = datetime.datetime.strptime(end_date, '%Y-%m-%d').date() all_categories = set([clip.location for clip in self.clips]) hidden_categories = self.request.app[APP_CONF_KEY].hidden_clip_categories all_categories -= set(hidden_categories) return { 'view_unassigned': view_unassigned, 'all_categories': list(sorted(all_categories)), 'current_category': category, 'filter_by_date': filter_by_date, 'start_date': start_date, 'end_date': end_date, 'hidden_clip_list_filters': self.request.app[APP_CONF_KEY].hidden_clip_list_filters, }
[docs] def get_filtered_items(self, clips: Sequence[Clip]) -> Sequence[Clip]: """Get items filtered by the :attr:`filter_context` """ clips = self._filter_by_date_range(clips) clips = self._filter_by_category(clips) clips = self._filter_unassigned(clips) clips = self._filter_hidden(clips) return clips
def get_clip_guids(self) -> dict[CLIP_ID, GuidOrNoneStr]: legistar_data = self.legistar_data d: dict[CLIP_ID, GuidOrNoneStr] = {} for clip in self.clips: item = legistar_data.find_match_for_clip_id(clip.id) guid = guid_to_str(item) d[clip.id] = guid return d
[docs] async def get_context_data(self) -> ClipListContext: """Get the context data for this view """ context: ClipListContext = { 'nav_links': self.request.app[NavLinksKey], 'page_title': 'Clips', 'clip_dict': self.item_dict, 'clips': self.clips, 'clip_guids': self.get_clip_guids(), 'paginator': self.paginator, 'filter_context': self.filter_context, } return context
def _filter_by_date_range(self, clips: Sequence[Clip]) -> Sequence[Clip]: start_date, end_date = self.filter_context['start_date'], self.filter_context['end_date'] if not self.filter_context['filter_by_date']: return clips if start_date is None or end_date is None: if self.filter_context['filter_by_date']: self.filter_context['filter_by_date'] = False return clips return [ clip for clip in clips if start_date <= clip.datetime.date() <= end_date ] def _filter_by_category(self, clips: Sequence[Clip]) -> Sequence[Clip]: category = self.filter_context['current_category'] if category is None: return clips return [clip for clip in clips if clip.location == category] def _filter_unassigned(self, clips: Sequence[Clip]) -> Sequence[Clip]: if not self.filter_context['view_unassigned']: return clips return [ clip for clip in clips if self.legistar_data.is_clip_id_available(clip.id) ] def _filter_hidden(self, clips: Sequence[Clip]) -> Sequence[Clip]: return [ clip for clip in clips if not is_clip_hidden(self.request, clip) ]
[docs] class ClipViewContext(GlobalContext): """Template context for :class:`ClipViewBase` """ clip: Clip """The :class:`~.model.Clip` instance being viewed""" legistar_item: DetailPageResult|NoClipT|None """The associated legistar item, or ``None`` if no match is found""" legistar_guid: GUID|NoClipT|None """The associated legistar item guid, if one exists""" legistar_rguid_item: RGuidDetailResult|NoClipT|None """The associated real-guid legistar item, or ``None`` if no match is found""" legistar_rguid_item_id: REAL_GUID|NoClipT|None """Item id for :attr:`legistar_rguid_item`""" media_embed_type: Literal['video', 'audio']
ClipViewContextT = TypeVar('ClipViewContextT', bound=ClipViewContext)
[docs] class ClipViewBase(TemplatedView[ClipViewContextT]): """Base class for views that display a single :class:`~.model.Clip` instance """ template_name = 'clips/clip.jinja2' legistar_item: DetailPageResult|NoClipT|None legistar_rguid_item: RGuidDetailResult|NoClipT|None media_embed_type: Literal['video', 'audio'] = 'video' media_embed_query_param : ClassVar[str] = 'media_embed_type' def __init__(self, request: web.Request) -> None: super().__init__(request) clip_id = CLIP_ID(self.request.match_info['clip_id']) check_clip_hidden(request, clip_id) self.clip = self.clips[clip_id] self.legistar_item = self.legistar_data.find_match_for_clip_id(clip_id) self.legistar_rguid_item = self.legistar_data_rguid.find_match_for_clip_id(clip_id) media_embed_type = request.query.get('media_embed_type', 'video') assert media_embed_type in ('video', 'audio') self.media_embed_type = media_embed_type @property def clips(self) -> ClipCollection: return self.request.app[ClipsKey] @property def legistar_data(self) -> LegistarData: return self.request.app[LegistarDataKey] @property def legistar_data_rguid(self) -> RGuidLegistarData: return self.request.app[RGuidLegistarDataKey] @property def legistar_guid(self) -> GUID|None|NoClipT: if self.legistar_item is None or self.legistar_item is NoClip: return self.legistar_item return self.legistar_item.feed_guid @property def legistar_rguid_item_id(self) -> REAL_GUID|NoClipT|None: if self.legistar_rguid_item is None or self.legistar_rguid_item is NoClip: return self.legistar_rguid_item return self.legistar_rguid_item.real_guid
[docs] async def get_context_data(self) -> ClipViewContext: """Get the context data for this view """ return { 'page_title': self.clip.name, 'nav_links': self.request.app[NavLinksKey], 'clip': self.clip, 'legistar_item': self.legistar_item, 'legistar_guid': self.legistar_guid, 'legistar_rguid_item': self.legistar_rguid_item, 'legistar_rguid_item_id': self.legistar_rguid_item_id, 'media_embed_type': self.media_embed_type, }
[docs] @routes.view('/clips/clip/{clip_id}/', name='clip_item') class ClipView(ClipViewBase): """View for a single :class:`~.model.Clip` instance """ pass
[docs] class ClipEditForm(TypedDict): """Form data for :class:`ClipEditView` """ guid: GuidOrNoneStr """The associated legistar item guid or ``None``. A special value of ``"NoClip"`` will specify :data:`~.types.NoClip` for the item. """ real_guid: RealGuidOrNoneStr """The associated real-guid legistar item guid or ``None``. A special value of ``"NoClip"`` will specify :data:`~.types.NoClip` for the item. """
[docs] class ClipEditViewContext(ClipViewContext): """Template context for :class:`ClipEditView` """ form_data: ClipEditForm """The :class:`form <ClipEditForm>` data""" item_options: Iterable[DetailPageResult] """An iterable of possible choices to assign to the clip""" rguid_item_options: Iterable[RGuidDetailResult] """An iterable of possible choices to assign to the clip (real-guid)"""
[docs] @routes.view('/clips/clip-edit/{clip_id}/', name='clip_item_change') class ClipEditView(ClipViewBase[ClipEditViewContext]): """Edit view for a single :class:`~.model.Clip` instance """ template_name = 'clips/clip-change.jinja2' def get_form_initial(self) -> ClipEditForm: return { 'guid': guid_to_str(self.legistar_item), 'real_guid': real_guid_to_str(self.legistar_rguid_item), } async def get_form_data(self) -> ClipEditForm: data: ClipEditForm if self.request.method == 'POST': raw = await self.request.post() _guid = raw['guid'] assert isinstance(_guid, str) if _guid != 'None' and _guid != 'NoClip': _guid = GUID(_guid) _real_guid = raw['real_guid'] assert isinstance(_real_guid, str) if _real_guid != 'None' and _real_guid != 'NoClip': _real_guid = REAL_GUID(_real_guid) data = { 'guid': _guid, 'real_guid': _real_guid, } else: data = self.get_form_initial() return data def get_item_options(self) -> Iterable[DetailPageResult]: return self._get_item_options(self.legistar_data, DetailPageResult) def get_rguid_item_options(self) -> Iterable[RGuidDetailResult]: return self._get_item_options(self.legistar_data_rguid, RGuidDetailResult) def _get_item_options[ IdT: (GUID, REAL_GUID), ItemT: (DetailPageResult, RGuidDetailResult) ]( self, legistar_data: AbstractLegistarModel[IdT, ItemT], item_type: type[ItemT] ) -> Iterable[ItemT]: items: list[ItemT] = [] item_guids = set[IdT]() if item_type is DetailPageResult: legistar_item = self.legistar_item else: legistar_item = self.legistar_rguid_item legistar_item = cast(ItemT, legistar_item) if legistar_item is not None and legistar_item is not NoClip: items.append(legistar_item) item_guids.add(legistar_data.get_guid_for_detail_result(legistar_item)) for item in legistar_data: item_id = legistar_data.get_guid_for_detail_result(item) if item_id in item_guids: continue if legistar_data.is_guid_matched(item_id): continue delta = self.clip.datetime - item.feed_item.meeting_date if abs(delta) > datetime.timedelta(days=90): continue items.append(item) item_guids.add(item_id) return items
[docs] async def get_context_data(self) -> ClipEditViewContext: """Get the context data for this view """ return { 'page_title': self.clip.unique_name, 'nav_links': self.request.app[NavLinksKey], 'clip': self.clip, 'legistar_item': self.legistar_item, 'legistar_guid': self.legistar_guid, 'legistar_rguid_item': self.legistar_rguid_item, 'legistar_rguid_item_id': self.legistar_rguid_item_id, 'form_data': await self.get_form_data(), 'item_options': self.get_item_options(), 'rguid_item_options': self.get_rguid_item_options(), 'media_embed_type': self.media_embed_type, }
@with_data_file_lock @read_only_guard async def post(self) -> web.Response: next_url = self.request.app.router['clip_item_change'].url_for(clip_id=self.clip.id) context = await self.get_context_data() form_data = context['form_data'] guid = guid_from_str(form_data['guid']) guid_changed = id_equal(guid, self.legistar_guid) logger.info(f'{guid=}, {self.legistar_guid=}, {guid_changed=}') real_guid = real_guid_from_str(form_data['real_guid']) real_guid_changed = id_equal(real_guid, self.legistar_rguid_item_id) logger.info(f'{real_guid=}, {self.legistar_rguid_item_id=}, {real_guid_changed=}') # raise web.HTTPFound(next_url) if not guid_changed and not real_guid_changed: raise web.HTTPFound(next_url) if guid_changed: if guid is None or guid is NoClip: current_guid = self.legistar_guid assert isinstance(current_guid, str) item = self.legistar_data[current_guid] logger.debug(f'unlinking item: {item.real_guid=}') # self.legistar_data.add_clip_match_override(item.real_guid, guid) else: item = self.legistar_data[guid] logger.debug(f'setting clip match: {item.real_guid=}') # self.legistar_data.add_clip_match_override(item.real_guid, self.clip.id) if real_guid_changed: if real_guid is None or real_guid is NoClip: current_guid = self.legistar_rguid_item_id assert isinstance(current_guid, str) item = self.legistar_data_rguid[current_guid] logger.debug(f'unlinking item: {item.real_guid=}') # self.legistar_data_rguid.add_clip_match_override(item.real_guid, real_guid) else: item = self.legistar_data_rguid[real_guid] logger.debug(f'setting clip match: {item.real_guid=}') # self.legistar_data_rguid.add_clip_match_override(item.real_guid, self.clip.id) # conf_file = self.get_config() # self.legistar_data.save(conf_file.legistar.data_file) raise web.HTTPFound(next_url)
[docs] class LegistarListFilterContext(ListFilterContext[Category]): """Filter context data for legistar items """ agenda_status: AgendaStatus|None """The agenda status to filter by""" minutes_status: MinutesStatus|None """The minutes status to filter by""" agenda_status_items: Iterable[AgendaStatus] """All possible agenda status items""" minutes_status_items: Iterable[MinutesStatus] """All possible minutes status items"""
type SearchResultCount = Literal[5, 10, 25, 50, 100] """Type to restrict search result count options to certain values""" SearchResultCountOptions: tuple[SearchResultCount, ...] = (5, 10, 25, 50, 100) def is_valid_search_result_count(value: int) -> TypeIs[SearchResultCount]: return value in SearchResultCountOptions
[docs] class LegistarSearchForm(TypedDict): """Form data for searching legistar items """ search: str """The search term""" max_results: SearchResultCount """The maximum number of search results to return""" max_results_options: Sequence[SearchResultCount] """All possible options for :attr:`max_results`"""
[docs] class LegistarItemsContext[ IdT: (GUID, REAL_GUID), ItemT: (DetailPageResult, RGuidDetailResult) ](GlobalContext): """Template context for :class:`LegistarItemsViewBase` """ items: Iterable[ItemT] """The items to display""" item_dict: dict[IdT, ItemT] """Mapping of items by their id""" item_clip_ids: dict[IdT, ClipIdOrNoneStr] """Mapping of item ids to their associated clip ids""" item_view_name: str """The name of the view to display an individual item""" item_edit_view_name: str """The name of the view to edit an individual item""" paginator: Paginator[tuple[IdT, ItemT]] """:class:`~.pagination.Paginator` instance for :attr:`item_dict`""" filter_context: LegistarListFilterContext """Filter context data""" search_form: LegistarSearchForm|None """The search form data, if applicable""" search_item_scores: dict[IdT, float]|None """Mapping of item ids to their search scores, if applicable""" search_item_results: dict[IdT, LegistarSearchResult]|None """Mapping of item ids to their search results, if applicable"""
[docs] class LegistarItemsViewBase[ IdT: (GUID, REAL_GUID), ItemT: (DetailPageResult, RGuidDetailResult), ](TemplatedView[LegistarItemsContext[IdT, ItemT]]): """Base class for views that display a list of legistar items """ template_name = 'legistar/items.jinja2' paginator: Paginator[tuple[IdT, ItemT]] search_form_data: LegistarSearchForm search_results: list[LegistarSearchResult]|None = None search_item_scores: dict[IdT, float]|None = None search_item_results: dict[IdT, LegistarSearchResult]|None = None def __init__(self, request: web.Request) -> None: super().__init__(request) self.filter_context = self.parse_filter_context() self.item_dict, self.items = self.get_items() self.paginator = self.build_paginator() def build_paginator(self) -> Paginator[tuple[IdT, ItemT]]: item_tuples = [(self._id_for_item(item), item) for item in self.items] disabled = self.search_results is not None return Paginator[tuple[IdT, ItemT]](self.request, item_tuples, disable_pagination=disabled) @property @abstractmethod def item_view_name(self) -> str: ... @property @abstractmethod def item_edit_view_name(self) -> str: ... @property @abstractmethod def legistar_data(self) -> AbstractLegistarModel[IdT, ItemT]: ... @abstractmethod def _id_for_item(self, item: ItemT) -> IdT: ... def get_search_form_initial(self) -> LegistarSearchForm: return { 'search': self.request.query.get('search', ''), 'max_results': 25, 'max_results_options': SearchResultCountOptions, } async def get_form_data(self) -> LegistarSearchForm: data: LegistarSearchForm if self.request.method == 'POST': raw = await self.request.post() _search = raw.get('search', '') assert isinstance(_search, str) try: m = raw.get('max_results', 25) if isinstance(m, str): _max_results = int(m) elif isinstance(m, int): _max_results = m else: raise TypeError(f'invalid type: {type(m)}') except (ValueError, TypeError): _max_results = 25 if not is_valid_search_result_count(_max_results): _max_results = 25 data = { 'search': _search, 'max_results': _max_results, 'max_results_options': SearchResultCountOptions, } else: data = self.get_search_form_initial() try: _max_results = int(self.request.query.get('max_results', 25)) except (ValueError, TypeError): _max_results = 25 if is_valid_search_result_count(_max_results): data['max_results'] = _max_results return data async def get_search_results(self) -> list[LegistarSearchResult]|None: form_data = self.search_form_data if not form_data['search']: return None conf = self.get_config() web_conf = self.request.app[APP_CONF_KEY] if web_conf.use_s3: s3_client = self.request.app[S3ClientKey] index_dir = s3_client.search_dir_local else: index_dir = conf.legistar.search_index_dir max_results = form_data['max_results'] results = legistar_search_contents( form_data['search'], index=index_dir, limit=max_results, ) return results async def post(self) -> web.Response: self.search_form_data = await self.get_form_data() self.search_results = await self.get_search_results() self.item_dict, self.items = self.get_items() self.paginator = self.build_paginator() return await self.get()
[docs] async def get(self) -> web.Response: self.search_form_data = await self.get_form_data() self.search_results = await self.get_search_results() self.item_dict, self.items = self.get_items() self.paginator = self.build_paginator() return await super().get()
def parse_filter_context(self) -> LegistarListFilterContext: view_unassigned = bool(self.request.query.get('unassigned')) category = self.request.query.get('category') if not category: category = None if category is not None: category = Category(category) agenda_status = self.request.query.get('agenda_status') if not agenda_status: agenda_status = None if agenda_status is not None: assert agenda_status in AgendaStatusItems minutes_status = self.request.query.get('minutes_status') if not minutes_status: minutes_status = None if minutes_status is not None: assert minutes_status in MinutesStatusItems return { 'view_unassigned': view_unassigned, 'all_categories': list(sorted(self.get_all_categories())), 'current_category': category, 'filter_by_date': is_truthy(self.request.query.get('filter_by_date')), 'start_date': parse_date(self.request.query.get('start_date')), 'end_date': parse_date(self.request.query.get('end_date')), 'agenda_status': agenda_status, 'minutes_status': minutes_status, 'agenda_status_items': AgendaStatusItems, 'minutes_status_items': MinutesStatusItems, 'hidden_clip_list_filters': self.request.app[APP_CONF_KEY].hidden_clip_list_filters, } def get_items(self) -> tuple[dict[IdT, ItemT], Sequence[ItemT]]: legistar_data = self.legistar_data item_clip_ids: dict[IdT, ClipIdOrNoneStr] = {} for guid, item in legistar_data.items(): clip_id = legistar_data.get_clip_id_for_guid(guid) item_clip_ids[guid] = clip_id_to_str(clip_id) item_dict = dict(legistar_data.items()) item_dict = self.get_filtered_items(item_dict) item_dict = self._sort_items(item_dict) return item_dict, list(item_dict.values())
[docs] def get_filtered_items(self, item_dict: dict[IdT, ItemT]) -> dict[IdT, ItemT]: """Get items filtered by the :attr:`filter_context` """ item_dict = self._filter_by_category(item_dict) item_dict = self._filter_by_date(item_dict) item_dict = self._filter_unassigned_clips(item_dict) return self._filter_by_status(item_dict)
def get_all_categories(self) -> set[Category]: return set([item.feed_item.category for item in self.legistar_data])
[docs] async def get_context_data(self) -> LegistarItemsContext[IdT, ItemT]: """Get the context data for this view """ legistar_data = self.legistar_data item_clip_ids: dict[IdT, ClipIdOrNoneStr] = {} for guid, item in legistar_data.items(): clip_id = legistar_data.get_clip_id_for_guid(guid) if is_clip_hidden(self.request, clip_id): clip_id = None item_clip_ids[guid] = clip_id_to_str(clip_id) context: LegistarItemsContext[IdT, ItemT] = { 'page_title': 'Legistar Items', 'nav_links': self.request.app[NavLinksKey], 'items': self.items, 'item_dict': self.item_dict, 'item_clip_ids': item_clip_ids, 'item_view_name': self.item_view_name, 'item_edit_view_name': self.item_edit_view_name, 'paginator': self.paginator, 'filter_context': self.filter_context, 'search_form': self.search_form_data, 'search_item_scores': self.search_item_scores, 'search_item_results': self.search_item_results, } return context
def _filter_by_status( self, item_dict: dict[IdT, ItemT], ) -> dict[IdT, ItemT]: agenda_status = self.filter_context['agenda_status'] minutes_status = self.filter_context['minutes_status'] def is_item_included(item: ItemT) -> bool: agenda_match = agenda_status is None or item.agenda_status == agenda_status minutes_match = minutes_status is None or item.minutes_status == minutes_status return agenda_match and minutes_match if agenda_status is None and minutes_status is None: return item_dict return { guid: item for guid, item in item_dict.items() if is_item_included(item) } def _filter_by_category( self, item_dict: dict[IdT, ItemT], ) -> dict[IdT, ItemT]: category = self.filter_context['current_category'] if category is None: return item_dict return self.legistar_data.filter_by_category(category, items=item_dict) def _filter_by_date( self, item_dict: dict[IdT, ItemT], ) -> dict[IdT, ItemT]: if not self.filter_context['filter_by_date']: return item_dict start_date, end_date = self.filter_context['start_date'], self.filter_context['end_date'] if start_date is None or end_date is None: self.filter_context['filter_by_date'] = False return item_dict return self.legistar_data.filter_by_dt_range( start_dt=datetime.datetime.combine(start_date, datetime.time.min), end_dt=datetime.datetime.combine(end_date, datetime.time.max), items=item_dict ) def _filter_unassigned_clips( self, item_dict: dict[IdT, ItemT] ) -> dict[IdT, ItemT]: if not self.filter_context['view_unassigned']: return item_dict return { guid: item for guid, item in item_dict.items() if self.legistar_data.get_clip_id_for_guid(guid) is None } def _sort_items( self, item_dict: dict[IdT, ItemT], order: SortOrder = 'dsc' ) -> dict[IdT, ItemT]: by_dt: dict[datetime.datetime, list[ItemT]] = {} for item in item_dict.values(): dt = item.feed_item.meeting_date item_list = by_dt.setdefault(dt, []) item_list.append(item) sorted_items: list[ItemT] = [] for dt in sorted(by_dt.keys()): sorted_items.extend(by_dt[dt]) if order == 'dsc': sorted_items.reverse() return {self._id_for_item(item):item for item in sorted_items}
[docs] @routes.view('/legistar/items/', name='legistar_items') class LegistarItemsView(LegistarItemsViewBase[GUID, DetailPageResult]): """A list view of :class:`~.legistar.model.DetailPageResult` instances """ @property def legistar_data(self) -> LegistarData: return self.request.app[LegistarDataKey] def _id_for_item(self, item: DetailPageResult) -> GUID: return item.feed_guid @property def item_view_name(self) -> str: return 'legistar_item' @property def item_edit_view_name(self) -> str: return 'legistar_item_change'
[docs] @routes.view('/legistar-rguid/items/', name='rguid_legistar_items') class RGuidLegistarItemsView(LegistarItemsViewBase[REAL_GUID, RGuidDetailResult]): """A list view of :class:`~.legistar.guid_model.RGuidDetailResult` instances """ @property def legistar_data(self) -> RGuidLegistarData: return self.request.app[RGuidLegistarDataKey] def _id_for_item(self, item: RGuidDetailResult) -> REAL_GUID: return item.real_guid @property def item_view_name(self) -> str: return 'rguid_legistar_item' @property def item_edit_view_name(self) -> str: return 'rguid_legistar_item_change' def get_items(self) -> tuple[dict[REAL_GUID, RGuidDetailResult], Sequence[RGuidDetailResult]]: if self.search_results is None: return super().get_items() logger.debug(f'getting items from search results: {len(self.search_results)} found') item_dict: dict[REAL_GUID, RGuidDetailResult] = {} item_clip_ids: dict[REAL_GUID, ClipIdOrNoneStr] = {} self.search_item_scores = {} self.search_item_results = {} for result in self.search_results: rguid = result.file_id.rguid item_dict[rguid] = self.legistar_data[rguid] clip_id = self.legistar_data.get_clip_id_for_guid(rguid) item_clip_ids[rguid] = clip_id_to_str(clip_id) self.search_item_scores[rguid] = result.score self.search_item_results[rguid] = result item_dict = self.get_filtered_items(item_dict) return item_dict, list(item_dict.values())
[docs] class LegistarItemFormData(TypedDict): """Form data for :class:`LItemChangeView` """ clip_id: ClipIdOrNoneStr """The associated clip id or ``None``""" next_url: URL|str """The URL to redirect to after submitting the form"""
[docs] class _ItemContext[ IdT: (GUID, REAL_GUID), ItemT: (DetailPageResult, RGuidDetailResult) ](GlobalContext): """Template context for :class:`LItemViewBase` :meta public: """ legistar_type: Literal['legistar', 'legistar_rguid'] """The type of legistar data""" item: ItemT """The item being viewed""" item_id: IdT """The id of the :attr:`item`""" files: LegistarFiles|None """The files associated with the item""" file_iter: Iterable[tuple[LegistarFileUID, AbstractFile]] """An iterable of :class:`~.legistar.model.AbstractFile` instances""" file_static_key: StaticRootName """The key for the item assets in :class:`~.web.types.StaticUrlRoots`""" clip_id: ClipIdOrNoneStr """The associated clip id or ``None``""" clip: Clip|None """The associated :class:`~.model.Clip` or ``None``""" change_view_name: str """The name of the view to edit the item""" media_embed_type: Literal['video', 'audio'] """The type of media embed to use""" media_embed_query_param: str """The query parameter to use for media embed type selection"""
[docs] class _ItemChangeContext[ IdT: (GUID, REAL_GUID), ItemT: (DetailPageResult, RGuidDetailResult) ](_ItemContext[IdT, ItemT]): """Template context for :class:`LItemChangeView` :meta public: """ form_data: LegistarItemFormData """The form data""" clip_options: Iterable[Clip] """An iterable of possible clips to assign to the item"""
type ItemContextT[ IdT: (GUID, REAL_GUID), ItemT: (DetailPageResult, RGuidDetailResult) ] = _ItemContext[IdT, ItemT]|_ItemChangeContext[IdT, ItemT] """Type alias for the template context for :class:`LItemViewBase` and :class:`LItemChangeView`""" type ItemContext[ X: ItemContextT, A: (GUID, REAL_GUID), B: (DetailPageResult, RGuidDetailResult) ] = ItemContextT[A, B] """Type alias for the template context for :class:`LItemViewBase` and :class:`LItemChangeView`"""
[docs] class LItemViewBase[ X: (_ItemContext, _ItemChangeContext), IdT: (GUID, REAL_GUID), ItemT: (DetailPageResult, RGuidDetailResult), ](TemplatedView[ItemContext[X, IdT, ItemT]]): """Base class for views that display a single :class:`~.legistar.model.DetailPageResult` or :class:`~.legistar.guid_model.RGuidDetailResult` instance """ template_name = 'legistar/item.jinja2' legistar_type: Literal['legistar', 'legistar_rguid'] media_embed_type: Literal['video', 'audio'] = 'video' media_embed_query_param = 'media_embed_type' guid: IdT clip_id: CLIP_ID|None|NoClipT def __init__(self, request: web.Request) -> None: super().__init__(request) self.guid = self.get_guid() self.clip_id = self.legistar_data.get_clip_id_for_guid(self.guid) if is_clip_hidden(request, self.clip_id): self.clip_id = None media_embed_type = request.query.get('media_embed_type', 'video') assert media_embed_type in ('video', 'audio') self.media_embed_type = media_embed_type @property @abstractmethod def legistar_data(self) -> AbstractLegistarModel[IdT, ItemT]: raise NotImplementedError @property @abstractmethod def file_static_key(self) -> StaticRootName: ... @abstractmethod def get_guid(self) -> IdT: raise NotImplementedError @abstractmethod def get_files(self) -> LegistarFiles|None: raise NotImplementedError @abstractmethod def iter_files(self) -> Iterable[tuple[LegistarFileUID, AbstractFile]]: ... @abstractmethod def get_change_view_name(self) -> str: raise NotImplementedError @property def clips(self) -> ClipCollection: return self.request.app[ClipsKey] @property def clip(self) -> Clip|None: if self.clip_id is None or self.clip_id is NoClip: return None if is_clip_hidden(self.request, self.clip_id): return None try: return self.clips[self.clip_id] except KeyError: logger.warning(f'Clip with id {self.clip_id} not found in clips collection') return None @property def item(self) -> ItemT: return self.legistar_data[self.guid]
[docs] @abstractmethod async def get_context_data(self) -> ItemContext[X, IdT, ItemT]: """Get the context data for this view """ raise NotImplementedError
async def get_base_context_data(self) -> _ItemContext[IdT, ItemT]: return { 'page_title': self.item.feed_item.title, 'nav_links': self.request.app[NavLinksKey], 'legistar_type': self.legistar_type, 'item': self.item, 'item_id': self.guid, 'files': self.get_files(), 'file_iter': self.iter_files(), 'clip_id': clip_id_to_str(self.clip_id), 'file_static_key': self.file_static_key, 'clip': self.clip, 'change_view_name': self.get_change_view_name(), 'media_embed_type': self.media_embed_type, 'media_embed_query_param': self.media_embed_query_param, }
[docs] class LItemMixin: """Mixin for views that display a single :class:`~.legistar.model.DetailPageResult` """ request: web.Request item: DetailPageResult guid: GUID @property def legistar_data(self) -> LegistarData: return self.request.app[LegistarDataKey] @property def file_static_key(self) -> StaticRootName: return 'legistar' def get_guid(self) -> GUID: guid = self.request.match_info['guid'] assert is_guid(guid) return guid def get_files(self) -> LegistarFiles | None: return self.legistar_data.files.get(self.item.feed_guid) def iter_files(self) -> Iterable[tuple[LegistarFileUID, AbstractFile]]: files = self.get_files() if files is None: return [] result: list[tuple[LegistarFileUID, LegistarFile|AttachmentFile]] = [] for f_key, f_file in files: if f_file is None: continue uid = file_key_to_uid(f_key) full_p = self.legistar_data.get_file_path(self.guid, f_key) full_p = full_p.relative_to(self.legistar_data.root_dir) result.append((uid, dataclasses.replace(f_file, filename=full_p))) for att_key, att_file in files.attachments.items(): if att_file is None: continue uid = attachment_name_to_uid(att_key) full_p = self.legistar_data.get_attachment_path(self.guid, att_key) full_p = full_p.relative_to(self.legistar_data.root_dir) result.append((uid, dataclasses.replace(att_file, filename=full_p))) return result def get_change_view_name(self) -> str: return 'legistar_item_change' def _get_next_url(self) -> URL: return self.request.app.router['legistar_item'].url_for(guid=self.guid)
[docs] @routes.view('/legistar/items/view/{guid}/', name='legistar_item') class LItemView(LItemMixin, LItemViewBase[_ItemContext[GUID, DetailPageResult], GUID, DetailPageResult]): """View for a single :class:`~.legistar.model.DetailPageResult` instance """ legistar_type = 'legistar'
[docs] async def get_context_data(self) -> _ItemContext[GUID, DetailPageResult]: """Get the context data for this view """ return await self.get_base_context_data()
[docs] class RGItemMixin: """Mixin for views that display a single :class:`~.legistar.guid_model.RGuidDetailResult` """ guid: REAL_GUID request: web.Request item: RGuidDetailResult @property def legistar_data(self) -> RGuidLegistarData: return self.request.app[RGuidLegistarDataKey] @property def file_static_key(self) -> StaticRootName: return 'legistar_rguid' def get_guid(self) -> REAL_GUID: guid = self.request.match_info['guid'] assert is_real_guid(guid) return guid def get_files(self) -> LegistarFiles | None: return None def iter_files(self) -> Iterable[tuple[LegistarFileUID, AbstractFile]]: for f in self.item.files: full_p = self.item.files.get_file_path(f.uid, absolute=True) full_p = full_p.relative_to(self.legistar_data.root_dir) yield f.uid, dataclasses.replace(f, filename=full_p) def get_change_view_name(self) -> str: return 'rguid_legistar_item_change' def _get_next_url(self) -> URL: return self.request.app.router['rguid_legistar_item'].url_for(guid=self.guid)
[docs] @routes.view('/legistar-rguid/items/view/{guid}/', name='rguid_legistar_item') class RGItemView(RGItemMixin, LItemViewBase[_ItemContext[REAL_GUID, RGuidDetailResult], REAL_GUID, RGuidDetailResult]): """View for a single :class:`~.legistar.guid_model.RGuidDetailResult` instance """ legistar_type = 'legistar_rguid'
[docs] async def get_context_data(self) -> _ItemContext[REAL_GUID, RGuidDetailResult]: """Get the context data for this view """ return await self.get_base_context_data()
[docs] class LItemChangeViewBase[ IdT: (GUID, REAL_GUID), ItemT: (DetailPageResult, RGuidDetailResult), ](LItemViewBase[_ItemChangeContext[IdT, ItemT], IdT, ItemT]): """Base class for edit views of legistar items """ template_name = 'legistar/item-change.jinja2' def __init__(self, request: web.Request) -> None: super().__init__(request) self.form_initial = self.get_form_initial() def get_next_url(self) -> URL: prev = self.request.query.get('prev') if prev is not None: assert isinstance(prev, str) return URL(prev) return self._get_next_url() @abstractmethod def _get_next_url(self) -> URL: raise NotImplementedError def get_clip_options(self) -> Iterable[Clip]: legistar_data = self.legistar_data category_maps = self.get_config().legistar.category_maps category_maps_rev = {v:k for k,v in category_maps.items()} item = self.legistar_data[self.guid] location = category_maps_rev.get(item.feed_item.category) clips = [] for clip in self.clips: if not legistar_data.is_clip_id_available(clip.id) and clip.id != self.clip_id: continue if location is not None and clip.location != location: continue delta = clip.datetime - item.feed_item.meeting_date if abs(delta) > datetime.timedelta(days=90): continue clips.append(clip) return clips def get_form_initial(self) -> LegistarItemFormData: return { 'clip_id': clip_id_to_str(self.clip_id), 'next_url': self.get_next_url(), } async def get_form_data(self) -> LegistarItemFormData: data: LegistarItemFormData if self.request.method == 'POST': raw = await self.request.post() _clip_id = raw['clip_id'] next_url = raw['next_url'] assert isinstance(_clip_id, str) if _clip_id != 'None' and _clip_id != 'NoClip': _clip_id = CLIP_ID(_clip_id) assert isinstance(next_url, str) next_url = URL(next_url) data = { 'clip_id': _clip_id, 'next_url': next_url, } else: data = self.get_form_initial() return data
[docs] async def get_context_data(self) -> _ItemChangeContext[IdT, ItemT]: """Get the context data for this view """ sc: _ItemContext[IdT, ItemT] = await super().get_base_context_data() c = cast(_ItemChangeContext[IdT, ItemT], sc) c['form_data'] = await self.get_form_data() c['clip_options'] = self.get_clip_options() return c
@with_data_file_lock @read_only_guard async def post(self) -> web.Response: context = await self.get_context_data() item = context['item'] form_data = context['form_data'] next_url = form_data['next_url'] clip_id = clip_id_from_str(form_data['clip_id']) changed = not id_equal(clip_id, self.clip_id) logger.info(f'{clip_id=}, {self.clip_id=}, {changed=}') if not changed: raise web.HTTPFound(next_url) self.legistar_data.add_clip_match_override(item.real_guid, clip_id) conf = self.get_config() if isinstance(self.legistar_data, LegistarData): self.legistar_data.save(conf.legistar.data_file) else: self.legistar_data.save(RGuidLegistarData._get_data_file(conf)) raise web.HTTPFound(next_url)
[docs] @routes.view('/legistar/items/change/{guid}/', name='legistar_item_change') class LItemChangeView(LItemMixin, LItemChangeViewBase[GUID, DetailPageResult]): """Edit view for a single :class:`~.legistar.model.DetailPageResult` instance """ legistar_type = 'legistar'
[docs] @routes.view('/legistar-rguid/items/change/{guid}/', name='rguid_legistar_item_change') class RGLItemChangeView(RGItemMixin, LItemChangeViewBase[REAL_GUID, RGuidDetailResult]): """Edit view for a single :class:`~.legistar.guid_model.RGuidDetailResult` instance """ legistar_type = 'legistar_rguid'