Matrix-Selfbot/main.py

581 lines
28 KiB
Python

#!/usr/bin/env python3
# Created by @deadcade:deadca.de
# DEADCADE's Matrix Self Bot v1.2.0
# builtins
import asyncio # Required by nio
import json # Settings and xkcd command
import hashlib # File cache
import os # Filesystem
import mimetypes # Uploading files to homeserver
import html # HTML formatting
import requests # XKCD command
import difflib # XKCD Title lookup
import subprocess # GIF Emojis, ping command (admin)
import random # roll command, http cat without args
import datetime # Ratelimiting, room member cache invalidation
# External dependencies
import nio # Matrix library, matirx-nio[e2e]
from PIL import Image # Resizing images, pillow
# Translation is an optional feature: record whether libretranslatepy is importable
try:
    from libretranslatepy import LibreTranslateAPI  # Translation functionality
except Exception:
    translate_available = False
else:
    translate_available = True
# Set the settings path
settings_path = "./data/settings.json"
# Open and read settings or exit
if not os.path.isfile(settings_path):
    print("Please create a settings file at " + settings_path)
    # exit() is a helper injected by the site module for interactive use and
    # reports success (status 0); abort explicitly with a failing status instead
    raise SystemExit(1)
with open(settings_path) as settings_file:
    settings = json.load(settings_file)
# Basic function for loading data from disk or returning a default value
def load_data(path, default=None):
    """Load JSON data from ``path`` or fall back to ``default``.

    Args:
        path: Location of the JSON file to read.
        default: Value returned when the file does not exist or does not
            contain valid JSON. When omitted, a new empty dict is created for
            every call, so callers never share (and accidentally mutate) one
            common fallback object.

    Returns:
        The decoded JSON document, or ``default`` (the very object passed in,
        or a fresh ``{}`` when none was given).
    """
    if default is None:
        default = {}
    if not os.path.isfile(path):
        return default
    try:
        with open(path, "r") as f:
            return json.load(f)
    except json.decoder.JSONDecodeError:
        # A corrupt file is treated the same as a missing one
        return default
# Cache for files sent to the homeserver, prevents reuploading existing files
# Maps the SHA-512 hex digest of a file's contents to its uploaded content URI (filled in by send_file)
file_cache = load_data(settings["file_cache_path"])
# Emojis
# Holds "default_size" plus, per size (as a string key), a mapping of emoji name to uploaded content URI
emojis = load_data(settings["emojis_path"], {"default_size": 24})
# Get text to replace when enclosed in semicolons
# Maps the text between the semicolons to its replacement
text_replace = load_data(settings["text_replace_path"])
# Credentials
# Read by main() for login; main() replaces this with an empty dict once the client is set up
credentials = load_data(settings["credentials_path"])
# Set up global variables
# client: the nio.AsyncClient, assigned in main()
# ratelimits: maps a sender to the timestamp until which their commands are ignored
# room_member_cache: "date" holds the expiry timestamp, every other key is a room ID mapped to its member list
client = None
ratelimits = {}
room_member_cache = {}
# Common function definitions
# Filter a title for local lookup with XKCD
def filter_xkcd_title(title):
    """Normalise an XKCD title for fuzzy lookup.

    The title is lowercased, everything from the first "(" onwards is dropped,
    and only decimal digits and alphabetic characters are kept.
    """
    # Only the part before the first "(" takes part in the lookup
    stem = title.lower().split("(")[0]
    return "".join(symbol for symbol in stem if symbol.isdecimal() or symbol.isalpha())
# Takes in a country (US, CA, AU), outputs the "flag emoji"/regional indicator emojis
def flag_emoji(country):
    """Return the regional indicator pair for the first two letters of ``country``.

    The input is uppercased first; each of its first two characters is shifted
    by a fixed code point offset.
    """
    regional_indicator_offset = 127397
    letters = country.upper()
    first, second = letters[0], letters[1]
    return "".join(chr(ord(letter) + regional_indicator_offset) for letter in (first, second))
# Takes in a flag emoji, outputs as text instead of regional indicators
def flag_decode(emoji):
    """Convert a flag emoji (two regional indicators) into its two letters.

    Only the first two characters of ``emoji`` are inspected.

    Returns:
        The two uppercase ASCII letters (e.g. ``"US"``), or ``None`` when the
        input is too short, is not a string, or its first two characters are
        not regional indicator symbols.
    """
    regional_indicator_offset = 127397
    try:
        letters = chr(ord(emoji[0]) - regional_indicator_offset) + chr(ord(emoji[1]) - regional_indicator_offset)
    except (IndexError, KeyError, TypeError, ValueError):
        return None
    # Other two-code-point emoji (e.g. a hand with a skin tone modifier) also
    # survive the subtraction; only accept results that are really A-Z
    if all("A" <= letter <= "Z" for letter in letters):
        return letters
    return None
