diff --git a/maubot/__main__.py b/maubot/__main__.py index e970c50..ccfceb8 100644 --- a/maubot/__main__.py +++ b/maubot/__main__.py @@ -26,7 +26,7 @@ from .server import MaubotServer from .client import Client, init as init_client_class from .loader.zip import init as init_zip_loader from .instance import init as init_plugin_instance_class -from .management.api import init as init_management_api +from .management.api import init as init_management_api, stop as stop_management_api from .__meta__ import __version__ parser = argparse.ArgumentParser(description="A plugin-based Matrix bot system.", @@ -87,6 +87,8 @@ except KeyboardInterrupt: loop.run_until_complete(asyncio.gather(*[client.stop() for client in Client.cache.values()], loop=loop)) db_session.commit() + log.debug("Closing websockets") + loop.run_until_complete(stop_management_api()) log.debug("Stopping server") loop.run_until_complete(server.stop()) log.debug("Closing event loop") diff --git a/maubot/management/api/__init__.py b/maubot/management/api/__init__.py index d8d1917..70f6e4b 100644 --- a/maubot/management/api/__init__.py +++ b/maubot/management/api/__init__.py @@ -17,16 +17,22 @@ from aiohttp import web from asyncio import AbstractEventLoop from ...config import Config -from .base import routes, set_config +from .base import routes, set_config, set_loop from .middleware import auth, error from .auth import web as _ from .plugin import web as _ from .instance import web as _ from .client import web as _ +from .log import stop_all as stop_log_sockets def init(cfg: Config, loop: AbstractEventLoop) -> web.Application: set_config(cfg) + set_loop(loop) app = web.Application(loop=loop, middlewares=[auth, error]) app.add_routes(routes) return app + + +async def stop() -> None: + await stop_log_sockets() diff --git a/maubot/management/api/base.py b/maubot/management/api/base.py index 4cf9636..c1e80e3 100644 --- a/maubot/management/api/base.py +++ b/maubot/management/api/base.py @@ -14,12 +14,14 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . from aiohttp import web +import asyncio from ...__meta__ import __version__ from ...config import Config routes: web.RouteTableDef = web.RouteTableDef() _config: Config = None +_loop: asyncio.AbstractEventLoop = None def set_config(config: Config) -> None: @@ -31,6 +33,15 @@ def get_config() -> Config: return _config +def set_loop(loop: asyncio.AbstractEventLoop) -> None: + global _loop + _loop = loop + + +def get_loop() -> asyncio.AbstractEventLoop: + return _loop + + @routes.get("/version") async def version(_: web.Request) -> web.Response: return web.json_response({ diff --git a/maubot/management/api/log.py b/maubot/management/api/log.py new file mode 100644 index 0000000..8700629 --- /dev/null +++ b/maubot/management/api/log.py @@ -0,0 +1,114 @@ +# maubot - A plugin-based Matrix bot system. +# Copyright (C) 2018 Tulir Asokan +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . +from datetime import datetime +import logging +import asyncio + +from aiohttp import web + +from .base import routes, get_loop +from .auth import is_valid_token + +BUILTIN_ATTRS = {"args", "asctime", "created", "exc_info", "exc_text", "filename", "funcName", + "levelname", "levelno", "lineno", "module", "msecs", "message", "msg", "name", + "pathname", "process", "processName", "relativeCreated", "stack_info", "thread", + "threadName"} +INCLUDE_ATTRS = {"filename", "funcName", "levelname", "levelno", "lineno", "module", "name", + "pathname"} +EXCLUDE_ATTRS = BUILTIN_ATTRS - INCLUDE_ATTRS + + +class WebSocketHandler(logging.Handler): + def __init__(self, ws, level=logging.NOTSET) -> None: + super().__init__(level) + self.ws = ws + self.formatter = logging.Formatter() + + def emit(self, record: logging.LogRecord) -> None: + # JSON conversion based on Marsel Mavletkulov's json-log-formatter (MIT license) + # https://github.com/marselester/json-log-formatter + content = { + name: value + for name, value in record.__dict__.items() + if name not in EXCLUDE_ATTRS + } + content["msg"] = record.getMessage() + content["time"] = datetime.utcnow() + + if record.exc_info: + content["exc_info"] = self.formatter.formatException(record.exc_info) + + for name, value in content.items(): + if isinstance(value, datetime): + content[name] = value.astimezone().isoformat() + + asyncio.ensure_future(self.send(content), loop=get_loop()) + + async def send(self, record: dict) -> None: + try: + await self.ws.send_json(record) + except Exception as e: + pass + + +log_root = logging.getLogger("maubot") +log = logging.getLogger("maubot.server.websocket") +sockets = [] + + +async def stop_all() -> None: + for socket in sockets: + try: + await socket.close(1012) + except Exception: + pass + + +@routes.get("/logs") +async def log_websocket(request: web.Request) -> web.WebSocketResponse: + ws = web.WebSocketResponse() + await ws.prepare(request) + sockets.append(ws) + log.debug(f"Connection from {request.remote} opened") + handler = WebSocketHandler(ws) + authenticated = False + + async def close_if_not_authenticated(): + await asyncio.sleep(5, loop=get_loop()) + if not authenticated: + await ws.close(code=4000) + log.debug(f"Connection from {request.remote} terminated due to no authentication") + + asyncio.ensure_future(close_if_not_authenticated()) + + try: + async for msg in ws: + if msg.type != web.WSMsgType.TEXT: + continue + if is_valid_token(msg.data): + if not authenticated: + log.debug(f"Connection from {request.remote} authenticated") + log_root.addHandler(handler) + authenticated = True + await ws.send_json({"auth_success": True}) + elif not authenticated: + await ws.send_json({"auth_success": False}) + except Exception: + pass + log_root.removeHandler(handler) + log.debug(f"Connection from {request.remote} closed") + sockets.remove(ws) + return ws diff --git a/maubot/management/frontend/src/api.js b/maubot/management/frontend/src/api.js index 8d7d9b6..fc4e518 100644 --- a/maubot/management/frontend/src/api.js +++ b/maubot/management/frontend/src/api.js @@ -75,6 +75,59 @@ export async function ping() { throw json } +export async function openLogSocket() { + let protocol = window.location.protocol === "https:" ? "wss:" : "ws:" + const url = `${protocol}//${window.location.host}${BASE_PATH}/logs` + const wrapper = { + socket: null, + connected: false, + authenticated: false, + fails: -1, + } + const openHandler = () => { + wrapper.socket.send(localStorage.accessToken) + wrapper.connected = true + } + const messageHandler = evt => { + // TODO use logs + const data = JSON.parse(evt.data) + if (data.auth_success !== undefined) { + if (data.auth_success) { + console.info("Websocket connection authentication successful") + wrapper.authenticated = true + wrapper.fails = -1 + } else { + console.info("Websocket connection authentication failed") + } + } else { + console.log("SERVLOG", data) + } + } + const closeHandler = evt => { + if (evt) { + if (evt.code === 4000) { + console.error("Websocket connection failed: access token invalid or not provided") + } else if (evt.code === 1012) { + console.info("Websocket connection closed: server is restarting") + } + } + wrapper.connected = false + wrapper.socket = null + wrapper.fails++ + const SECOND = 1000 + setTimeout(() => { + wrapper.socket = new WebSocket(url) + wrapper.socket.onopen = openHandler + wrapper.socket.onmessage = messageHandler + wrapper.socket.onclose = closeHandler + }, Math.min(wrapper.fails * 5 * SECOND, 30 * SECOND)) + } + + closeHandler() + + return wrapper +} + export const getInstances = () => defaultGet("/instances") export const getInstance = id => defaultGet(`/instance/${id}`) export const putInstance = (instance, id) => defaultPut("instance", instance, id) @@ -123,7 +176,7 @@ export const deleteClient = id => defaultDelete("client", id) export default { BASE_PATH, - login, ping, + login, ping, openLogSocket, getInstances, getInstance, putInstance, deleteInstance, getPlugins, getPlugin, uploadPlugin, deletePlugin, getClients, getClient, uploadAvatar, getAvatarURL, putClient, deleteClient, diff --git a/maubot/management/frontend/src/pages/dashboard/index.js b/maubot/management/frontend/src/pages/dashboard/index.js index 567c443..26b70fa 100644 --- a/maubot/management/frontend/src/pages/dashboard/index.js +++ b/maubot/management/frontend/src/pages/dashboard/index.js @@ -55,6 +55,9 @@ class Dashboard extends Component { plugins[plugin.id] = plugin } this.setState({ instances, clients, plugins }) + const logs = await api.openLogSocket() + console.log("WebSocket opened:", logs) + window.logs = logs } renderList(field, type) {