Add log websocket

This commit is contained in:
Tulir Asokan 2018-11-11 19:27:09 +02:00
parent b52766ab1e
commit 303a9827a1
6 changed files with 192 additions and 3 deletions

View File

@ -26,7 +26,7 @@ from .server import MaubotServer
from .client import Client, init as init_client_class from .client import Client, init as init_client_class
from .loader.zip import init as init_zip_loader from .loader.zip import init as init_zip_loader
from .instance import init as init_plugin_instance_class from .instance import init as init_plugin_instance_class
from .management.api import init as init_management_api from .management.api import init as init_management_api, stop as stop_management_api
from .__meta__ import __version__ from .__meta__ import __version__
parser = argparse.ArgumentParser(description="A plugin-based Matrix bot system.", parser = argparse.ArgumentParser(description="A plugin-based Matrix bot system.",
@ -87,6 +87,8 @@ except KeyboardInterrupt:
loop.run_until_complete(asyncio.gather(*[client.stop() for client in Client.cache.values()], loop.run_until_complete(asyncio.gather(*[client.stop() for client in Client.cache.values()],
loop=loop)) loop=loop))
db_session.commit() db_session.commit()
log.debug("Closing websockets")
loop.run_until_complete(stop_management_api())
log.debug("Stopping server") log.debug("Stopping server")
loop.run_until_complete(server.stop()) loop.run_until_complete(server.stop())
log.debug("Closing event loop") log.debug("Closing event loop")

View File

@ -17,16 +17,22 @@ from aiohttp import web
from asyncio import AbstractEventLoop from asyncio import AbstractEventLoop
from ...config import Config from ...config import Config
from .base import routes, set_config from .base import routes, set_config, set_loop
from .middleware import auth, error from .middleware import auth, error
from .auth import web as _ from .auth import web as _
from .plugin import web as _ from .plugin import web as _
from .instance import web as _ from .instance import web as _
from .client import web as _ from .client import web as _
from .log import stop_all as stop_log_sockets
def init(cfg: Config, loop: AbstractEventLoop) -> web.Application: def init(cfg: Config, loop: AbstractEventLoop) -> web.Application:
set_config(cfg) set_config(cfg)
set_loop(loop)
app = web.Application(loop=loop, middlewares=[auth, error]) app = web.Application(loop=loop, middlewares=[auth, error])
app.add_routes(routes) app.add_routes(routes)
return app return app
async def stop() -> None:
await stop_log_sockets()

View File

@ -14,12 +14,14 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from aiohttp import web from aiohttp import web
import asyncio
from ...__meta__ import __version__ from ...__meta__ import __version__
from ...config import Config from ...config import Config
routes: web.RouteTableDef = web.RouteTableDef() routes: web.RouteTableDef = web.RouteTableDef()
_config: Config = None _config: Config = None
_loop: asyncio.AbstractEventLoop = None
def set_config(config: Config) -> None: def set_config(config: Config) -> None:
@ -31,6 +33,15 @@ def get_config() -> Config:
return _config return _config
def set_loop(loop: asyncio.AbstractEventLoop) -> None:
global _loop
_loop = loop
def get_loop() -> asyncio.AbstractEventLoop:
return _loop
@routes.get("/version") @routes.get("/version")
async def version(_: web.Request) -> web.Response: async def version(_: web.Request) -> web.Response:
return web.json_response({ return web.json_response({

View File

@ -0,0 +1,114 @@
# maubot - A plugin-based Matrix bot system.
# Copyright (C) 2018 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from datetime import datetime
import logging
import asyncio
from aiohttp import web
from .base import routes, get_loop
from .auth import is_valid_token
BUILTIN_ATTRS = {"args", "asctime", "created", "exc_info", "exc_text", "filename", "funcName",
"levelname", "levelno", "lineno", "module", "msecs", "message", "msg", "name",
"pathname", "process", "processName", "relativeCreated", "stack_info", "thread",
"threadName"}
INCLUDE_ATTRS = {"filename", "funcName", "levelname", "levelno", "lineno", "module", "name",
"pathname"}
EXCLUDE_ATTRS = BUILTIN_ATTRS - INCLUDE_ATTRS
class WebSocketHandler(logging.Handler):
def __init__(self, ws, level=logging.NOTSET) -> None:
super().__init__(level)
self.ws = ws
self.formatter = logging.Formatter()
def emit(self, record: logging.LogRecord) -> None:
# JSON conversion based on Marsel Mavletkulov's json-log-formatter (MIT license)
# https://github.com/marselester/json-log-formatter
content = {
name: value
for name, value in record.__dict__.items()
if name not in EXCLUDE_ATTRS
}
content["msg"] = record.getMessage()
content["time"] = datetime.utcnow()
if record.exc_info:
content["exc_info"] = self.formatter.formatException(record.exc_info)
for name, value in content.items():
if isinstance(value, datetime):
content[name] = value.astimezone().isoformat()
asyncio.ensure_future(self.send(content), loop=get_loop())
async def send(self, record: dict) -> None:
try:
await self.ws.send_json(record)
except Exception as e:
pass
log_root = logging.getLogger("maubot")
log = logging.getLogger("maubot.server.websocket")
sockets = []
async def stop_all() -> None:
for socket in sockets:
try:
await socket.close(1012)
except Exception:
pass
@routes.get("/logs")
async def log_websocket(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
sockets.append(ws)
log.debug(f"Connection from {request.remote} opened")
handler = WebSocketHandler(ws)
authenticated = False
async def close_if_not_authenticated():
await asyncio.sleep(5, loop=get_loop())
if not authenticated:
await ws.close(code=4000)
log.debug(f"Connection from {request.remote} terminated due to no authentication")
asyncio.ensure_future(close_if_not_authenticated())
try:
async for msg in ws:
if msg.type != web.WSMsgType.TEXT:
continue
if is_valid_token(msg.data):
if not authenticated:
log.debug(f"Connection from {request.remote} authenticated")
log_root.addHandler(handler)
authenticated = True
await ws.send_json({"auth_success": True})
elif not authenticated:
await ws.send_json({"auth_success": False})
except Exception:
pass
log_root.removeHandler(handler)
log.debug(f"Connection from {request.remote} closed")
sockets.remove(ws)
return ws

View File

@ -75,6 +75,59 @@ export async function ping() {
throw json throw json
} }
export async function openLogSocket() {
let protocol = window.location.protocol === "https:" ? "wss:" : "ws:"
const url = `${protocol}//${window.location.host}${BASE_PATH}/logs`
const wrapper = {
socket: null,
connected: false,
authenticated: false,
fails: -1,
}
const openHandler = () => {
wrapper.socket.send(localStorage.accessToken)
wrapper.connected = true
}
const messageHandler = evt => {
// TODO use logs
const data = JSON.parse(evt.data)
if (data.auth_success !== undefined) {
if (data.auth_success) {
console.info("Websocket connection authentication successful")
wrapper.authenticated = true
wrapper.fails = -1
} else {
console.info("Websocket connection authentication failed")
}
} else {
console.log("SERVLOG", data)
}
}
const closeHandler = evt => {
if (evt) {
if (evt.code === 4000) {
console.error("Websocket connection failed: access token invalid or not provided")
} else if (evt.code === 1012) {
console.info("Websocket connection closed: server is restarting")
}
}
wrapper.connected = false
wrapper.socket = null
wrapper.fails++
const SECOND = 1000
setTimeout(() => {
wrapper.socket = new WebSocket(url)
wrapper.socket.onopen = openHandler
wrapper.socket.onmessage = messageHandler
wrapper.socket.onclose = closeHandler
}, Math.min(wrapper.fails * 5 * SECOND, 30 * SECOND))
}
closeHandler()
return wrapper
}
export const getInstances = () => defaultGet("/instances") export const getInstances = () => defaultGet("/instances")
export const getInstance = id => defaultGet(`/instance/${id}`) export const getInstance = id => defaultGet(`/instance/${id}`)
export const putInstance = (instance, id) => defaultPut("instance", instance, id) export const putInstance = (instance, id) => defaultPut("instance", instance, id)
@ -123,7 +176,7 @@ export const deleteClient = id => defaultDelete("client", id)
export default { export default {
BASE_PATH, BASE_PATH,
login, ping, login, ping, openLogSocket,
getInstances, getInstance, putInstance, deleteInstance, getInstances, getInstance, putInstance, deleteInstance,
getPlugins, getPlugin, uploadPlugin, deletePlugin, getPlugins, getPlugin, uploadPlugin, deletePlugin,
getClients, getClient, uploadAvatar, getAvatarURL, putClient, deleteClient, getClients, getClient, uploadAvatar, getAvatarURL, putClient, deleteClient,

View File

@ -55,6 +55,9 @@ class Dashboard extends Component {
plugins[plugin.id] = plugin plugins[plugin.id] = plugin
} }
this.setState({ instances, clients, plugins }) this.setState({ instances, clients, plugins })
const logs = await api.openLogSocket()
console.log("WebSocket opened:", logs)
window.logs = logs
} }
renderList(field, type) { renderList(field, type) {