# Grab a list of all emojis on disk
def list_emojis():
    """Map emoji names to their file paths inside the configured emoji folder.

    The name is the part of the file name before its first ".". Files whose
    name contains "_resized" are skipped. The path is the configured folder
    string with the file name appended directly.
    """
    folder = settings["emojis_folder_path"]
    return {
        entry.split(".")[0]: folder + entry
        for entry in os.listdir(folder)
        if "_resized" not in entry
    }
# Download an external file to disk
def download_file(url, filename):
    """Download ``url`` to ``filename`` unless that file already exists.

    Args:
        url: Address to fetch (redirects are followed).
        filename: Destination path; also acts as the cache key, an existing
            file is returned untouched.

    Returns:
        ``filename`` on success (or when it already existed), ``False`` when
        the server answered with anything other than HTTP 200.

    Raises:
        requests.RequestException: On connection problems or when the server
            does not respond within the timeout.
    """
    if os.path.exists(filename):
        return filename
    # The context manager releases the streamed connection on every path,
    # including the non-200 one where the body is never read. The timeout
    # keeps a stalled server from blocking the caller indefinitely.
    with requests.get(url, stream=True, allow_redirects=True, timeout=30) as r:
        if r.status_code != 200:
            return False
        with open(filename, "wb") as f:
            for chunk in r.iter_content(chunk_size=65536):
                f.write(chunk)
    return filename
# GIF resizing
def thumbnails(frames, size):
    """Yield a downscaled copy of every frame.

    Args:
        frames: Iterable of PIL images.
        size: Maximum width and height of each thumbnail.

    Yields:
        A copy of each frame after ``thumbnail((size, size))``; the input
        frames themselves are left unmodified.
    """
    for frame in frames:
        thumbnail = frame.copy()
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the filter it aliased
        thumbnail.thumbnail((size, size), Image.LANCZOS)
        yield thumbnail
# Resize an image (squares only)
def resize_image(filename, size):
    """Write a ``size`` x ``size`` copy of an image next to the original.

    Args:
        filename: Path of the source image.
        size: Edge length of the (square) result in pixels.

    Returns:
        Path of the resized file: the original path with "_resized" inserted
        in front of the extension.
    """
    # splitext copes with extensions of any length (.jpeg, .webp, ...)
    root, extension = os.path.splitext(filename)
    resized_filename = root + "_resized" + extension
    if resized_filename.endswith(".gif"):
        # PIL does not properly handle GIF images
        subprocess.run(["gifsicle", "--resize", f"{size}x{size}", filename, "-o", resized_filename], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    else:
        # Close the source image once the resized copy has been saved
        with Image.open(filename) as im:
            im.resize((size, size)).save(resized_filename)
    return resized_filename
# Mentions a user
def mention(user):
    """Build an HTML matrix.to link for ``user``.

    The link text is the part of ``user`` before the first ":" with its first
    character removed.
    """
    label = user.split(":")[0][1:]
    return '<a href="https://matrix.to/#/' + user + '">' + label + "</a>"
# Send file to homeserver
async def send_file(filename):
    """Upload a file to the homeserver, reusing earlier uploads of the same content.

    The SHA-512 digest of the file's contents is looked up in ``file_cache``;
    on a miss the file is uploaded and the resulting content URI is stored in
    the cache and written to ``settings["file_cache_path"]``.

    Args:
        filename: Path of the file to upload.

    Returns:
        The content URI of the file, or ``""`` when the upload did not produce
        a ``nio.UploadResponse``.
    """
    global client, settings, file_cache
    with open(filename, "rb") as f:
        digest = hashlib.sha512(f.read()).hexdigest()
    if digest in file_cache:
        return file_cache[digest]
    content_type = mimetypes.guess_type(filename)[0]
    file_size = os.stat(filename).st_size
    # The file is only read here, so do not demand write access to it
    with open(filename, "rb") as f:
        resp, _maybe_keys = await client.upload(f, content_type=content_type, filename=os.path.basename(filename), filesize=file_size)
    if not isinstance(resp, nio.UploadResponse):
        return ""
    file_cache[digest] = resp.content_uri
    with open(settings["file_cache_path"], "w") as f:
        json.dump(file_cache, f)
    return resp.content_uri
# Send an image as a message to a room
async def send_image(room_id, url, text):
    """Send an ``m.image`` message pointing at ``url`` with ``text`` as its body.

    Returns whatever ``client.room_send`` returns.
    """
    global client
    image_content = {"msgtype": "m.image", "body": text, "url": url}
    return await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content=image_content,
        ignore_unverified_devices=True,
    )
