#!/usr/bin/env python3 # Created by @deadcade:deadca.de # DEADCADE's Matrix Self Bot v1.2.0 # builtins import asyncio # Required by nio import json # Settings and xkcd command import hashlib # File cache import os # Filesystem import mimetypes # Uploading files to homeserver import html # HTML formatting import requests # XKCD command import difflib # XKCD Title lookup import subprocess # GIF Emojis, ping command (admin) import random # roll command, http cat without args import datetime # Ratelimiting, room member cache invalidation # External dependencies import nio # Matrix library, matirx-nio[e2e] from PIL import Image # Resizing images, pillow try: from libretranslatepy import LibreTranslateAPI # Translation functionality translate_available = True except Exception: translate_available = False # Set the settings path settings_path = "./data/settings.json" # Open and read settings or exit if os.path.isfile(settings_path): with open(settings_path) as settings_file: settings = json.load(settings_file) else: print("Please create a settings file at " + settings_path) exit() # Basic function for loading data from disk or returning a default value def load_data(path, default={}): if os.path.isfile(path): try: with open(path, "r") as f: return json.load(f) except json.decoder.JSONDecodeError: return default else: return default # Cache for files sent to the homeserver, prevents reuploading existing files file_cache = load_data(settings["file_cache_path"]) # Emojis emojis = load_data(settings["emojis_path"], {"default_size": 24}) # Get text to replace when enclosed in semicolons text_replace = load_data(settings["text_replace_path"]) # Credentials credentials = load_data(settings["credentials_path"]) # Set up global variables client = None ratelimits = {} room_member_cache = {} # Common function definitions # Filter a title for local lookup with XKCD def filter_xkcd_title(title): filtered_title = "" for char in title.lower().split("(")[0]: if (char.isdecimal() or char.isalpha()) and (not char == " "): filtered_title += char return filtered_title # Takes in a country (US, CA, AU), outputs the "flag emoji"/regional indicator emojis def flag_emoji(country): regional_indicator_offset = 127397 return chr(ord(country.upper()[0]) + regional_indicator_offset) + chr(ord(country.upper()[1]) + regional_indicator_offset) # Takes in a flag emoji, outputs as text instead of regional indicators def flag_decode(emoji): regional_indicator_offset = 127397 try: return chr(ord(emoji[0]) - regional_indicator_offset) + chr(ord(emoji[1]) - regional_indicator_offset) except Exception: return None # Grab a list of all emojis on disk def list_emojis(): global settings emojislist = {} for filename in os.listdir(settings["emojis_folder_path"]): if "_resized" in filename: continue orig = filename filename = settings["emojis_folder_path"] + filename emojislist[orig.split(".")[0]] = filename return emojislist # Downlaod an external file to disk def download_file(url, filename): if os.path.exists(filename): return filename r = requests.get(url, stream=True, allow_redirects=True) if r.status_code == 200: with open(filename, 'wb') as f: f.write(r.content) return filename else: return False # GIF resizing def thumbnails(frames, size): for frame in frames: thumbnail = frame.copy() thumbnail.thumbnail((size, size), Image.ANTIALIAS) yield thumbnail # Resize an image (squares only) def resize_image(filename, size): original_filename = filename filename = filename[:-4] + "_resized" + filename[-4:] if filename.endswith(".gif"): # PIL does not properly hanle GIF images subprocess.run(["gifsicle", "--resize", f"{str(size)}x{str(size)}", original_filename, "-o", filename], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) else: im = Image.open(original_filename) im = im.resize((size,size)) im.save(filename) return filename # Mentions a user def mention(user): return f"{user.split(':')[0][1:]}" # Send file to homeserver async def send_file(filename): global client, settings, file_cache with open(filename, "rb") as f: hash = hashlib.sha512(f.read()).hexdigest() if hash in file_cache.keys(): return file_cache[hash] mime_type = mimetypes.guess_type(filename) file_stat = os.stat(filename) with open(filename, "r+b") as f: resp, maybe_keys = await client.upload(f, content_type=mime_type[0], filename=os.path.basename(filename), filesize=file_stat.st_size) if (isinstance(resp, nio.UploadResponse)): file_cache[hash] = resp.content_uri with open(settings["file_cache_path"], "w") as f: json.dump(file_cache, f) return resp.content_uri else: return "" # Send an image as a message to a room async def send_image(room_id, url, text): global client return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.image", "body": text, "url": url}, ignore_unverified_devices=True) # Edits a message (without HTML formatting) async def edit_message_unformatted(room_id, original_event, text): global client return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": "* " + text, "m.new_content": {"msgtype": "m.text", "body": text}, "m.relates_to": {'rel_type': 'm.replace', 'event_id': original_event.event_id}}, ignore_unverified_devices=True) # Edits a message (with HTML formatting) async def edit_message(room_id, original_event, text): global client text = text.split("")[-1] unformatted, formatted = text, text unformatted = "".join([part.split(">")[-1] for part in unformatted.split("<")]) unformatted = html.unescape(unformatted) # Before replacing newlines in formatted, check if both are equal if unformatted == formatted: # This check is here to prevent sending "formatted" text that is exactly the same as unformatted text return await edit_message_unformatted(room_id, original_event, text) # \n doesn't work in HTML, replace it with
formatted = formatted.replace("\n", "
") return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": "* " + unformatted, "format": "org.matrix.custom.html", "formatted_body": "* " + formatted, "m.new_content": {"msgtype": "m.text", "body": unformatted, "format": "org.matrix.custom.html", "formatted_body": formatted}, "m.relates_to": {'rel_type': 'm.replace', 'event_id': original_event.event_id}}, ignore_unverified_devices=True) # Send a message (with HTML formatting) async def send_text(room_id, text): unformatted, formatted = text, text unformatted = "".join([part.split(">")[-1] for part in unformatted.split("<")]) unformatted = html.unescape(unformatted) # \n doesn't work in HTML, replace it with
formatted = formatted.replace("\n", "
") return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": unformatted + " (SelfBot)", "format": "org.matrix.custom.html", "formatted_body": formatted + (f" (SelfBot)" if settings["source_url"] else " (SelfBot)")}, ignore_unverified_devices=True) # Send a reply (with HTML formatting) async def send_reply(room_id, original_event, text): unformatted, formatted = text, text unformatted = "".join([part.split(">")[-1] for part in unformatted.split("<")]) unformatted = html.unescape(unformatted) # \n doesn't work in HTML, replace it with
formatted = formatted.replace("\n", "
") originalbody = original_event.formatted_body if original_event.formatted_body else original_event.body formatted = f"
In reply to {original_event.sender}
{originalbody}
{formatted}" quotedbody = original_event.body.replace('\n', '\n> ') unformatted = f"> <{original_event.sender}> {quotedbody}\n\n{unformatted}" return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": unformatted + " (SelfBot)", "format": "org.matrix.custom.html", "formatted_body": formatted + (f" (SelfBot)" if settings["source_url"] else " (SelfBot)"), "m.relates_to": {"m.in_reply_to": {"event_id": original_event.event_id}}}, ignore_unverified_devices=True) # Commands definition # Appends shrug to the end of the message async def shrug(args, room, event): if event.body == settings["prefix"] + "shrug": new_message = "¯\\_(ツ)_/¯" else: new_message = event.body.replace(settings["prefix"] + "shrug ", "").replace(settings["prefix"] + "shrug", "") + " ¯\\_(ツ)_/¯" return await edit_message(room.room_id, event, new_message) # Help command async def help(args, room, event): if len(args) == 0: # No command specified, send command list source_text = f"\nSource Code" if settings["source_url"] else "" helptext = "Admin Commands:\n" for command in settings["admin_command_list"]: helptext += settings["help_messages"][command] + "\n" helptext += "\nUser Commands:\n" for command in settings["command_list"]: helptext += settings["help_messages"][command] + "\n" helptext += "\n\nArguments: (optional) [required] {admin only, optional}" return await send_text(room.room_id, helptext.replace("{prefix}", settings["prefix"]) + source_text) else: help_command = args[0].lower().split(settings["prefix"])[-1] if help_command in settings["help_messages"].keys(): return await send_text(room.room_id, "Usage:\n" + settings["help_messages"][help_command].replace("{prefix}", settings["prefix"]) + "\n") else: return await send_text(room.room_id, "Unknown command!") # Set default emoji size async def emoji_size(args, room, event): global emojis if len(args) == 1 and args[0].isdecimal(): emojis["default_size"] = args[0] with open(settings["emojis_path"], "w") as f: json.dump(emojis, f) return await send_text(room.room_id, "Set default size to: " + args[0]) return await send_text(room.room_id, "Please provide the size argument") # Lists possible emojis async def emoji_list(args, room, event): global emojis emojislist = list_emojis() message = f"Default emoji size: {emojis['default_size']}\nAvailable emojis:\n" for emoji in emojislist: message += emoji + ": :" + emoji + ":24:, " message += "\n" message = message[:-1] # Remove the last comma return await send_text(room.room_id, message) # Ping the selfbot or an external host async def ping(args, room, event): # If no external host is provided, or the user pinging is not the account itself if len(args) == 0 or event.sender != client.user_id: return await send_text(room.room_id, "Selfbot is online and accepting messages in " + room.display_name + ".") else: # If the user is allowed to ping, and a host is provided, ping # Linux only, replace -c with -n (and possibly find -W alternative) ping = subprocess.run(["ping", "-c", "1", "-W", "2", args[0]], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) successful = ping.returncode message = "Ping to " + args[0] + " was " + ("successful" if not successful else "unsuccessful") return await send_text(room.room_id, message) # Roll a dice! async def roll(args, room, event): sides = 6 if len(args) == 0 or not args[0].isdecimal() else int(args[0]) out = random.randint(1, sides) return await send_text(room.room_id, "You rolled a " + str(out) + "!") # Send an XKCD in chat (Date, number, title, and image) async def xkcd(args, room, event): global settings comic = "" if len(args) == 1 and args[0].isdecimal(): comic = args[0] + "/" elif len(args) > 0: lookup = {} r = requests.get("https://xkcd.com/archive/") for line in r.text.split("\n"): if "")[1].split("<")[0]) lookup[title] = num user_title = filter_xkcd_title(" ".join(args)) title = difflib.get_close_matches(user_title, lookup.keys(), n=1) if len(title) > 0: comic = lookup[title[0]] + "/" else: return await send_text(room.room_id, "Could not find XKCD!") r = requests.get(f"https://xkcd.com/{comic}info.0.json") if settings["debug"]: rj = json.loads(r.text) else: try: rj = json.loads(r.text) except Exception: return await send_text(room.room_id, "Failed to get XKCD!") filename = download_file(rj["img"], settings["cache_path"] + "/" + str(rj["num"]) + "." + rj["img"].split(".")[-1]) image = await send_file(filename) await send_text(room.room_id, f"{rj['year']}/{rj['month']}/{rj['day']}, {str(rj['num'])}: {rj['safe_title']}") return await send_image(room.room_id, image, rj['alt']) async def httpcat(args, room, event): global settings if len(args) == 1 and args[0].isdecimal(): code = args[0] elif len(args) == 0: code = random.choice([100, 101, 102, 200, 201, 202, 203, 204, 206, 207, 300, 301, 302, 303, 304, 305, 307, 308, 400, 401, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415, 416, 417, 418, 420, 421, 422, 423, 424, 425, 426, 429, 431, 444, 450, 451, 498, 499, 500, 501, 502, 503, 504, 506, 507, 508, 509, 510, 511, 521, 522, 523, 525, 599]) r = requests.get(f"https://http.cat/{str(code)}.jpg") filename = download_file(f"https://http.cat/{str(code)}.jpg", settings["cache_path"] + "/httpcat-" + str(code) + ".jpg") if filename: image = await send_file(filename) return await send_image(room.room_id, image, str(code)) else: return await send_text(room.room_id, "Failed to get HTTP Cat!") async def aw(args, room, event): if len(args) > 0: search = "+".join(args) r = requests.get("https://wiki.archlinux.org/index.php?search=" + search) if r.status_code == 200: return await send_text(room.room_id, r.url) else: return await send_text(room.room_id, "https://wiki.archlinux.org/index.php?search=" + search) else: return await send_text(room.room_id, "Please provide a search term.") async def mutual_rooms(args, room, event): global client, room_member_cache # Custom arg parser, because we need to use formatted_body for mentions from modern clients if len(args) > 0: args = event.formatted_body.split(" ")[1:] if event.formatted_body else event.body.split(" ")[1:] user = "" for arg in args: if arg.startswith("href"): # Parse user from a matrix.to link try: user = "@" + arg.replace("%40", "@").replace("%3A", ":").split("@")[1].split('"')[0] except Exception: pass break elif arg.startswith("@"): # User is direct text user = arg break if not user: return await send_text(room.room_id, "Error parsing user MXID") # Create a list of mutual rooms # Determine if the cache is usable usecache = "date" in room_member_cache and room_member_cache["date"] > int(datetime.datetime.utcnow().timestamp()) mutual = [] # Get list of rooms the bot user is in joined_rooms = await client.joined_rooms() # If cache is invalid if not usecache: # Reset the cache timestamp room_member_cache["date"] = int(datetime.datetime.utcnow().timestamp()) + 1800 # Check every room for lroom in joined_rooms.rooms: members = [] # If there's a valid cache entry if usecache and lroom in room_member_cache: # Use cache members = room_member_cache[lroom] else: # Grab room member list from HS and store it in cache members = await client.joined_members(lroom) members = members.members room_member_cache[lroom] = members # Check if user is in room if user in [x.user_id for x in members]: mutual.append(lroom) # Send room list return await send_text(room.room_id, "List of mutual rooms with " + mention(user) + ":\n" + "\n".join([f"{x}" for x in mutual]) + "\n") async def unknown_callback(room: nio.MatrixRoom, event: nio.UnknownEvent) -> None: if event.type == "m.reaction": # Get the ID of the event this was a reaction to relation_dict = event.source.get("content", {}).get("m.relates_to", {}) reacted_to = relation_dict.get("event_id") if reacted_to and relation_dict.get("rel_type") == "m.annotation": await reaction_callback(room, event, reacted_to) return async def reaction_callback(room: nio.MatrixRoom, event: nio.UnknownEvent, reacted_to_id: str) -> None: global client, settings, translate_available if settings["debug"]: print(f"Got reaction in {room.room_id} from {event.sender}.") # No selfbot in mautrix-discord rooms. if " (Discord)" in room.display_name: return # Only acknowledge reactions that we sent if event.sender != client.user_id: return # Get the original event that was reacted to event_response = await client.room_get_event(room.room_id, reacted_to_id) if isinstance(event_response, nio.RoomGetEventError): return reacted_to_event = event_response.event # Get the reaction reaction_content = (event.source.get("content", {}).get("m.relates_to", {}).get("key")) if translate_available and len(reaction_content) == 2: if flag_decode(reaction_content): country = flag_decode(reaction_content).lower() if country == "us" or country == "gb": # Translate from auto to English from_lang = "auto" else: # Translate from lang to English from_lang = country lt = LibreTranslateAPI(settings["libretranslate_api"], settings["libretranslate_api_key"] if "libretranslate_api_key" in settings.keys() and settings["libretranslate_api_key"] else None) # If the language is not supported try: if from_lang not in [x["code"] for x in lt.languages()] and from_lang != "auto": return await send_text(room.room_id, "Language " + from_lang + " is not supported!") except Exception: return await send_text(room.room_id, "Something went wrong connecting to the LibreTranslate server!") original_body = reacted_to_event.body # If the message was a reply if reacted_to_event.source.get("content", {}).get("m.relates_to", {}).get("m.in_reply_to", {}).get("event_id"): original_body = "\n\n".join(reacted_to_event.body.split("\n\n")[1:]) # Remove the in reply to part try: translated = lt.translate(original_body, from_lang, "en") except Exception: return await send_text(room.room_id, "Something went wrong translating the message!") lang = lt.detect(original_body)[0]["language"].upper() if from_lang == "auto" else from_lang.upper() return await send_reply(room.room_id, reacted_to_event, "Translated from " + lang + " (" + flag_emoji("US" if lang == "EN" else lang) + "):\n" + translated + "\n") async def message_callback(room: nio.MatrixRoom, event: nio.RoomMessageText) -> None: global client, settings, emojis if settings["debug"]: print(f"Message received in room {room.display_name}\n{event.sender} | {event.body}") # Discord messages need different treatment. Absolutely no commands, and if possible no instant message edits. # Because selfbots are against ToS on Discord, and will get banned within just a few hours to days of usage. if " (Discord)" in room.display_name: return admin = False if event.sender == client.user_id: # If this is our message admin = True if "m.new_content" in event.source["content"]: return # If the message is a command, process it as such if event.body.lower().startswith(settings["prefix"]): command = event.body.split(" ")[0][len(settings["prefix"]):].lower() args = event.body.split(" ")[1:] if event.sender in ratelimits and not admin: if ratelimits[event.sender] > int(datetime.datetime.utcnow().timestamp()): return ratelimits[event.sender] = int(datetime.datetime.utcnow().timestamp()) + settings["ratelimit"] if command in settings["command_list"] or (admin and command in settings["admin_command_list"]): command_function = globals()[command] if settings["debug"]: await command_function(args, room, event) else: try: await command_function(args, room, event) except Exception: await send_text(room.room_id, "Something went wrong processing the command!") return if admin: # If it is not a command, process regular message parsing. if event.formatted_body: new_body, orig_body = event.formatted_body, event.formatted_body else: new_body, orig_body = event.body, event.body # Emoji processor if event.body.count(":") > 1: # Reduce searching by a significant margin # Get a list of emojis on disk emojislist = list_emojis() for emoji in emojislist.keys(): # If the emoji is in there (While because we're replacing one at a time for size support) while ":" + emoji + ":" in new_body: # Get the size size = new_body[new_body.find(f":{emoji}:") + len(f":{emoji}:"):].split(":")[0] sizeset = True # Make sure emoji size ends with :, do not process size with :emoji:24 try: third_colon = new_body[new_body.find(f":{emoji}:") + len(f":{emoji}:") + len(str(size)):][0] except IndexError: third_colon = "" # If the size is processable if (not size.isdecimal()) or third_colon != ":": sizeset = False size = str(emojis["default_size"]) # Prevent KeyError if str(size) not in emojis.keys(): emojis[str(size)] = {} # If this emoji is not uploaded yet for this size if emoji not in emojis[str(size)].keys(): # Upload and index it emojis[str(size)][emoji] = await send_file(resize_image(emojislist[emoji], int(size))) with open(settings["emojis_path"], "w") as f: json.dump(emojis, f) # Replace the emoij with the respective file new_body = new_body.replace(":" + emoji + ":" + ("" if not sizeset else f"{size}:"), f"\"{emoji}\"", 1) # Text replace processor if event.body.count(";") > 1: for to_replace in text_replace.keys(): if ";" + to_replace + ";" in new_body: new_body = new_body.replace(";" + to_replace + ";", text_replace[to_replace]) # Subreddit linker (type r/subreddit to get [r/subreddit](https://reddit.com/r/subreddit)) if event.body.count("r/") > 0: for part in event.body.split(" "): if part.startswith("r/"): new_body = new_body.replace(part, f"{part}") if event.body.count("{pkg|") > 0: for part in event.body.split("{pkg|")[1:]: if "}" in part: pkg = part.split("}")[0] new_body = new_body.replace("{pkg|" + pkg + "}", "" + pkg + "") if event.body.count("{aur|") > 0: for part in event.body.split("{aur|")[1:]: if "}" in part: pkg = part.split("}")[0] new_body = new_body.replace("{aur|" + part + "}", "" + pkg + "") # If anything was changed processing the message, edit it if not new_body == orig_body: await edit_message(room.room_id, event, new_body) async def decryption_failure_callback(room: nio.MatrixRoom, event: nio.MegolmEvent) -> None: print(f"Failed to decrypt {event.event_id} in room {room.display_name}") return async def main() -> None: global client, settings, credentials # Create client config config = nio.AsyncClientConfig(store_sync_tokens=True, encryption_enabled=True) # If there are no previously-saved credentials, we'll use the password if not os.path.exists(credentials["session_path"]): client = nio.AsyncClient(credentials["homeserver"], credentials["user"], config=config, store_path=credentials["store_path"]) resp = await client.login(credentials["password"], device_name=credentials["device_name"]) # check that we logged in succesfully if (isinstance(resp, nio.LoginResponse)): # open the config file in write-mode with open(credentials["session_path"], "w") as f: # write the login details to disk json.dump({"homeserver": credentials["homeserver"], "user_id": resp.user_id, "device_id": resp.device_id, "access_token": resp.access_token},f) else: print(f"homeserver = \"{credentials['homeserver']}\"; user = \"{credentials['user']}\"") print(f"Failed to log in: {resp}") return # Otherwise the config file exists, so we'll use the stored credentials else: # open the file in read-only mode with open(credentials["session_path"], "r") as f: session = json.load(f) client = nio.AsyncClient(session['homeserver'], config=config, store_path=credentials["store_path"]) client.restore_login(user_id=session['user_id'], access_token=session['access_token'], device_id=session['device_id']) credentials = {} print(f"Logged in as: {client.user_id} with device {client.device_id}") await client.sync(timeout=60000, full_state=True) # Ignore any messages sent before now. client.add_event_callback(message_callback, nio.RoomMessageText) client.add_event_callback(unknown_callback, nio.UnknownEvent) client.add_event_callback(decryption_failure_callback, (nio.MegolmEvent,)) if client.should_upload_keys: await client.keys_upload() while True: # If we're in debug mode, exit on error if settings["debug"]: await client.sync_forever(timeout=int(0xDEADCADE * 8.03012557499132e-06), full_state=True) else: # Else just print the error and continue try: await client.sync_forever(timeout=int(0xDEADCADE * 8.03012557499132e-06), full_state=True) except Exception as e: print(e) await asyncio.sleep(int(0xDEADCADE * 2.676708524997107e-10)) if __name__ == "__main__": asyncio.run(main())