Merge pull request #234 from maubot/tulir/scheduler

Add basic scheduler for plugins
This commit is contained in:
Tulir Asokan 2024-03-07 16:42:48 +02:00 committed by GitHub
commit a62f064e1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 178 additions and 11 deletions

View File

@ -6,16 +6,17 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v3
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
python-version: "3.12"
- uses: isort/isort-action@master
with:
sortPaths: "./maubot"
- uses: psf/black@stable
with:
src: "./maubot"
version: "24.2.0"
- name: pre-commit
run: |
pip install pre-commit

View File

@ -1,6 +1,6 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
rev: v4.5.0
hooks:
- id: trailing-whitespace
exclude_types: [markdown]
@ -8,13 +8,13 @@ repos:
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/psf/black
rev: 23.1.0
rev: 24.2.0
hooks:
- id: black
language_version: python3
files: ^maubot/.*\.pyi?$
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
rev: 5.13.2
hooks:
- id: isort
files: ^maubot/.*\.pyi?$

View File

@ -1,3 +1,3 @@
pre-commit>=2.10.1,<3
isort>=5.10.1,<6
black>=23,<24
black>=24,<25

View File

@ -56,9 +56,11 @@ async def get_table(request: web.Request) -> web.Response:
try:
order = [tuple(order.split(":")) for order in request.query.getall("order")]
order = [
(
(asc if sort.lower() == "asc" else desc)(table.columns[column])
if sort
else table.columns[column]
)
for column, sort in order
]
except KeyError:

View File

@ -27,6 +27,8 @@ from mautrix.util.async_db import Database, UpgradeTable
from mautrix.util.config import BaseProxyConfig
from mautrix.util.logging import TraceLogger
from .scheduler import BasicScheduler
if TYPE_CHECKING:
from .client import MaubotMatrixClient
from .loader import BasePluginLoader
@ -40,6 +42,7 @@ class Plugin(ABC):
log: TraceLogger
loop: AbstractEventLoop
loader: BasePluginLoader
sched: BasicScheduler
config: BaseProxyConfig | None
database: Engine | Database | None
webapp: PluginWebApp | None
@ -58,6 +61,7 @@ class Plugin(ABC):
webapp_url: str | None,
loader: BasePluginLoader,
) -> None:
self.sched = BasicScheduler(log=log.getChild("scheduler"))
self.client = client
self.loop = loop
self.http = http
@ -117,6 +121,7 @@ class Plugin(ABC):
self.client.remove_event_handler(event_type, func)
if self.webapp is not None:
self.webapp.clear()
self.sched.stop()
await self.stop()
async def stop(self) -> None:

159
maubot/scheduler.py Normal file
View File

@ -0,0 +1,159 @@
# maubot - A plugin-based Matrix bot system.
# Copyright (C) 2024 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import Awaitable, Callable
import asyncio
import logging
class BasicScheduler:
background_loop: asyncio.Task | None
tasks: set[asyncio.Task]
log: logging.Logger
def __init__(self, log: logging.Logger) -> None:
self.log = log
self.tasks = set()
def _find_caller(self) -> str:
try:
file_name, line_number, function_name, _ = self.log.findCaller()
return f"{function_name} at {file_name}:{line_number}"
except ValueError:
return "unknown function"
def run_periodically(
self,
period: float | int,
func: Callable[[], Awaitable],
run_task_in_background: bool = False,
catch_errors: bool = True,
) -> asyncio.Task:
"""
Run a function periodically in the background.
Args:
period: The period in seconds between each call to the function.
func: The function to run. No parameters will be provided,
use :meth:`functools.partial` if you need to pass parameters.
run_task_in_background: If ``True``, the function will be run in a background task.
If ``False`` (the default), the loop will wait for the task to return before
sleeping for the next period.
catch_errors: Whether the scheduler should catch and log any errors.
If ``False``, errors will be raised, and the caller must await the returned task
to find errors. This parameter has no effect if ``run_task_in_background``
is ``True``.
Returns:
The asyncio task object representing the background loop.
"""
task = asyncio.create_task(
self._call_periodically(
period,
func,
caller=self._find_caller(),
catch_errors=catch_errors,
run_task_in_background=run_task_in_background,
)
)
self._register_task(task)
return task
def run_later(
self, delay: float | int, coro: Awaitable, catch_errors: bool = True
) -> asyncio.Task:
"""
Run a coroutine after a delay.
Examples:
>>> self.sched.run_later(5, self.async_task(meow=True))
Args:
delay: The delay in seconds to await the coroutine after.
coro: The coroutine to await.
catch_errors: Whether the scheduler should catch and log any errors.
If ``False``, errors will be raised, and the caller must await the returned task
to find errors.
Returns:
The asyncio task object representing the scheduled task.
"""
task = asyncio.create_task(
self._call_with_delay(
delay, coro, caller=self._find_caller(), catch_errors=catch_errors
)
)
self._register_task(task)
return task
def _register_task(self, task: asyncio.Task) -> None:
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
async def _call_periodically(
self,
period: float | int,
func: Callable[[], Awaitable],
caller: str,
catch_errors: bool,
run_task_in_background: bool,
) -> None:
while True:
try:
await asyncio.sleep(period)
if run_task_in_background:
self._register_task(
asyncio.create_task(self._call_periodically_background(func(), caller))
)
else:
await func()
except asyncio.CancelledError:
raise
except Exception:
if catch_errors:
self.log.exception(f"Uncaught error in background loop (created in {caller})")
else:
raise
async def _call_periodically_background(self, coro: Awaitable, caller: str) -> None:
try:
await coro
except asyncio.CancelledError:
raise
except Exception:
self.log.exception(f"Uncaught error in background loop subtask (created in {caller})")
async def _call_with_delay(
self, delay: float | int, coro: Awaitable, caller: str, catch_errors: bool
) -> None:
try:
await asyncio.sleep(delay)
await coro
except asyncio.CancelledError:
raise
except Exception:
if catch_errors:
self.log.exception(f"Uncaught error in scheduled task (created in {caller})")
else:
raise
def stop(self) -> None:
"""
Stop all scheduled tasks and background loops.
"""
for task in self.tasks:
task.cancel(msg="Scheduler stopped")

View File

@ -9,5 +9,5 @@ skip = ["maubot/management/frontend"]
[tool.black]
line-length = 99
target-version = ["py38"]
target-version = ["py310"]
force-exclude = "maubot/management/frontend"