# Edits a message (without HTML formatting)
async def edit_message_unformatted(room_id, original_event, text):
    """Replace the text of ``original_event`` with plain ``text``.

    Sends an ``m.replace`` relation whose fallback body is ``text`` prefixed
    with "* ". Returns whatever ``client.room_send`` returns.
    """
    global client
    replacement = {"msgtype": "m.text", "body": text}
    relation = {'rel_type': 'm.replace', 'event_id': original_event.event_id}
    edit_content = {
        "msgtype": "m.text",
        "body": "* " + text,
        "m.new_content": replacement,
        "m.relates_to": relation,
    }
    return await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content=edit_content,
        ignore_unverified_devices=True,
    )
# Edits a message (with HTML formatting)
async def edit_message(room_id, original_event, text):
    """Replace the text of ``original_event`` with HTML-formatted ``text``.

    Everything up to and including the last "</mx-reply>" is discarded. The
    plain body is derived by keeping, for each "<"-separated piece, only the
    text after its last ">" and then unescaping HTML entities. When that plain
    body equals the HTML, the edit is delegated to edit_message_unformatted.

    Returns whatever the underlying ``room_send`` call returns.
    """
    global client
    rich_body = text.split("</mx-reply>")[-1]
    stripped_pieces = [piece.split(">")[-1] for piece in rich_body.split("<")]
    plain_body = html.unescape("".join(stripped_pieces))
    # Compare before newlines are rewritten: identical text needs no formatted body
    if plain_body == rich_body:
        return await edit_message_unformatted(room_id, original_event, rich_body)
    # \n doesn't work in HTML, replace it with <br>
    rich_body = rich_body.replace("\n", "<br>")
    replacement = {
        "msgtype": "m.text",
        "body": plain_body,
        "format": "org.matrix.custom.html",
        "formatted_body": rich_body,
    }
    edit_content = {
        "msgtype": "m.text",
        "body": "* " + plain_body,
        "format": "org.matrix.custom.html",
        "formatted_body": "* " + rich_body,
        "m.new_content": replacement,
        "m.relates_to": {'rel_type': 'm.replace', 'event_id': original_event.event_id},
    }
    return await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content=edit_content,
        ignore_unverified_devices=True,
    )
# Send a message (with HTML formatting)
async def send_text(room_id, text):
    """Send HTML-formatted ``text`` to a room, tagged as coming from the selfbot.

    The plain body is derived by keeping, for each "<"-separated piece, only
    the text after its last ">" and then unescaping HTML entities. " (SelfBot)"
    is appended to the plain body; the formatted body gets the same tag, linked
    to ``settings["source_url"]`` when that is set.

    Returns whatever ``client.room_send`` returns.
    """
    stripped_pieces = [piece.split(">")[-1] for piece in text.split("<")]
    plain_body = html.unescape("".join(stripped_pieces))
    # \n doesn't work in HTML, replace it with <br>
    rich_body = text.replace("\n", "<br>")
    if settings["source_url"]:
        rich_tag = f" (<a href=\"{settings['source_url']}\">SelfBot</a>)"
    else:
        rich_tag = " (SelfBot)"
    message_content = {
        "msgtype": "m.text",
        "body": plain_body + " (SelfBot)",
        "format": "org.matrix.custom.html",
        "formatted_body": rich_body + rich_tag,
    }
    return await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content=message_content,
        ignore_unverified_devices=True,
    )
# Send a reply (with HTML formatting)
async def send_reply(room_id, original_event, text):
    """Send HTML-formatted ``text`` as a reply to ``original_event``.

    Builds both reply fallbacks: an ``<mx-reply>`` blockquote in the formatted
    body and a "> "-quoted copy of the original body in the plain body. Both
    bodies are tagged with "(SelfBot)", linked to ``settings["source_url"]``
    in the formatted body when that is set.

    Returns whatever ``client.room_send`` returns.
    """
    stripped_pieces = [piece.split(">")[-1] for piece in text.split("<")]
    plain_body = html.unescape("".join(stripped_pieces))
    # \n doesn't work in HTML, replace it with <br>
    rich_body = text.replace("\n", "<br>")
    # Prefer the formatted version of the quoted message when it has one
    quoted_html = original_event.formatted_body or original_event.body
    reply_fallback = (
        f"<mx-reply><blockquote><a href=\"https://matrix.to/#/{room_id}/{original_event.event_id}\">In reply to</a> "
        f"<a href=\"https://matrix.to/#/{original_event.sender}\">{original_event.sender}</a><br/>{quoted_html}</blockquote></mx-reply>"
    )
    rich_body = reply_fallback + rich_body
    quoted_plain = original_event.body.replace('\n', '\n> ')
    plain_body = f"> <{original_event.sender}> {quoted_plain}\n\n{plain_body}"
    if settings["source_url"]:
        rich_tag = f" (<a href=\"{settings['source_url']}\">SelfBot</a>)"
    else:
        rich_tag = " (SelfBot)"
    message_content = {
        "msgtype": "m.text",
        "body": plain_body + " (SelfBot)",
        "format": "org.matrix.custom.html",
        "formatted_body": rich_body + rich_tag,
        "m.relates_to": {"m.in_reply_to": {"event_id": original_event.event_id}},
    }
    return await client.room_send(
        room_id=room_id,
        message_type="m.room.message",
        content=message_content,
        ignore_unverified_devices=True,
    )
