Add Matrix client login/register commands

This commit is contained in:
Tulir Asokan 2019-06-08 17:02:44 +03:00
parent 523da95c17
commit d5e78db5cf
7 changed files with 88 additions and 20 deletions

View File

@ -31,6 +31,8 @@ def command(help: str) -> Callable[[Callable], Callable]:
@functools.wraps(func) @functools.wraps(func)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
for key, value in kwargs.items(): for key, value in kwargs.items():
if key not in questions:
continue
if value is not None and (questions[key]["type"] != "confirm" or value != "null"): if value is not None and (questions[key]["type"] != "confirm" or value != "null"):
questions.pop(key, None) questions.pop(key, None)
question_list = list(questions.values()) question_list = list(questions.values())
@ -49,6 +51,8 @@ def command(help: str) -> Callable[[Callable], Callable]:
def yesno(val: str) -> Optional[bool]: def yesno(val: str) -> Optional[bool]:
if not val: if not val:
return None return None
elif isinstance(val, bool):
return val
elif val.lower() in ("true", "t", "yes", "y"): elif val.lower() in ("true", "t", "yes", "y"):
return True return True
elif val.lower() in ("false", "f", "no", "n"): elif val.lower() in ("false", "f", "no", "n"):
@ -61,7 +65,7 @@ yesno.__name__ = "yes/no"
def option(short: str, long: str, message: str = None, help: str = None, def option(short: str, long: str, message: str = None, help: str = None,
click_type: Union[str, Callable[[str], Any]] = None, inq_type: str = None, click_type: Union[str, Callable[[str], Any]] = None, inq_type: str = None,
validator: Validator = None, required: bool = False, default: str = None, validator: Validator = None, required: bool = False, default: str = None,
is_flag: bool = False) -> Callable[[Callable], Callable]: is_flag: bool = False, prompt: bool = True) -> Callable[[Callable], Callable]:
if not message: if not message:
message = long[2].upper() + long[3:] message = long[2].upper() + long[3:]
click_type = validator.click_type if isinstance(validator, ClickValidator) else click_type click_type = validator.click_type if isinstance(validator, ClickValidator) else click_type
@ -70,6 +74,8 @@ def option(short: str, long: str, message: str = None, help: str = None,
def decorator(func) -> Callable: def decorator(func) -> Callable:
click.option(short, long, help=help, type=click_type)(func) click.option(short, long, help=help, type=click_type)(func)
if not prompt:
return func
if not hasattr(func, "__inquirer_questions__"): if not hasattr(func, "__inquirer_questions__"):
func.__inquirer_questions__ = {} func.__inquirer_questions__ = {}
q = { q = {

View File

@ -1 +1 @@
from . import upload, build, login, init, logs from . import upload, build, login, init, logs, auth

View File

@ -0,0 +1,65 @@
# maubot - A plugin-based Matrix bot system.
# Copyright (C) 2019 Tulir Asokan
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from urllib.request import urlopen, Request
from urllib.error import HTTPError
import json
from colorama import Fore
import click
from ..config import get_token
from ..cliq import cliq
history_count: int = 10
@cliq.command(help="Log into a Matrix account via the Maubot server")
@cliq.option("-h", "--homeserver", help="The homeserver to log into", required=True)
@cliq.option("-u", "--username", help="The username to log in with", required=True)
@cliq.option("-p", "--password", help="The password to log in with", inq_type="password",
required=True)
@cliq.option("-s", "--server", help="The maubot instance to log in through", default="",
required=False, prompt=False)
@click.option("-r", "--register", help="Register instead of logging in", is_flag=True,
default=False)
def auth(homeserver: str, username: str, password: str, server: str, register: bool) -> None:
server, token = get_token(server)
if not token:
return
endpoint = "register" if register else "login"
req = Request(f"{server}/_matrix/maubot/v1/client/auth/{homeserver}/{endpoint}",
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
data=json.dumps({
"username": username,
"password": password,
}).encode("utf-8"))
try:
with urlopen(req) as resp_data:
resp = json.load(resp_data)
action = "registered" if register else "logged in as"
print(f"{Fore.GREEN}Successfully {action} "
f"{Fore.CYAN}{resp['user_id']}{Fore.GREEN}.")
print(f"{Fore.GREEN}Access token: {Fore.CYAN}{resp['access_token']}{Fore.RESET}")
except HTTPError as e:
try:
err = json.load(e)
except json.JSONDecodeError:
err = {}
action = "register" if register else "log in"
print(f"{Fore.RED}Failed to {action}: {err.get('error', str(e))}{Fore.RESET}")

View File

@ -98,10 +98,7 @@ def write_plugin(meta: PluginMeta, output: Union[str, IO]) -> None:
def upload_plugin(output: Union[str, IO], server: str) -> None: def upload_plugin(output: Union[str, IO], server: str) -> None:
if not server: server, token = get_token(server)
server, token = get_default_server()
else:
token = get_token(server)
if not token: if not token:
return return
if isinstance(output, str): if isinstance(output, str):

View File

@ -21,7 +21,7 @@ from aiohttp import WSMsgType, WSMessage, ClientSession
from mautrix.client.api.types.util import Obj from mautrix.client.api.types.util import Obj
import click import click
from ..config import get_token, get_default_server from ..config import get_token
from ..base import app from ..base import app
history_count: int = 10 history_count: int = 10
@ -31,10 +31,7 @@ history_count: int = 10
@click.argument("server", required=False) @click.argument("server", required=False)
@click.option("-t", "--tail", default=10, help="Maximum number of old log lines to display") @click.option("-t", "--tail", default=10, help="Maximum number of old log lines to display")
def logs(server: str, tail: int) -> None: def logs(server: str, tail: int) -> None:
if not server: server, token = get_token(server)
server, token = get_default_server()
else:
token = get_token(server)
if not token: if not token:
return return
global history_count global history_count

View File

@ -33,10 +33,7 @@ class UploadError(Exception):
@click.argument("path") @click.argument("path")
@click.option("-s", "--server", help="The maubot instance to upload the plugin to") @click.option("-s", "--server", help="The maubot instance to upload the plugin to")
def upload(path: str, server: str) -> None: def upload(path: str, server: str) -> None:
if not server: server, token = get_token(server)
server, token = get_default_server()
else:
token = get_token(server)
if not token: if not token:
return return
with open(path, "rb") as file: with open(path, "rb") as file:

View File

@ -13,13 +13,13 @@
# #
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import Tuple, Optional from typing import Tuple, Optional, Dict, Any
import json import json
import os import os
from colorama import Fore from colorama import Fore
config = { config: Dict[str, Any] = {
"servers": {}, "servers": {},
"default_server": None, "default_server": None,
} }
@ -28,16 +28,22 @@ configdir = os.environ.get("XDG_CONFIG_HOME", os.path.join(os.environ.get("HOME"
def get_default_server() -> Tuple[Optional[str], Optional[str]]: def get_default_server() -> Tuple[Optional[str], Optional[str]]:
try: try:
server: str = config["default_server"] server: Optional[str] = config["default_server"]
except KeyError: except KeyError:
server = None server = None
if server is None: if server is None:
print(f"{Fore.RED}Default server not configured.{Fore.RESET}") print(f"{Fore.RED}Default server not configured.{Fore.RESET}")
return None, None return None, None
return server, get_token(server) return server, _get_token(server)
def get_token(server: str) -> Optional[str]: def get_token(server: str) -> Tuple[Optional[str], Optional[str]]:
if not server:
return get_default_server()
return server, _get_token(server)
def _get_token(server: str) -> Optional[str]:
try: try:
return config["servers"][server] return config["servers"][server]
except KeyError: except KeyError: