from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from logging import Logger
from typing import Self, TYPE_CHECKING
from pathlib import Path
import asyncio
import webbrowser
from dataclasses import dataclass
from dotenv import load_dotenv
from loguru import logger
from aiohttp import web
from aiohttp.web_log import AccessLogger as AiohttpAccessLogger
from yarl import URL
import aiohttp_jinja2
import jinja2
import click
from click_extra import extra_group as click_group
from ..click_extra_params import extra_params
from .. import set_local_timezone
from ..config import Config
from ..clips.model import ClipCollection
from ..legistar.model import LegistarData
from ..legistar.guid_model import RGuidLegistarData
from .types import (
ConfigKey, ClipsKey, LegistarDataKey, RGuidLegistarDataKey,
TimezoneKey, StaticRootsKey, StaticRoots, StaticUrlRootsKey, StaticUrlRoots,
NavLinksKey, DataFiles, DataFileLockKey, NavLink,
)
from .config import AppConfig, APP_CONF_KEY
from .s3client import S3Client, S3ClientKey
if TYPE_CHECKING:
from ..cli import BaseContext
from . import views, filters
HERE = Path(__file__).resolve().parent
STATIC_ROOT = HERE / 'static'
routes = web.RouteTableDef()
logger.level('ACCESS', no=25, color='<green><bold>')
_access_logger = logger
[docs]
class AccessLogger(AiohttpAccessLogger):
LOG_FORMAT = '%a "%r" %s %b "%{Referer}i" "%{User-Agent}i"'
def __init__(self, logger: Logger, log_format: str = LOG_FORMAT) -> None:
log_format = AccessLogger.LOG_FORMAT
super().__init__(logger, log_format)
@property
def enabled(self) -> bool:
return True
[docs]
def log(self, request: web.Request, response: web.StreamResponse, time: float) -> None:
try:
if request.path.startswith('/healthcheck') and response.status == 200:
return
fmt_info = self._format_line(request, response, time)
values = list()
extra = dict()
for key, value in fmt_info:
values.append(value)
if key.__class__ is str:
extra[key] = value
else:
k1, k2 = key # type: ignore[misc]
dct = extra.get(k1, {}) # type: ignore[var-annotated,has-type]
dct[k2] = value # type: ignore[index,has-type]
extra[k1] = dct # type: ignore[has-type,assignment]
msg = self._log_format % tuple(values)
_access_logger.log('ACCESS', msg, extra=extra)
except Exception:
logger.exception("Error in logging")
[docs]
@dataclass
class WebCliContext:
parent: BaseContext
app_conf: AppConfig
@routes.get('/', name='home')
@aiohttp_jinja2.template('home.jinja2')
async def home(request: web.Request) -> views.GlobalContext:
index_nav_link_name = request.app[APP_CONF_KEY].index_nav_link_name
nav_links = request.app[NavLinksKey]
if index_nav_link_name != 'home':
# Find the nav link with the name specified by index_nav_link_name
# and redirect to it
index_nav_item: NavLink|None = None
for nl in nav_links:
if nl.name == index_nav_link_name:
index_nav_item = nl
break
if index_nav_item is not None:
index_nav_url = index_nav_item.get_url(request.app)
raise web.HTTPFound(index_nav_url)
return {
'page_title': request.app[APP_CONF_KEY].site_name,
'nav_links': request.app[NavLinksKey],
}
# async def request_processor(request: web.Request):
# return {'myrequest': request}
[docs]
class AppDataContext:
"""Context manager for setting up the application data
"""
update_timeout = 300
"""Update interval to check for updated data files"""
data_files: DataFiles
update_task: asyncio.Task|None
"""Task to check for updated data files
This task is created in when the context is entered and cancelled closed.
"""
def __init__(self, app: web.Application) -> None:
self.app = app
self.s3_client: S3Client|None = None
self._closing = False
self._running = False
self.update_task: asyncio.Task|None = None
async def open(self) -> None:
await self.__aenter__()
async def close(self) -> None:
await self.__aexit__(None, None, None)
async def __aenter__(self) -> Self:
assert not self._running
self._running = True
use_s3 = self.app[APP_CONF_KEY].use_s3
if use_s3:
self.s3_client = S3Client(self.app)
await self.s3_client.__aenter__()
self.app[S3ClientKey] = self.s3_client
await self.s3_client.get_search_index_dir()
self.update_task = asyncio.create_task(self.update_loop())
return self
async def __aexit__(self, exc_type, exc_value, traceback):
if not self._running:
return
self._running = False
self._closing = True
t = self.update_task
self.update_task = None
if t is not None:
t.cancel()
try:
await t
except asyncio.CancelledError:
pass
if self.s3_client is not None:
await self.s3_client.__aexit__(exc_type, exc_value, traceback)
[docs]
async def load_app_models(self) -> None:
"""Load the model data
If :attr:`.config.AppConfig.use_s3` is True, the data files are
downloaded from S3 using :class:`.s3client.S3Client`.
"""
config = self.app[ConfigKey]
data_files: DataFiles
if self.s3_client is not None:
await self.s3_client.get_data_files()
data_files = self.s3_client.data_files_local
else:
data_files = {
'clips': config.data_file,
'legistar': config.legistar.data_file,
'legistar_rguid': RGuidLegistarData._get_data_file(config),
}
self.data_files = data_files
self._load_app_models()
def _load_app_models(self) -> None:
data_files = self.data_files
self.app[ClipsKey] = ClipCollection.load(data_files['clips'])
self.app[LegistarDataKey] = LegistarData.load(data_files['legistar'])
self.app[RGuidLegistarDataKey] = RGuidLegistarData.load(data_files['legistar_rguid'])
async def update_loop(self):
if self.s3_client is None:
return
lck = self.app[DataFileLockKey]
while self._running:
await asyncio.sleep(self.update_timeout)
if not hasattr(self, 'data_files'):
continue
if self._closing:
break
logger.debug('Checking for updated data files')
changed = await self.s3_client.get_data_files()
if not changed:
continue
logger.info('Data files have changed. Reloading models')
async with lck:
logger.debug('data file lock acquired')
self._load_app_models()
logger.debug('data file lock released')
@logger.catch(reraise=True)
async def app_data_ctx(app: web.Application):
ctx = AppDataContext(app)
async with ctx:
await ctx.load_app_models()
yield
def build_app(app_conf: AppConfig, conf: Config|None = None) -> web.Application:
app = web.Application()
app[APP_CONF_KEY] = app_conf
if conf is None:
conf = Config.load(Config.default_filename)
app[ConfigKey] = conf
app[DataFileLockKey] = asyncio.Lock()
assert conf.local_timezone_name is not None
tz = set_local_timezone(conf.local_timezone_name)
app[TimezoneKey] = tz
app.cleanup_ctx.append(app_data_ctx)
app['use_s3'] = app_conf.use_s3
app['read_only'] = app_conf.read_only
app['site_name'] = app_conf.site_name
app[NavLinksKey] = app_conf.nav_links
aiohttp_jinja2.setup(
app,
loader=jinja2.PackageLoader(f'{__package__}', 'templates'),
context_processors=(
# request_processor,
aiohttp_jinja2.request_processor,
),
filters=[
('datetime_format', filters.datetime_format),
('date_format', filters.date_format),
('time_format', filters.time_format),
('duration_format', filters.duration_format),
('snake_case_to_title', filters.snake_case_to_title),
]
)
env = app[aiohttp_jinja2.APP_KEY]
env.globals.update({
'url_query': filters.url_query,
'static_path': filters.static_path,
'clip_url': filters.clip_url,
'legistar_url': filters.legistar_url,
})
app.add_routes(routes)
app.add_routes(views.routes)
static_routes = [
web.static('/assets', STATIC_ROOT),
]
if app_conf.serve_static and not app_conf.use_s3:
static_routes.extend([
web.static(f'/{conf.out_dir}', conf.out_dir_abs),
web.static(f'/{conf.legistar.out_dir}', conf.legistar.out_dir_abs),
web.static(f'/{RGuidLegistarData._get_root_dir(conf)}', RGuidLegistarData._get_root_dir(conf)),
])
app.add_routes(static_routes)
# app.add_routes([
# web.static('/assets', STATIC_ROOT),
# web.static(f'/{conf.out_dir}', conf.out_dir_abs),
# web.static(f'/{conf.legistar.out_dir}', conf.legistar.out_dir_abs),
# web.static(f'/{app[RGuidLegistarDataKey].root_dir}', app[RGuidLegistarDataKey].root_dir),
# ])
roots: StaticRoots = {
'assets': Path('assets'),
'granicus': conf.out_dir,
'legistar': conf.legistar.out_dir,
'legistar_rguid': RGuidLegistarData._get_root_dir(conf),
}
app[StaticRootsKey] = roots
def join_static(root: Path) -> URL:
return app_conf.static_url.joinpath(str(root))
root_urls: StaticUrlRoots = {
'assets': URL('/assets'),
# 'assets': join_static(roots['assets']),
'granicus': join_static(roots['granicus']),
'legistar': join_static(roots['legistar']),
'legistar_rguid': join_static(roots['legistar_rguid']),
}
app[StaticUrlRootsKey] = root_urls
return app
def init_func(argv):
app_conf = AppConfig()
return build_app(app_conf)
@click_group(
name='web',
params=extra_params,
)
@click.option(
'-c', '--config-file',
type=click.Path(file_okay=True, dir_okay=False, path_type=Path),
help='Config file to use. If provided (and the file exists), '\
'any other options are ignored. If provided and the file does not '\
'exist, the file will be created with the specified options.',
)
@click.option('-h', '--hostname', default='localhost', show_default=True)
@click.option('-p', '--port', default=8080, show_default=True)
@click.option(
'--sockfile',
type=click.Path(file_okay=True, dir_okay=False, path_type=Path),
help='Unix socket file to use for the server (if not specified, use TCP)'
)
@click.option('--serve-static/--no-serve-static', default=True, show_default=True)
@click.option('--read-only/--no-read-only', default=True, show_default=True)
@click.option('--static-url', default='/', show_default=True)
@click.option('--use-s3/--no-use-s3', default=False, show_default=True)
@click.option('--s3-data-dir', type=click.Path(file_okay=False, dir_okay=True, path_type=Path))
@click.pass_context
def cli(
ctx: click.Context,
config_file: Path|None,
hostname: str,
port: int,
sockfile: Path|None,
serve_static: bool,
read_only: bool,
static_url: str,
use_s3: bool,
s3_data_dir: Path|None
):
_static_url = URL(static_url)
if not _static_url.host:
_static_url = URL(f'http://{hostname}:{port}').join(_static_url)
assert ctx.parent is not None
parent_opts = ctx.parent.params
load_config_env = parent_opts['load_config_env']
config_env_file = parent_opts['config_env_file']
if load_config_env:
if config_env_file is not None:
click.echo(f'Loading environment variables from {config_env_file}')
load_dotenv(dotenv_path=config_env_file)
conf = AppConfig.load_from_env()
elif config_file is not None and config_file.exists():
conf = AppConfig.load(config_file)
else:
conf = AppConfig(
hostname=hostname, port=port, sockfile=sockfile, serve_static=serve_static,
read_only=read_only, static_url=_static_url,
use_s3=use_s3, s3_data_dir=s3_data_dir
)
if config_file is not None:
conf.save(config_file)
assert isinstance(ctx.parent.obj.config, Config)
ctx.obj = WebCliContext(parent=ctx.parent.obj, app_conf=conf)
@cli.command()
@click.argument(
'out_dir',
type=click.Path(file_okay=False, dir_okay=True, path_type=Path)
)
@click.option(
'--dry-run', is_flag=True,
)
@click.pass_obj
def collect_static(obj: WebCliContext, out_dir: Path, dry_run: bool):
"""Collect static files
"""
click.echo(f'Collect static files to {out_dir}')
def walk_dir(p):
for f in p.iterdir():
if f.is_dir():
yield from walk_dir(f)
else:
yield f
for f in walk_dir(STATIC_ROOT):
if not f.is_file():
continue
if f.suffix not in ('.css', '.js'):
continue
rel = f.relative_to(STATIC_ROOT)
dest = out_dir / rel
if not dest.parent.exists():
click.echo(f'mkdir {dest.parent}')
if not dry_run:
dest.parent.mkdir(parents=True, exist_ok=True)
click.echo(f'cp {f} {dest}')
if not dry_run:
dest.write_bytes(f.read_bytes())
@cli.command()
@click.option(
'--launch-browser/--no-launch-browser',
default=True,
show_default=True,
help='Launch a browser window after starting the server'
)
@click.pass_obj
def serve(obj: WebCliContext, launch_browser: bool):
"""Run the webapp and optionally launch a browser window
"""
parent_obj = obj.parent
app_conf = obj.app_conf
async def on_startup(app):
webbrowser.open(f'http://{app_conf.hostname}:{app_conf.port}')
app = build_app(app_conf, conf=parent_obj.config)
if launch_browser and app_conf.sockfile is None:
app.on_startup.append(on_startup)
if app_conf.sockfile is not None:
click.echo(f'Serving on {app_conf.sockfile}')
web.run_app(app, path=str(app_conf.sockfile), access_log_class=AccessLogger)
else:
click.echo(f'Serving on http://{app_conf.hostname}:{app_conf.port}')
web.run_app(app, host=app_conf.hostname, port=app_conf.port, access_log_class=AccessLogger)
if __name__ == '__main__':
cli() # type: ignore[call-arg]