# Commands definition
# Appends shrug to the end of the message
async def shrug(args, room, event):
    """Edit the triggering message so that it ends with a shrug.

    The command itself (prefix + "shrug", with or without a trailing space) is
    removed from the body wherever it occurs; a bare command becomes just the
    shrug.
    """
    shrug_text = "¯\\_(ツ)_/¯"
    invocation = settings["prefix"] + "shrug"
    if event.body == invocation:
        new_message = shrug_text
    else:
        remainder = event.body.replace(invocation + " ", "").replace(invocation, "")
        new_message = remainder + " " + shrug_text
    return await edit_message(room.room_id, event, new_message)
# Help command
async def help(args, room, event):
    """Send the command overview, or the usage of one command.

    Without arguments, the help messages of all admin and user commands are
    listed (plus a source link when ``settings["source_url"]`` is set). With
    an argument, the text after the last prefix occurrence in the lowercased
    first argument is looked up in ``settings["help_messages"]``.
    """
    prefix = settings["prefix"]
    if len(args) != 0:
        help_command = args[0].lower().split(prefix)[-1]
        if help_command not in settings["help_messages"].keys():
            return await send_text(room.room_id, "Unknown command!")
        usage = settings["help_messages"][help_command].replace("{prefix}", prefix)
        return await send_text(room.room_id, "Usage:\n" + usage + "\n")
    # No command specified, send command list
    source_text = f"\n<a href=\"{settings['source_url']}\">Source Code</a>" if settings["source_url"] else ""
    admin_lines = "".join(settings["help_messages"][command] + "\n" for command in settings["admin_command_list"])
    user_lines = "".join(settings["help_messages"][command] + "\n" for command in settings["command_list"])
    helptext = (
        "Admin Commands:\n"
        + admin_lines
        + "\nUser Commands:\n"
        + user_lines
        + "\n\nArguments: (optional) [required] {admin only, optional}"
    )
    return await send_text(room.room_id, helptext.replace("{prefix}", prefix) + source_text)
# Set default emoji size
async def emoji_size(args, room, event):
    """Set and persist the default emoji size.

    Expects exactly one decimal argument; it is stored (as given, i.e. as a
    string) under ``emojis["default_size"]`` and the emoji index is written to
    ``settings["emojis_path"]``. Anything else yields a usage hint.
    """
    global emojis
    if len(args) != 1 or not args[0].isdecimal():
        return await send_text(room.room_id, "Please provide the size argument")
    new_size = args[0]
    emojis["default_size"] = new_size
    with open(settings["emojis_path"], "w") as f:
        json.dump(emojis, f)
    return await send_text(room.room_id, "Set default size to: " + new_size)
# Lists possible emojis
async def emoji_list(args, room, event):
    """Send the default emoji size and a comma-separated list of available emojis.

    Every entry has the form "name: :name:24:". Entries are joined with ", "
    so the list no longer ends in a dangling separator.
    """
    global emojis
    emojislist = list_emojis()
    header = f"Default emoji size: {emojis['default_size']}\nAvailable emojis:\n"
    entries = ", ".join(emoji + ": :" + emoji + ":24:" for emoji in emojislist)
    return await send_text(room.room_id, header + entries)
