47 lines
1.3 KiB
Python
47 lines
1.3 KiB
Python
from loguru import logger
|
|
from nio import AsyncClient, LoginResponse
|
|
|
|
|
|
class MatrixBot:
|
|
def __init__(self, config: dict):
|
|
# TODO: Test configuration for required settings
|
|
self.config = config
|
|
|
|
self.client = AsyncClient(
|
|
homeserver=self.config['homeserver'],
|
|
user=self.config['username']
|
|
)
|
|
|
|
async def send_message(self, message: str):
|
|
if not self.client.access_token:
|
|
logged_in = await self.login()
|
|
if not logged_in:
|
|
return
|
|
|
|
await self.client.room_send(
|
|
room_id=self.config['room_id'],
|
|
message_type="m.room.message",
|
|
content={
|
|
"msgtype": "m.text",
|
|
"body": message
|
|
}
|
|
)
|
|
logger.info("Message sent")
|
|
|
|
async def login(self):
|
|
response = await self.client.login(
|
|
password=self.config['password']
|
|
)
|
|
if isinstance(response, LoginResponse):
|
|
logger.info(f"Logged in as {self.config['username']}")
|
|
else:
|
|
logger.error(f"Failed to login as {self.config['username']}: {response}")
|
|
return False
|
|
|
|
return True
|
|
|
|
async def logout(self):
|
|
await self.client.logout()
|
|
await self.client.close()
|
|
logger.info(f"Logged out from {self.config['homeserver']}")
|