Compare commits

...

7 commits

3 changed files with 105 additions and 0 deletions

View file

@ -8,6 +8,8 @@
"cache_path": "./data/cache",
"prefix": "!",
"ratelimit": 10,
"libretranslate_api": "https://translate.argosopentech.com/",
"libretranslate_api_key": "",
"admin_command_list": [
"shrug",
"emoji_size",

102
main.py
View file

@ -17,6 +17,11 @@ import datetime # Ratelimiting
# External dependencies
import nio # Matrix library, matirx-nio[e2e]
from PIL import Image # Resizing images, pillow
try:
from libretranslatepy import LibreTranslateAPI # Translation functionality
translate_available = True
except Exception:
translate_available = False
# Set the settings path
settings_path = "./data/settings.json"
@ -64,6 +69,19 @@ def filter_xkcd_title(title):
filtered_title += char
return filtered_title
# Takes in a country (US, CA, AU), outputs the "flag emoji"/regional indicator emojis
def flag_emoji(country):
regional_indicator_offset = 127397
return chr(ord(country.upper()[0]) + regional_indicator_offset) + chr(ord(country.upper()[1]) + regional_indicator_offset)
# Takes in a flag emoji, outputs as text instead of regional indicators
def flag_decode(emoji):
regional_indicator_offset = 127397
try:
return chr(ord(emoji[0]) - regional_indicator_offset) + chr(ord(emoji[1]) - regional_indicator_offset)
except Exception:
return None
# Grab a list of all emojis on disk
def list_emojis():
global settings
@ -166,6 +184,15 @@ async def send_text(room_id, text):
formatted = formatted.replace("\n", "<br>")
return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": unformatted + " (SelfBot)", "format": "org.matrix.custom.html", "formatted_body": formatted + (f" (<a href=\"{settings['source_url']}\">SelfBot</a>)" if settings["source_url"] else " (SelfBot)")}, ignore_unverified_devices=True)
# Send a reply (with HTML formatting)
async def send_reply(room_id, original_event, text):
unformatted, formatted = text, text
unformatted = "".join([part.split(">")[-1] for part in unformatted.split("<")])
unformatted = html.unescape(unformatted)
# \n doesn't work in HTML, replace it with <br>
formatted = formatted.replace("\n", "<br>")
return await client.room_send(room_id=room_id, message_type="m.room.message", content={"msgtype": "m.text", "body": unformatted + " (SelfBot)", "format": "org.matrix.custom.html", "formatted_body": formatted + (f" (<a href=\"{settings['source_url']}\">SelfBot</a>)" if settings["source_url"] else " (SelfBot)"), "m.relates_to": {"m.in_reply_to": {"event_id": original_event.event_id}}}, ignore_unverified_devices=True)
# Commands definition
# Appends shrug to the end of the message
async def shrug(args, room, event):
@ -293,10 +320,68 @@ async def aw(args, room, event):
else:
return await send_text(room.room_id, "Please provide a search term.")
async def unknown_callback(room: nio.MatrixRoom, event: nio.UnknownEvent) -> None:
if event.type == "m.reaction":
# Get the ID of the event this was a reaction to
relation_dict = event.source.get("content", {}).get("m.relates_to", {})
reacted_to = relation_dict.get("event_id")
if reacted_to and relation_dict.get("rel_type") == "m.annotation":
await reaction_callback(room, event, reacted_to)
return
async def reaction_callback(room: nio.MatrixRoom, event: nio.UnknownEvent, reacted_to_id: str) -> None:
global client, settings, translate_available
if settings["debug"]:
print(f"Got reaction in {room.room_id} from {event.sender}.")
# No selfbot in mautrix-discord rooms.
if " (Discord)" in room.display_name:
return
# Only acknowledge reactions that we sent
if event.sender != client.user_id:
return
# Get the original event that was reacted to
event_response = await client.room_get_event(room.room_id, reacted_to_id)
if isinstance(event_response, nio.RoomGetEventError):
return
reacted_to_event = event_response.event
# Get the reaction
reaction_content = (event.source.get("content", {}).get("m.relates_to", {}).get("key"))
if translate_available and len(reaction_content) == 2:
if flag_decode(reaction_content):
country = flag_decode(reaction_content).lower()
if country == "us" or country == "gb":
# Translate from auto to English
from_lang = "auto"
else:
# Translate from lang to English
from_lang = country
lt = LibreTranslateAPI(settings["libretranslate_api"], settings["libretranslate_api_key"] if "libretranslate_api_key" in settings.keys() and settings["libretranslate_api_key"] else None)
# If the language is not supported
if from_lang not in [x["code"] for x in lt.languages()] and from_lang != "auto":
return await send_text(room.room_id, "Language " + from_lang + " is not supported!")
original_body = reacted_to_event.body
# If the message was a reply
if reacted_to_event.source.get("content", {}).get("m.relates_to", {}).get("m.in_reply_to", {}).get("event_id"):
original_body = "\n\n".join(reacted_to_event.split("\n\n")[1:]) # Remove the in reply to part
translated = lt.translate(original_body, from_lang, "en")
lang = lt.detect(reacted_to_event.body)[0]["language"].upper() if from_lang == "auto" else from_lang.upper()
return await send_reply(room.room_id, reacted_to_event, "Translated from " + lang + " (" + flag_emoji("US" if lang == "EN" else lang) + "):\n" + translated + "\n")
async def message_callback(room: nio.MatrixRoom, event: nio.RoomMessageText) -> None:
global client, settings, emojis
if settings["debug"]:
print(f"Message received in room {room.display_name}\n{event.sender} | {event.body}")
# Discord messages need different treatment. Absolutely no commands, and if possible no instant message edits.
# Because selfbots are against ToS on Discord, and will get banned within just a few hours to days of usage.
if " (Discord)" in room.display_name:
return
admin = False
if event.sender == client.user_id: # If this is our message
admin = True
@ -361,6 +446,22 @@ async def message_callback(room: nio.MatrixRoom, event: nio.RoomMessageText) ->
for to_replace in text_replace.keys():
if ";" + to_replace + ";" in new_body:
new_body = new_body.replace(";" + to_replace + ";", text_replace[to_replace])
# Subreddit linker (type r/subreddit to get [r/subreddit](https://reddit.com/r/subreddit))
if event.body.count("r/") > 0:
for part in event.body.split(" "):
if part.startswith("r/"):
new_body = new_body.replace(part, f"<a href=\"https://reddit.com/{part}\">{part}</a>")
if event.body.count("{pkg|") > 0:
for part in event.body.split(" "):
if part.startswith("{pkg|") and part.endswith("}"):
pkg = part.split('|')[1].rstrip('}')
new_body = new_body.replace(part, "<a href=\"https://archlinux.org/packages/?name=" + pkg + "\">" + pkg + "</a>")
if event.body.count("{aur|") > 0:
for part in event.body.split(" "):
if part.startswith("{aur|") and part.endswith("}"):
pkg = part.split('|')[1].rstrip('}')
new_body = new_body.replace(part, "<a href=\"https://aur.archlinux.org/packages?K=" + pkg + "\">" + pkg + "</a>")
# If anything was changed processing the message, edit it
if not new_body == orig_body:
await edit_message(room.room_id, event, new_body)
@ -399,6 +500,7 @@ async def main() -> None:
print(f"Logged in as: {client.user_id} with device {client.device_id}")
await client.sync(timeout=60000, full_state=True) # Ignore any messages sent before now.
client.add_event_callback(message_callback, nio.RoomMessageText)
client.add_event_callback(unknown_callback, nio.UnknownEvent)
client.add_event_callback(decryption_failure_callback, (nio.MegolmEvent,))
if client.should_upload_keys:
await client.keys_upload()

View file

@ -1,2 +1,3 @@
matrix-nio[e2e]
pillow
libretranslatepy