# Ping the selfbot or an external host
async def ping(args, room, event):
    """Report that the selfbot is online, or ping an external host.

    A host is only pinged when one is given and the command was sent by the
    account itself; every other invocation gets the "online" message.
    """
    may_ping_host = len(args) > 0 and event.sender == client.user_id
    if not may_ping_host:
        return await send_text(room.room_id, "Selfbot is online and accepting messages in " + room.display_name + ".")
    host = args[0]
    # Linux only, replace -c with -n (and possibly find -W alternative)
    completed = subprocess.run(["ping", "-c", "1", "-W", "2", host], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    # A zero return code is treated as success
    outcome = "unsuccessful" if completed.returncode else "successful"
    return await send_text(room.room_id, "Ping to " + host + " was " + outcome)
# Roll a dice!
async def roll(args, room, event):
    """Roll a die and send the result.

    The first argument, when it is a decimal number of at least 1, sets the
    number of sides; otherwise a six-sided die is used.
    """
    sides = 6
    if args and args[0].isdecimal():
        requested = int(args[0])
        # randint(1, 0) raises ValueError, so a zero-sided die falls back to the default
        if requested >= 1:
            sides = requested
    out = random.randint(1, sides)
    return await send_text(room.room_id, "You rolled a " + str(out) + "!")
# Send an XKCD in chat (Date, number, title, and image)
async def xkcd(args, room, event):
    """Send an XKCD comic (date, number, linked title, then the image).

    Args:
        args: Nothing for the latest comic, a single decimal comic number, or
            the words of a title that is fuzzy-matched against the archive.
        room: Room the command was sent in.
        event: The triggering event (unused).

    Returns:
        The response of the last message sent: the image on success, an error
        text when the comic could not be found, fetched or downloaded.
    """
    global settings
    comic = ""
    if len(args) == 1 and args[0].isdecimal():
        comic = args[0] + "/"
    elif len(args) > 0:
        # Build a filtered-title -> number lookup from the archive page
        lookup = {}
        r = requests.get("https://xkcd.com/archive/")
        for line in r.text.split("\n"):
            if "<a href=\"" in line and "\" title=\"2" in line:
                num = line.split("/")[1]
                title = filter_xkcd_title(line.split(">")[1].split("<")[0])
                lookup[title] = num
        user_title = filter_xkcd_title(" ".join(args))
        title = difflib.get_close_matches(user_title, lookup.keys(), n=1)
        if len(title) > 0:
            comic = lookup[title[0]] + "/"
        else:
            return await send_text(room.room_id, "Could not find XKCD!")
    r = requests.get(f"https://xkcd.com/{comic}info.0.json")
    if settings["debug"]:
        # In debug mode a parse failure is allowed to propagate
        rj = json.loads(r.text)
    else:
        try:
            rj = json.loads(r.text)
        except Exception:
            return await send_text(room.room_id, "Failed to get XKCD!")
    filename = download_file(rj["img"], settings["cache_path"] + "/" + str(rj["num"]) + "." + rj["img"].split(".")[-1])
    # download_file returns False on failure; passing that on would make
    # send_file call open(False), which opens file descriptor 0 (stdin)
    if not filename:
        return await send_text(room.room_id, "Failed to get XKCD!")
    image = await send_file(filename)
    await send_text(room.room_id, f"{rj['year']}/{rj['month']}/{rj['day']}, {str(rj['num'])}: <a href=\"https://xkcd.com/{str(rj['num'])}/\">{rj['safe_title']}</a>")
    return await send_image(room.room_id, image, rj['alt'])
async def httpcat(args, room, event):
    """Send the http.cat image for a status code.

    Args:
        args: Nothing for a random code from a built-in list, or a single
            decimal status code. Any other argument shape is answered with the
            failure message instead of raising on an unbound ``code``.
        room: Room the command was sent in.
        event: The triggering event (unused).

    Returns:
        The response of the message sent (image, or failure text).
    """
    global settings
    if len(args) == 0:
        code = random.choice([100, 101, 102, 200, 201, 202, 203, 204, 206, 207, 300, 301, 302, 303, 304, 305, 307, 308, 400, 401, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415, 416, 417, 418, 420, 421, 422, 423, 424, 425, 426, 429, 431, 444, 450, 451, 498, 499, 500, 501, 502, 503, 504, 506, 507, 508, 509, 510, 511, 521, 522, 523, 525, 599])
    elif len(args) == 1 and args[0].isdecimal():
        code = args[0]
    else:
        return await send_text(room.room_id, "Failed to get HTTP Cat!")
    # download_file performs the only request needed; it also serves repeat
    # requests for the same code from the cache folder
    filename = download_file(f"https://http.cat/{str(code)}.jpg", settings["cache_path"] + "/httpcat-" + str(code) + ".jpg")
    if filename:
        image = await send_file(filename)
        return await send_image(room.room_id, image, str(code))
    return await send_text(room.room_id, "Failed to get HTTP Cat!")
async def aw(args, room, event):
    """Send a link to the Arch Wiki search result for the given terms.

    When the wiki answers with HTTP 200, the final URL (after redirects) is
    sent; otherwise the plain search URL is sent. Without arguments a hint is
    sent instead.
    """
    # Local import: keeps this fix self-contained within the function
    from urllib.parse import quote_plus

    if len(args) == 0:
        return await send_text(room.room_id, "Please provide a search term.")
    # Arguments come from room members; escape them so characters such as
    # "&" or "#" cannot alter the query string. Spaces still become "+".
    search = quote_plus(" ".join(args))
    search_url = "https://wiki.archlinux.org/index.php?search=" + search
    r = requests.get(search_url)
    if r.status_code == 200:
        return await send_text(room.room_id, r.url)
    return await send_text(room.room_id, search_url)
async def mutual_rooms(args, room, event):
    """Send the list of rooms the account shares with a given user.

    The user is taken from the first argument that is either a matrix.to link
    attribute (starts with "href") or a literal MXID (starts with "@").
    Member lists are kept in ``room_member_cache``; its "date" entry is set to
    1800 seconds in the future whenever the cache is found to be stale.
    """
    global client, room_member_cache
    # Custom arg parser, because we need to use formatted_body for mentions from modern clients
    if len(args) > 0:
        raw_message = event.formatted_body if event.formatted_body else event.body
        args = raw_message.split(" ")[1:]
    user = ""
    for candidate in args:
        if candidate.startswith("href"):
            # Parse user from a matrix.to link
            try:
                decoded = candidate.replace("%40", "@").replace("%3A", ":")
                user = "@" + decoded.split("@")[1].split('"')[0]
            except Exception:
                pass
            break
        if candidate.startswith("@"):
            # User is direct text
            user = candidate
            break
    if not user:
        return await send_text(room.room_id, "Error parsing user MXID")
    # The cache is usable while its expiry timestamp lies in the future
    cache_is_fresh = "date" in room_member_cache and room_member_cache["date"] > int(datetime.datetime.utcnow().timestamp())
    shared_rooms = []
    # Get list of rooms the bot user is in
    joined_rooms = await client.joined_rooms()
    if not cache_is_fresh:
        # Reset the cache timestamp
        room_member_cache["date"] = int(datetime.datetime.utcnow().timestamp()) + 1800
    for room_id in joined_rooms.rooms:
        if cache_is_fresh and room_id in room_member_cache:
            members = room_member_cache[room_id]
        else:
            # Grab room member list from HS and store it in cache
            member_response = await client.joined_members(room_id)
            members = member_response.members
            room_member_cache[room_id] = members
        if any(member.user_id == user for member in members):
            shared_rooms.append(room_id)
    room_links = "\n".join([f"<a href=\"https://matrix.to/#/{x.replace(':', '%3A')}\">{x}</a>" for x in shared_rooms])
    return await send_text(room.room_id, "List of mutual rooms with " + mention(user) + ":\n" + room_links + "\n")
async def unknown_callback(room: nio.MatrixRoom, event: nio.UnknownEvent) -> None:
    """Forward ``m.reaction`` events that annotate another event to reaction_callback.

    Every other unknown event is ignored.
    """
    if event.type != "m.reaction":
        return
    # Get the ID of the event this was a reaction to
    relation = event.source.get("content", {}).get("m.relates_to", {})
    target_event_id = relation.get("event_id")
    if target_event_id and relation.get("rel_type") == "m.annotation":
        await reaction_callback(room, event, target_event_id)
async def reaction_callback(room: nio.MatrixRoom, event: nio.UnknownEvent, reacted_to_id: str) -> None:
    """Translate a message to English when the account reacts to it with a flag.

    Only reactions sent by the account itself, outside rooms whose display
    name contains " (Discord)", are handled. A US or GB flag lets the server
    detect the source language; any other flag uses its two letters as the
    source language code.

    Args:
        room: Room the reaction was sent in.
        event: The reaction event.
        reacted_to_id: ID of the event that was reacted to.

    Returns:
        ``None`` when the reaction is ignored, otherwise the response of the
        message sent (translation reply or error text).
    """
    global client, settings, translate_available
    if settings["debug"]:
        print(f"Got reaction in {room.room_id} from {event.sender}.")
    # No selfbot in mautrix-discord rooms.
    if " (Discord)" in room.display_name:
        return
    # Only acknowledge reactions that we sent
    if event.sender != client.user_id:
        return
    # Get the original event that was reacted to
    event_response = await client.room_get_event(room.room_id, reacted_to_id)
    if isinstance(event_response, nio.RoomGetEventError):
        return
    reacted_to_event = event_response.event
    # Get the reaction
    reaction_content = event.source.get("content", {}).get("m.relates_to", {}).get("key")
    if not translate_available:
        return
    # A reaction without a (string) key must not raise inside the sync loop
    if not isinstance(reaction_content, str) or len(reaction_content) != 2:
        return
    country = flag_decode(reaction_content)
    if not country:
        return
    country = country.lower()
    # US/GB flags mean "translate whatever this is"; other flags name the source language
    from_lang = "auto" if country in ("us", "gb") else country
    api_key = settings.get("libretranslate_api_key") or None
    lt = LibreTranslateAPI(settings["libretranslate_api"], api_key)
    # Only the server round trip is guarded, so a failure while sending the
    # "not supported" message is not misreported as a connection problem
    try:
        supported = [x["code"] for x in lt.languages()]
    except Exception:
        return await send_text(room.room_id, "Something went wrong connecting to the LibreTranslate server!")
    if from_lang != "auto" and from_lang not in supported:
        return await send_text(room.room_id, "Language " + from_lang + " is not supported!")
    original_body = getattr(reacted_to_event, "body", None)
    # Events without a text body cannot be translated
    if not isinstance(original_body, str):
        return
    # If the message was a reply
    if reacted_to_event.source.get("content", {}).get("m.relates_to", {}).get("m.in_reply_to", {}).get("event_id"):
        original_body = "\n\n".join(original_body.split("\n\n")[1:])  # Remove the in reply to part
    try:
        translated = lt.translate(original_body, from_lang, "en")
        # Language detection talks to the server as well, so it shares the guard
        lang = lt.detect(original_body)[0]["language"].upper() if from_lang == "auto" else from_lang.upper()
    except Exception:
        return await send_text(room.room_id, "Something went wrong translating the message!")
    return await send_reply(room.room_id, reacted_to_event, "Translated from " + lang + " (" + flag_emoji("US" if lang == "EN" else lang) + "):\n" + translated + "\n")
async def message_callback(room: nio.MatrixRoom, event: nio.RoomMessageText) -> None:
    """Handle an incoming text message: run commands and rewrite own messages.

    Messages in rooms whose display name contains " (Discord)" and edit events
    are ignored. A message starting with the prefix is dispatched to the
    function of the same name when the command is listed in
    ``settings["command_list"]`` (or ``settings["admin_command_list"]`` for
    the account itself); other senders are rate limited. Any other message
    sent by the account itself is run through the emoji, text-replace,
    subreddit and package-link processors and edited when something changed.

    Args:
        room: Room the message was sent in.
        event: The text message event.
    """
    global client, settings, emojis
    if settings["debug"]:
        print(f"Message received in room {room.display_name}\n{event.sender} | {event.body}")
    # Discord messages need different treatment. Absolutely no commands, and if possible no instant message edits.
    # Because selfbots are against ToS on Discord, and will get banned within just a few hours to days of usage.
    if " (Discord)" in room.display_name:
        return
    admin = False
    if event.sender == client.user_id:  # If this is our message
        admin = True
    # Edits are never processed; this also keeps our own edits from being re-edited
    if "m.new_content" in event.source["content"]:
        return
    # If the message is a command, process it as such
    if event.body.lower().startswith(settings["prefix"]):
        command = event.body.split(" ")[0][len(settings["prefix"]):].lower()
        args = event.body.split(" ")[1:]
        if event.sender in ratelimits and not admin:
            if ratelimits[event.sender] > int(datetime.datetime.utcnow().timestamp()):
                return
        ratelimits[event.sender] = int(datetime.datetime.utcnow().timestamp()) + settings["ratelimit"]
        if command in settings["command_list"] or (admin and command in settings["admin_command_list"]):
            # Commands are the module-level functions of the same name
            command_function = globals()[command]
            if settings["debug"]:
                # In debug mode errors are allowed to propagate
                await command_function(args, room, event)
            else:
                try:
                    await command_function(args, room, event)
                except Exception:
                    await send_text(room.room_id, "Something went wrong processing the command!")
        return
    if admin:
        # If it is not a command, process regular message parsing.
        if event.formatted_body:
            new_body, orig_body = event.formatted_body, event.formatted_body
        else:
            new_body, orig_body = event.body, event.body
        # Emoji processor
        if event.body.count(":") > 1:  # Reduce searching by a significant margin
            # Get a list of emojis on disk
            emojislist = list_emojis()
            for emoji in emojislist.keys():
                marker = ":" + emoji + ":"
                # If the emoji is in there (While because we're replacing one at a time for size support)
                while marker in new_body:
                    after_marker = new_body.find(marker) + len(marker)
                    # Get the size
                    size = new_body[after_marker:].split(":")[0]
                    sizeset = True
                    # Make sure emoji size ends with :, do not process size with :emoji:24
                    try:
                        third_colon = new_body[after_marker + len(str(size)):][0]
                    except IndexError:
                        third_colon = ""
                    # If the size is not processable, fall back to the default size
                    if (not size.isdecimal()) or third_colon != ":":
                        sizeset = False
                        size = str(emojis["default_size"])
                    # Prevent KeyError
                    if str(size) not in emojis.keys():
                        emojis[str(size)] = {}
                    # If this emoji is not uploaded yet for this size
                    if emoji not in emojis[str(size)].keys():
                        # Upload and index it
                        emojis[str(size)][emoji] = await send_file(resize_image(emojislist[emoji], int(size)))
                        with open(settings["emojis_path"], "w") as f:
                            json.dump(emojis, f)
                    # Replace the emoji with the respective file
                    new_body = new_body.replace(marker + ("" if not sizeset else f"{size}:"), f"<img height=\"{size}\" src=\"{emojis[size][emoji]}\" alt=\"{emoji}\">", 1)
        # Text replace processor
        if event.body.count(";") > 1:
            for to_replace in text_replace.keys():
                if ";" + to_replace + ";" in new_body:
                    new_body = new_body.replace(";" + to_replace + ";", text_replace[to_replace])
        # Subreddit linker (type r/subreddit to get [r/subreddit](https://reddit.com/r/subreddit))
        if event.body.count("r/") > 0:
            for part in event.body.split(" "):
                if part.startswith("r/"):
                    new_body = new_body.replace(part, f"<a href=\"https://reddit.com/{part}\">{part}</a>")
        # Arch package linker ({pkg|name})
        if event.body.count("{pkg|") > 0:
            for part in event.body.split("{pkg|")[1:]:
                if "}" in part:
                    pkg = part.split("}")[0]
                    new_body = new_body.replace("{pkg|" + pkg + "}", "<a href=\"https://archlinux.org/packages/?name=" + pkg + "\">" + pkg + "</a>")
        # AUR package linker ({aur|name})
        if event.body.count("{aur|") > 0:
            for part in event.body.split("{aur|")[1:]:
                if "}" in part:
                    pkg = part.split("}")[0]
                    # Match on the package name only; "part" still contains the
                    # closing brace and everything after it, so it never matched
                    new_body = new_body.replace("{aur|" + pkg + "}", "<a href=\"https://aur.archlinux.org/packages?K=" + pkg + "\">" + pkg + "</a>")
        # If anything was changed processing the message, edit it
        if not new_body == orig_body:
            await edit_message(room.room_id, event, new_body)
async def decryption_failure_callback(room: nio.MatrixRoom, event: nio.MegolmEvent) -> None:
    """Print a notice naming the event that could not be decrypted and its room."""
    notice = f"Failed to decrypt {event.event_id} in room {room.display_name}"
    print(notice)
async def main() -> None:
    """Log in (or restore a saved session), register callbacks and sync forever.

    When the session file named in the credentials exists, the stored access
    token is reused; otherwise a password login is performed and the session
    is written to that file. On a failed login the function prints the reason
    and returns.
    """
    global client, settings, credentials
    # Create client config
    config = nio.AsyncClientConfig(store_sync_tokens=True, encryption_enabled=True)
    session_path = credentials["session_path"]
    if os.path.exists(session_path):
        # A saved session exists, so we'll use the stored credentials
        with open(session_path, "r") as f:
            session = json.load(f)
        client = nio.AsyncClient(session['homeserver'], config=config, store_path=credentials["store_path"])
        client.restore_login(user_id=session['user_id'], access_token=session['access_token'], device_id=session['device_id'])
    else:
        # There are no previously-saved credentials, so we'll use the password
        client = nio.AsyncClient(credentials["homeserver"], credentials["user"], config=config, store_path=credentials["store_path"])
        resp = await client.login(credentials["password"], device_name=credentials["device_name"])
        # Check that we logged in successfully
        if not isinstance(resp, nio.LoginResponse):
            print(f"homeserver = \"{credentials['homeserver']}\"; user = \"{credentials['user']}\"")
            print(f"Failed to log in: {resp}")
            return
        # Write the login details to disk
        with open(session_path, "w") as f:
            json.dump({"homeserver": credentials["homeserver"], "user_id": resp.user_id, "device_id": resp.device_id, "access_token": resp.access_token}, f)
    # The credentials are not needed past this point
    credentials = {}
    print(f"Logged in as: {client.user_id} with device {client.device_id}")
    await client.sync(timeout=60000, full_state=True)  # Ignore any messages sent before now.
    client.add_event_callback(message_callback, nio.RoomMessageText)
    client.add_event_callback(unknown_callback, nio.UnknownEvent)
    client.add_event_callback(decryption_failure_callback, (nio.MegolmEvent,))
    if client.should_upload_keys:
        await client.keys_upload()
    sync_timeout = int(0xDEADCADE * 8.03012557499132e-06)
    retry_delay = int(0xDEADCADE * 2.676708524997107e-10)
    while True:
        if settings["debug"]:
            # If we're in debug mode, exit on error
            await client.sync_forever(timeout=sync_timeout, full_state=True)
            continue
        # Else just print the error and continue
        try:
            await client.sync_forever(timeout=sync_timeout, full_state=True)
        except Exception as e:
            print(e)
            await asyncio.sleep(retry_delay)
# Only start the bot when this file is executed directly
if __name__ == "__main__":
    asyncio.run(main())