From 121330583a74832a93b4bdbfbee4d2ca7fd621fa Mon Sep 17 00:00:00 2001
From: 0xDEADCADE <69792955+0xDEADCADE@users.noreply.github.com>
Date: Fri, 8 Apr 2022 14:44:54 +0000
Subject: [PATCH] Add files via upload
---
main.py | 337 +++++++++++++++++++++++++++++++++++++++++++++++
requirements.txt | 2 +
2 files changed, 339 insertions(+)
create mode 100644 main.py
create mode 100644 requirements.txt
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..6644e66
--- /dev/null
+++ b/main.py
@@ -0,0 +1,337 @@
+#!/usr/bin/env python3
+# Created by @deadcade:deadca.de
+# DEADCADE's Matrix Self Bot v1.2.0
+# builtins
+import asyncio # Required by nio
+import json # Settings and xkcd command
+import hashlib # File cache
+import os # Filesystem
+import mimetypes # Uploading files to homeserver
+import html # HTML formatting
+import requests # XKCD command
+import subprocess # ping command (admin)
+import random # roll command
+import datetime # Ratelimiting
+
+# External dependencies
+import nio # Matrix library, matirx-nio[e2e]
+from PIL import Image # Resizing images, pillow
+
+# Set the settings path
+settings_path = "./data/settings.json"
+# Open and read settings or exit
+if os.path.isfile(settings_path):
+ with open(settings_path) as settings_file:
+ settings = json.load(settings_file)
+else:
+ print("Please create a settings file at " + settings_path)
+ exit()
+
+# Basic function for loading data from disk or returning a default value
+def load_data(path, default={}):
+ if os.path.isfile(path):
+ try:
+ with open(path, "r") as f:
+ return json.load(f)
+ except json.decoder.JSONDecodeError:
+ return default
+
+# Cache for files sent to the homeserver, prevents reuploading existing files
+file_cache = load_data(settings["file_cache_path"])
+
+# Emojis
+emojis = load_data(settings["emojis_path"], {"default_size": 24})
+
+# Get text to replace when enclosed in semicolons
+text_replace = load_data(settings["text_replace_path"])
+
+# Credentials
+credentials = load_data(settings["credentials_path"])
+
+# Set up global variables
+client = None
+ratelimits = {}
+
+# Common function definitions
+# Grab a list of all emojis on disk
+def list_emojis():
+ global settings
+ emojislist = {}
+ for filename in os.listdir(settings["emojis_folder_path"]):
+ if "_resized" in filename:
+ continue
+ orig = filename
+ filename = settings["emojis_folder_path"] + filename
+ emojislist[orig.split(".")[0]] = filename
+ return emojislist
+
+# Downlaod an external file to disk
+def download_file(url, filename):
+ if os.path.exists(filename):
+ return filename
+ r = requests.get(url, stream=True, allow_redirects=True)
+ if r.status_code == 200:
+ with open(filename, 'wb') as f:
+ f.write(r.content)
+ return filename
+ else:
+ return False
+
+# Resize an image (squares only)
+def resize_image(filename, size):
+ im = Image.open(filename)
+ im = im.resize((size,size))
+ # Remove .ext from the filename, insert _resized into it, and save it
+ filename = filename[:-4] + "_resized" + filename[-4:]
+ im.save(filename)
+ return filename
+
+# Mentions a user
+def mention(user):
+ return f"[{user.split(':')[0][1:]}](https://matrix.to/#/{user})"
+
+# Send file to homeserver
+async def send_file(filename):
+ global client, settings, file_cache
+ with open(filename, "rb") as f:
+ hash = hashlib.sha512(f.read()).hexdigest()
+ if hash in file_cache.keys():
+ return file_cache[hash]
+
+ mime_type = mimetypes.guess_type(filename)
+ file_stat = os.stat(filename)
+ with open(filename, "r+b") as f:
+ resp, maybe_keys = await client.upload(f, content_type=mime_type[0], filename=os.path.basename(filename), filesize=file_stat.st_size)
+ if (isinstance(resp, nio.UploadResponse)):
+ file_cache[hash] = resp.content_uri
+ with open(settings["file_cache_path"], "w") as f:
+ json.dump(file_cache, f)
+ return resp.content_uri
+ else:
+ return ""
+
+# Send an image as a message to a room
+async def send_image(room_id, url, text):
+ global client
+ return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.image", "body": text, "url": url}, ignore_unverified_devices=True)
+
+# Edits a message (without HTML formatting)
+async def edit_message_unformatted(room_id, original_event, text):
+ global client
+ return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": "* " + text, "m.new_content": {"msgtype": "m.text", "body": text}, "m.relates_to": {'rel_type': 'm.replace', 'event_id': original_event.event_id}}, ignore_unverified_devices=True)
+
+# Edits a message (with HTML formatting)
+async def edit_message(room_id, original_event, text):
+ global client
+ unformatted, formatted = text, text
+ unformatted = "".join([part.split(">")[-1] for part in unformatted.split("<")])
+ unformatted = html.unescape(unformatted)
+ # Before replacing newlines in formatted, check if both are equal
+ if unformatted == formatted:
+ # This check is here to prevent sending "formatted" text that is exactly the same as unformatted text
+ return await edit_message_unformatted(room_id, original_event, text)
+ # \n doesn't work in HTML, replace it with
+ formatted = formatted.replace("\n", "
")
+ return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": "* " + unformatted, "format": "org.matrix.custom.html", "formatted_body": "* " + formatted, "m.new_content": {"msgtype": "m.text", "body": unformatted, "format": "org.matrix.custom.html", "formatted_body": formatted}, "m.relates_to": {'rel_type': 'm.replace', 'event_id': original_event.event_id}}, ignore_unverified_devices=True)
+
+# Send a message (with HTML formatting)
+async def send_text(room_id, text):
+ unformatted, formatted = text, text
+ unformatted = "".join([part.split(">")[-1] for part in unformatted.split("<")])
+ unformatted = html.unescape(unformatted)
+ # \n doesn't work in HTML, replace it with
+ formatted = formatted.replace("\n", "
")
+ return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": unformatted + " (SelfBot)", "format": "org.matrix.custom.html", "formatted_body": formatted + (f" (SelfBot)" if settings["source_url"] else " (SelfBot)")}, ignore_unverified_devices=True)
+
+# Commands definition
+# Appends shrug to the end of the message
+async def shrug(args, room, event):
+ if event.body == settings["prefix"] + "shrug":
+ new_message = "¯\\_(ツ)_/¯"
+ else:
+ new_message = event.body.replace(settings["prefix"] + "shrug ", "").replace(settings["prefix"] + "shrug", "") + " ¯\\_(ツ)_/¯"
+ return await edit_message(room.room_id, event, new_message)
+
+# Help command
+async def help(args, room, event):
+ if len(args) == 0:
+ # No command specified, send command list
+ source_text = f"\nSource Code" if settings["source_url"] else ""
+ return await send_text(room.room_id, settings["help_messages"]["help"].replace("{prefix}", settings["prefix"]) + source_text)
+ else:
+ help_command = args[0].lower().split(settings["prefix"])[-1]
+ if help_command in settings["help_messages"].keys():
+ return await send_text(room.room_id, "Usage:\n" + settings["help_messages"][help_command].replace("{prefix}", settings["prefix"]) + "\n")
+ else:
+ return await send_text(room.room_id, "Unknown command!")
+
+# Set default emoji size
+async def emoji_size(args, room, event):
+ global emojis
+ if len(args) == 1 and args[0].isdecimal():
+ emojis["default_size"] = args[0]
+ with open(settings["emojis_path"], "w") as f:
+ json.dump(emojis, f)
+ return await send_text(room.room_id, "Set default size to: " + args[0])
+ return await send_text(room.room_id, "Please provide the size argument")
+
+# Lists possible emojis
+async def emoji_list(args, room, event):
+ global emojis
+ emojislist = list_emojis()
+ message = f"Default emoji size: {emojis['default_size']}\nAvailable emojis:\n"
+ for emoji in emojislist:
+ message += emoji + ": :" + emoji + ":24:, "
+ message += "\n"
+ message = message[:-1] # Remove the last comma
+ return await send_text(room.room_id, message)
+
+# Ping the selfbot or an external host
+async def ping(args, room, event):
+ # If no external host is provided, or the user pinging is not the account itself
+ if len(args) == 0 or event.sender != client.user_id:
+ return await send_text(room.room_id, "Selfbot is online and accepting messages in " + room.display_name + ".")
+ else:
+ # If the user is allowed to ping, and a host is provided, ping
+ # Linux only, replace -c with -n (and possibly find -W alternative)
+ ping = subprocess.run(["ping", "-c", "1", "-W", "2", args[0]], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ successful = ping.returncode
+ message = "Ping to " + args[0] + " was " + ("successful" if not successful else "unsuccessful")
+ return await send_text(room.room_id, message)
+
+# Roll a dice!
+async def roll(args, room, event):
+ sides = 6 if len(args) == 0 or not args[0].isdecimal() else int(args[0])
+ out = random.randint(1, sides)
+ return await send_text(room.room_id, "You rolled a " + str(out) + "!")
+
+# Send an XKCD in chat (Date, number, title, and image)
+async def xkcd(args, room, event):
+ global settings
+ comic = ""
+ if len(args) == 1 and args[0].isdecimal():
+ comic = args[0] + "/"
+ try:
+ r = requests.get(f"https://xkcd.com/{comic}info.0.json")
+ rj = json.loads(r.text)
+ filename = download_file(rj["img"], settings["cache_path"] + "/" + str(rj["num"]) + "." + rj["img"].split(".")[-1])
+ image = await send_file(filename)
+ await send_text(room.room_id, f"{rj['year']}/{rj['month']}/{rj['day']}, {str(rj['num'])}: {rj['safe_title']}")
+ return await send_image(room.room_id, image, rj['alt'])
+ except Exception:
+ return await send_text(room.room_id, "Failed to get XKCD!")
+
+async def message_callback(room: nio.MatrixRoom, event: nio.RoomMessageText) -> None:
+ global client, settings, emojis
+ if settings["debug"]:
+ print(f"Message received in room {room.display_name}\n{event.sender} | {event.body}")
+ admin = False
+ if event.sender == client.user_id: # If this is our message
+ admin = True
+ if "m.new_content" in event.source["content"]:
+ return
+ # If the message is a command, process it as such
+ if event.body.lower().startswith(settings["prefix"]):
+ command = event.body.split(" ")[0][len(settings["prefix"]):].lower()
+ args = event.body.split(" ")[1:]
+ if event.sender in ratelimits and not admin:
+ if ratelimits[event.sender] > int(datetime.datetime.utcnow().timestamp()):
+ return
+ ratelimits[event.sender] = int(datetime.datetime.utcnow().timestamp()) + settings["ratelimit"]
+ if command in settings["command_list"] or (admin and command in settings["admin_command_list"]):
+ command_function = globals()[command]
+ await command_function(args, room, event)
+ return
+ if admin:
+ # If it is not a command, process regular message parsing.
+ new_body, orig_body = event.body, event.body
+ # Emoji processor
+ if event.body.count(":") > 1: # Reduce searching by a significant margin
+ # Get a list of emojis on disk
+ emojislist = list_emojis()
+ for emoji in emojislist.keys():
+ # If the emoji is in there (While because we're replacing one at a time for size support)
+ while ":" + emoji + ":" in new_body:
+ # Get the size
+ size = new_body[new_body.find(f":{emoji}:") + len(f":{emoji}:"):].split(":")[0]
+ sizeset = True
+ # Make sure emoji size ends with :, do not process size with :emoji:24
+ try:
+ third_colon = new_body[new_body.find(f":{emoji}:") + len(f":{emoji}:") + len(str(size)):][0]
+ except IndexError:
+ third_colon = ""
+ # If the size is processable
+ if (not size.isdecimal()) or third_colon != ":":
+ sizeset = False
+ size = str(emojis["default_size"])
+ # Prevent KeyError
+ if str(size) not in emojis.keys():
+ emojis[str(size)] = {}
+ # If this emoji is not uploaded yet for this size
+ if emoji not in emojis[str(size)].keys():
+ # Upload and index it
+ emojis[str(size)][emoji] = await send_file(resize_image(emojislist[emoji], int(size)))
+ with open(settings["emojis_path"], "w") as f:
+ json.dump(emojis, f)
+ # Replace the emoij with the respective file
+ new_body = new_body.replace(":" + emoji + ":" + ("" if not sizeset else f"{size}:"), f"", 1)
+ # Text replace processor
+ if event.body.count(";") > 1:
+ for to_replace in text_replace.keys():
+ if ";" + to_replace + ";" in new_body:
+ new_body = new_body.replace(";" + to_replace + ";", text_replace[to_replace])
+ # If anything was changed processing the message, edit it
+ if not new_body == orig_body:
+ await edit_message(room.room_id, event, new_body)
+
+async def decryption_failure_callback(room: nio.MatrixRoom, event: nio.MegolmEvent) -> None:
+ print(f"Failed to decrypt {event.event_id} in room {room.display_name}")
+ return
+
+async def main() -> None:
+ global client, settings, credentials
+ # Create client config
+ config = nio.AsyncClientConfig(store_sync_tokens=True, encryption_enabled=True)
+ # If there are no previously-saved credentials, we'll use the password
+ if not os.path.exists(credentials["session_path"]):
+ client = nio.AsyncClient(credentials["homeserver"], credentials["user"], config=config, store_path=credentials["store_path"])
+ resp = await client.login(credentials["password"], device_name=credentials["device_name"])
+ # check that we logged in succesfully
+ if (isinstance(resp, nio.LoginResponse)):
+ # open the config file in write-mode
+ with open(credentials["session_path"], "w") as f:
+ # write the login details to disk
+ json.dump({"homeserver": credentials["homeserver"], "user_id": resp.user_id, "device_id": resp.device_id, "access_token": resp.access_token},f)
+ else:
+ print(f"homeserver = \"{credentials['homeserver']}\"; user = \"{credentials['user']}\"")
+ print(f"Failed to log in: {resp}")
+ return
+ # Otherwise the config file exists, so we'll use the stored credentials
+ else:
+ # open the file in read-only mode
+ with open(credentials["session_path"], "r") as f:
+ session = json.load(f)
+ client = nio.AsyncClient(session['homeserver'], config=config, store_path=credentials["store_path"])
+ client.restore_login(user_id=session['user_id'], access_token=session['access_token'], device_id=session['device_id'])
+
+ credentials = {}
+ print(f"Logged in as: {client.user_id} with device {client.device_id}")
+ await client.sync(timeout=60000, full_state=True) # Ignore any messages sent before now.
+ client.add_event_callback(message_callback, nio.RoomMessageText)
+ client.add_event_callback(decryption_failure_callback, (nio.MegolmEvent,))
+ if client.should_upload_keys:
+ await client.keys_upload()
+ while True:
+ # If we're in debug mode, exit on error
+ if settings["debug"]:
+ await client.sync_forever(timeout=int(0xDEADCADE * 8.03012557499132e-06), full_state=True)
+ else:
+ # Else just print the error and continue
+ try:
+ await client.sync_forever(timeout=int(0xDEADCADE * 8.03012557499132e-06), full_state=True)
+ except Exception as e:
+ print(e)
+ await asyncio.sleep(int(0xDEADCADE * 2.676708524997107e-10))
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..9a118cf
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,2 @@
+matrix-nio[e2e]
+pillow