#!/usr/bin/env python3 """ The core bot class for Fulvia. """ import os import sys import time import threading import traceback from datetime import datetime from twisted.internet import protocol, reactor from twisted.words.protocols import irc import db import tools import config import loader import scheduler from trigger import Trigger class Fulvia(irc.IRCClient): def __init__(self): self.nickname = config.nickname self.nick = config.nickname """The bot's current nickname.""" self.realname = config.realname """The bot's 'real name', used in whois.""" self.username = config.username """The bot's username ident used for logging into the server.""" self.host = "" """The bot's host, virtual or otherwise. To be filled in later.""" self.static = "static" os.makedirs(self.static, exist_ok=True) """The path to the bot's static file directory.""" self.log_path = "logs" os.makedirs(self.log_path, exist_ok=True) """The path to the bot's log files.""" self._log_dump = tools.FulviaMemoryDefault(list) """ Internal thread-safe dictionary containing log dumps yet to be written. """ self._last_log_dump = 0 """The last time logs were dumped.""" self.channels = tools.FulviaMemory() """ A dictionary of all channels the bot is currently in and the users in them. """ self.memory = tools.FulviaMemory() """ A thread-safe general purpose dictionary to be used by whatever modules need it. """ self.db = db.FulviaDB() """ A class with some basic interactions for the bot's sqlite3 databse. """ self._hooks = [] """ A list containing all function names to be hooked with every message received. """ self.commands = {} """ A dictionary containing all commands to look for and the name of the function they call. """ self._times = {} """ A dictionary full of times when certain functions were called for use with the @rate decorator. Keys are function names. """ self.url_callbacks = {} """ A dictionary of URL callbacks to allow other modules interact with the URL module. """ self._user_joined = [] """These get called when a user joins a channel.""" self._disabled_modules = config.disabled_modules """These modules will NOT be loaded when load_modules() is called.""" self.scheduler = scheduler.Scheduler(self) """The bot's task scheduler.""" self.load_modules() def load_modules(self): """ Find and load all of our modules. """ print(f"Loading modules...") self._hooks = [] self.commands = {} self._times = {} self.url_callbacks = {} # ensure they're empty modules = loader.find_modules() loaded = 0 failed = 0 for name, path in modules.items(): try: if name in self._disabled_modules: continue loader.load_module(self, path) loaded += 1 except Exception as e: print(f"{name} failed to load.") print(e) traceback.print_exc() failed += 1 continue print(f"Registered {loaded} modules,") print(f"{failed} modules failed to load") def register_callable(self, func): """ Registers all callable functions loaded from modules into one convenient table. """ if hasattr(func, 'commands'): for cmd in func.commands: self.commands[cmd] = func if func.hook: self._hooks.append(func) if func.rate or func.channel_rate or func.global_rate: self._times[func.__name__] = {} if hasattr(func, 'url_callback'): for url in func.url_callback: self.url_callbacks[url] = func if func.user_joined: self._user_joined.append(func) def unregister_callable(self, func): """ Removes the provided function object from the internal list of module functions. """ if not callable(func) or not loader.is_triggerable(func): return if hasattr(func, 'commands'): for command in func.commands: self.commands.pop(command) if func.hook: self._hooks.remove(func) if func.rate or func.channel_rate or func.global_rate: self._times.pop(func) if hasattr(func, 'url_callback'): for url in func.url_callback: self.url_callbacks.pop(url) if func.user_joined: self._user_joined.remove(func) def add_shutdown(self, func): """Adds the given function to the reactor's shutdown sequence.""" reactor.addSystemEventTrigger('before','shutdown', func) def stillConnected(self): """Returns true if the bot is still connected to the server.""" if self._heartbeat: return True else: return False def call(self, func, bot, trigger): """ A wrapper for calling module functions so that we can have nicer error handling. """ try: func(bot, trigger) except Exception as e: msg = type(e).__name__ + ": " + str(e) self.msg(trigger.channel, msg) traceback.print_exc() def log(self, channel, text): """ Logs text to file. Logging structure is {self.log_path}/{self.server}/{channel}/{date}.log Only permits log dumping once per second. """ # TODO: use time module instead of datetime t = datetime.fromtimestamp(time.time()) timestamp = t.strftime(config.default_time_format) self._log_dump[channel].append(timestamp + " " + text) if time.time() - self._last_log_dump > 1: for ch in self._log_dump.keys(): fname = t.strftime("%Y-%m-%d") + ".log" path = os.path.join(self.log_path, self.hostname, ch) os.makedirs(path, exist_ok=True) with open(os.path.join(path, fname), "a+") as file: file.write("\n".join(self._log_dump[ch]) + "\n") del(self._log_dump) self._log_dump = tools.FulviaMemoryDefault(list) ## Actions involving the bot directly. ## These will get called automatically. def privmsg(self, user, channel, message): """ Called when the bot receives a PRIVMSG, which can come from channels or users alike. """ nick = user.partition("!")[0].partition("@")[0] if channel.startswith("#"): opSym = self.channels[channel].users[nick].op_level else: opSym = '' channel = nick line = f"<{opSym}{nick}> {message}" self.log(channel, line) funcs = [] if message.startswith(config.prefix) and message != config.prefix: command = message.partition(" ")[0] command = command.replace(config.prefix, "", 1) cmd = self.commands.get(command) if not cmd: return funcs.append(cmd) funcs += self._hooks for func in funcs: trigger = Trigger(user, channel, message) bot = FulviaWrapper(self, trigger) if func.rate: t = self._times[func.__name__].get(trigger.nick, 0) if time.time() - t < func.rate and not trigger.admin: return self._times[func.__name__][trigger.nick] = time.time() if func.channel_rate: t = self._times[func.__name__].get(trigger.channel, 0) if time.time() - t < func.channel_rate and not trigger.admin: return self._times[func.__name__][trigger.channel] = time.time() if func.global_rate: t = self._times[func.__name__].get("global", 0) if time.time() - t < func.channel_rate and not trigger.admin: return self._times[func.__name__]["global"] = time.time() if func.thread == True: t = threading.Thread(target=self.call,args=(func, bot, trigger)) t.start() else: self.call(func, bot, trigger) def joined(self, channel): """Called when the bot joins a new channel.""" line = f"-!- {self.nickname} [{self.username}@{self.host}] " line += f"has joined {channel}" self.log(channel, line) print(f"Joined {channel}") self.channels[channel] = tools.Channel(channel) user = tools.User(self.nickname) self.channels[channel].users[self.nickname] = user def left(self, channel, reason=""): """Called when the bot leaves a channel.""" line = f"-!- {self.nickname} [{self.username}@{self.host}] " line += f"has left {channel} [{reason}]" self.log(channel, line) print(f"Parted {channel}") self.channels.pop(channel) def noticed(self, user, channel, message): """Called when the bot receives a NOTICE from a user or channel.""" # TODO: something? pass def modeChanged(self, user, channel, add_mode, modes, args): """Called when users or channel's modes are changed.""" line = f"-!- mode/{channel} [" if add_mode: line += "+" else: line += "-" line += modes if any(args): line += " " + " ".join(args) line += "] by " + user.partition("!")[0].partition("@")[0] self.log(channel, line) if not channel.startswith("#"): # server level return for n, mode in enumerate(modes): if args[n] == None: # channel mode if add_mode: self.channels[channel].modes.add(mode) else: self.channels[channel].modes.remove(mode) elif mode in tools.op_level.keys(): # user mode, op_level mode nick = args[n] user = self.channels[channel].users[nick] if add_mode: user.op_level += mode else: user.op_level.replace(mode, '', 1) else: # user mode, non-op_level mode continue def signedOn(self): """Called when the bot successfully connects to the server.""" if config.oper_password: self.sendLine("OPER " + config.nickname + ' ' + config.oper_password) print(f"Signed on as {self.nickname}") self.whois(self.nickname) line = f"*** Signed onto {self.hostname} as " line += f"{self.nickname}!{self.username}@{self.host}" self.log(self.hostname, line) for channel in config.channels: self.join(channel) def kickedFrom(self, channel, kicker, message): """Called when the bot is kicked from a channel.""" line = f"-!- {self.nickname} was kicked from {channel} " line += f"by {kicker} [{message}]" self.log(channel, line) self.channels.pop(channel) def nickChanged(self, nick): """Called when the bot changes it's nickname.""" line = f"-!- you are now known as {nick}" for channel_name, channel in self.channels.items(): self.log(channel_name, line) user = channel.users.pop(self.nickname) user.nick = nick channel.users[nick] = user ## Actions the bot observes other users doing in the channel. ## These will get called automatically. def userJoined(self, user, channel): """Called when the bot sees another user join a channel.""" nick, _, host = user.partition("!") line = f"-!- {nick} [{host}] has joined {channel}" self.log(channel, line) new_user = tools.User(nick) self.channels[channel].users[nick] = new_user for func in self._user_joined: trigger = Trigger(user, channel, f"{user} has joined") bot = FulviaWrapper(self, trigger) t = threading.Thread(target=self.call, args=(func, bot, trigger)) t.start() def userLeft(self, user, channel, reason=""): """Called when the bot sees another user leave a channel.""" nick, _, host = user.partition("!") line = f"-!- {nick} [{host}] has left {channel} [{reason}]" self.log(channel, line) self.channels[channel].users.pop(nick) def userQuit(self, user, quitMessage): """Called when the bot sees another user disconnect from the network.""" nick, _, host = user.partition("!") line = f"-!- {nick} [{host}] has quit [{quitMessage}]" for channel_name, channel in self.channels.items(): if not nick in channel.users: continue self.log(channel_name, line) channel.users.pop(nick) def userKicked(self, kickee, channel, kicker, message): """ Called when the bot sees another user getting kicked from the channel. """ line =f"-!- {kickee} was kicked from {channel} by {kicker} [{message}]" self.log(channel, line) self.channels[channel].users.pop(kickee) def topicUpdated(self, user, channel, newTopic): """Called when the bot sees a user update the channel's topic.""" line = f"-!- {user} changed the topic of {channel} to: {newTopic}" self.log(channel, line) self.channels[channel].topic = newTopic def userRenamed(self, oldname, newname): """Called when the bot sees a user change their nickname.""" line = "-!- {oldname} is now known as {newname}" for channel_name, channel in self.channels.items(): if oldname not in channel.users.keys(): continue self.log(channel_name, line) user = channel.users.pop(oldname) user.nick = newname channel.users[newname] = user def namesReceived(self, channel, channel_type, nicklist): """ Called when the bot receives a list of names in the channel from the server. """ self.channels[channel].channel_type = channel_type for nick in nicklist: op_level = '' if nick[0] in tools.op_level.keys(): op_level = nick[0] nick = nick[1:] user = tools.User(nick) user.op_level = op_level self.channels[channel].users[nick] = user def whoisUser(self, nick, ident, host, realname): """ Called when the bot receives a WHOIS about a particular user. """ if nick == self.nickname: self.username = ident self.host = host self.realname = realname else: for channel_name, channel in self.channels.items(): if nick in channel: user = channel[nick] user.ident = ident user.host = host user.realname = realname def whoisIdle(self, nick, idle, signon): """ Called when the bot receives a WHOIS about a particular user. nick - nickname of the user idle - seconds since last activity from user signon - unix timestamp when user signed on """ if self.memory.get("idle_callbacks"): self.memory["idle_callbacks"][nick].callback((nick, idle, signon)) self.memory["idle_callbacks"].pop(nick) ## User commands, from client->server def msg(self, user, message, length=None): """ Sends a message 'message' to 'user' (can be a user or a channel). If 'length' is specified, the message will be split into lengths of that size. If 'length' is not specified, the bot will determine an appropriate length automatically. """ if user.startswith("#"): opSym = self.channels[user].users[self.nickname].op_level else: opSym = '' line = f"<{opSym}{self.nickname}> {message}" self.log(user, line) super().msg(user, message, length) def reply(self, text, dest, reply_to, notice=False): """ Sends a message to 'dest' prefixed with 'reply_to' and a colon, ala "reply_to: text". More useful with the wrapper class. """ text = reply_to + ": " + text self.msg(dest, text) def quit(self, message=""): """ Disconnects from quitting normally results in the factory reconnecting us, so we need to include shutting down the factory. """ print("Quit from server") irc.IRCClient.quit(self, message) self.factory.stopTrying() sys.exit(0) # why doesn't this work? ## Raw server->client messages def irc_RPL_NAMREPLY(self, prefix, params): """ Called when we have received a list of names in the channel from the server. """ channel_types = {"@": "secret", "*": "private", "=": "public"} channel_type = channel_types[params[1]] channel = params[2] nicklist = params[3].split() self.namesReceived(channel, channel_type, nicklist) def irc_JOIN(self, prefix, params): """ Called when a user joins a channel. """ nick = prefix.split('!')[0] channel = params[-1] if nick == self.nickname: self.joined(channel) else: self.userJoined(prefix, channel) def irc_PART(self, prefix, params): """ Called when a user leaves a channel. """ nick = prefix.split('!')[0] channel = params[0] if len(params) > 1: reason = params[1] else: reason = "" if nick == self.nickname: self.left(channel, reason) else: self.userLeft(prefix, channel, reason) def irc_QUIT(self, prefix, params): """ Called when a user has quit. """ nick = prefix.split('!')[0] self.userQuit(prefix, params[0]) def irc_RPL_WHOISUSER(self, prefix, params): """ Called when we receive the server's response from our "WHOIS [user]" query. Contains hostmask information. """ _, nick, ident, host, _, realname = params self.whoisUser(nick, ident, host, realname) def irc_RPL_WHOISIDLE(self, prefix, params): """ Called when we receive the server's response from our "WHOIS [user]" query. Contains idle and signon time information. """ _, nick, idle, signon, _ = params self.whoisIdle(nick, idle, signon) def lineReceived(self, line): """Overridden to fix broken unicode issues.""" if bytes != str and isinstance(line, bytes): line = line.decode('utf-8', errors='surrogateescape') super().lineReceived(line) class FulviaWrapper(): """ A wrapper class for Fulvia to provide default destinations for msg methods and so forth. """ def __init__(self, fulvia, trigger): object.__setattr__(self, '_bot', fulvia) object.__setattr__(self, '_trigger', trigger) # directly setting these values would cause a recursion loop # overflow def __getattr__(self, attr): """Redirects getting attributes to the master bot instance.""" return getattr(self._bot, attr) def __setattr__(self, attr, value): """Redirects setting attributes to the master bot instance.""" return setattr(self._bot, attr, value) def msg(self, user=None, message=None, length=None): """ If user is None it will default to the channel the message was received on. """ if user == None: raise TypeError("msg() missing 2 required positional " \ + "arguments: 'user' and 'message'") if message == None: # assume the first argument is the message message = user user = self._trigger.channel self._bot.msg(user, message, length) def reply(self, message, destination=None, reply_to=None, notice=False): """ If destination is None, it defaults to the channel the message was received on. If reply_to is None, it defaults to the person who sent the message. """ if destination is None: destination = self._trigger.channel if reply_to is None: reply_to = self._trigger.nick self._bot.reply(message, destination, reply_to, notice) class FulviaFactory(protocol.ReconnectingClientFactory): protocol = Fulvia