from __future__ import annotations
from typing import TypedDict, Literal
from pathlib import Path
import datetime
from aiohttp import web
import jinja2
from yarl import URL
from ..clips.model import CLIP_ID
from ..legistar.types import GUID, REAL_GUID, LegistarFileUID
from ..legistar.rss_parser import is_guid, is_real_guid
from .types import (
StaticRootName, StaticUrlRootsKey, TimezoneKey, ClipsKey, LegistarDataKey,
RGuidLegistarDataKey,
)
from .config import APP_CONF_KEY
from .s3client import S3ClientKey
[docs]
class Context(TypedDict):
"""Filter context
"""
app: web.Application
"""The current application"""
request: web.Request
"""The current request"""
[docs]
@jinja2.pass_context
def local_tz(ctx: Context, dt: datetime.datetime) -> datetime.datetime:
"""Convert a datetime to the app's local timezone (stored in :data:`.types.TimezoneKey`)
"""
app = ctx['app']
tz = app[TimezoneKey]
if dt.tzinfo is not None:
return dt.astimezone(tz)
return dt.replace(tzinfo=tz)
[docs]
def snake_case_to_title(s: str) -> str:
"""Convert a snake_case string to title case
"""
return ' '.join(s.split('_')).title()
[docs]
@jinja2.pass_context
def url_query(
ctx: Context,
query_: dict[str, str]|None = None,
merge: bool = True
) -> URL:
"""Add query parameters to the current request URL
Arguments:
query_: The query parameters to add
merge: If ``True``, merge the new query parameters with any ewxisting
query parameters. Otherwise, replace the existing query parameters.
"""
cur_url = ctx['request'].url
cur_query = cur_url.query
if merge and query_ is not None:
new_query = {k: cur_query[k] for k in cur_query}
new_query.update(query_)
else:
new_query = query_
return cur_url.with_query(new_query)
[docs]
@jinja2.pass_context
def static_path(ctx: Context, static_name: StaticRootName, filename: Path|str) -> URL:
"""Get a URL for a static file
Arguments:
static_name: The name of the static root to use
(a member of :class:`.types.StaticUrlRoots`).
filename: The path to the file, relative to the static root
"""
# root = ctx['app'][StaticRootsKey][static_name]
url_root = ctx['app'][StaticUrlRootsKey][static_name]
assert not Path(filename).is_absolute()
return url_root.joinpath(str(filename))
# p = root / filename
# assert not p.is_absolute()
# return URL(f'/{p}')
[docs]
@jinja2.pass_context
def clip_url(
ctx: Context,
clip_id: CLIP_ID,
file_type: Literal['video', 'audio', 'chapters'],
) -> URL:
"""Get the s3 URL for a clip file
.. note::
For the ``chapters`` file type, the URL will be for a local view
of the chapters file (:func:`.views.clip_webvtt`).
This is to prevent issues with CORS.
"""
app_conf = ctx['app'][APP_CONF_KEY]
if not app_conf.use_s3:
raise web.HTTPInternalServerError(reason='Clips are not available')
if file_type == 'chapters':
return ctx['app'].router['clip_webvtt'].url_for(clip_id=clip_id)
s3_client = ctx['app'][S3ClientKey]
clips = ctx['app'][ClipsKey]
clip = clips[clip_id]
rel_filename = clip.get_file_path(file_type, absolute=False)
s3_prefix = s3_client.data_dirs['clips']
s3_path = s3_prefix / rel_filename
return s3_client.url_for_key(str(s3_path))
[docs]
@jinja2.pass_context
def legistar_url(
ctx: Context,
key: tuple[Literal['legistar'], GUID, LegistarFileUID]|tuple[Literal['legistar_rguid'], REAL_GUID, LegistarFileUID],
) -> URL:
"""Get the s3 URL for a legistar file
"""
app_conf = ctx['app'][APP_CONF_KEY]
if not app_conf.use_s3:
raise web.HTTPInternalServerError(reason='Legistar files are not available')
model_type, guid, uid = key
s3_client = ctx['app'][S3ClientKey]
if model_type == 'legistar':
m = ctx['app'][LegistarDataKey]
assert is_guid(guid)
filename, _ = m.get_path_for_uid(guid, uid)
else:
m = ctx['app'][RGuidLegistarDataKey]
assert is_real_guid(guid)
filename, _ = m.get_path_for_uid(guid, uid)
data_root = m.root_dir
filename = filename.relative_to(data_root)
s3_prefix = s3_client.data_dirs[model_type]
s3_path = s3_prefix / filename
return s3_client.url_for_key(str(s3_path), scheme=ctx['request'].scheme)