#!/usr/bin/env python3
"""
The core bot class for Fulvia.
"""
import os
import sys
import time
import functools
import threading
import traceback

from twisted.internet import protocol
from twisted.words.protocols import irc

import db
import tools
import loader
from trigger import Trigger


class Fulvia(irc.IRCClient):
    """
    The IRC client protocol for the bot.

    Keeps track of the channels and users the bot can see, owns the tables
    of functions loaded from modules and dispatches incoming messages to
    them.
    """

    def __init__(self, config):
        """
        Set up the bot's state from `config` and load every module.
        """
        self.config = config
        """The bot's config, loaded from file."""

        self.nickname = config.core.nickname
        self.nick = config.core.nickname
        """The bot's current nickname."""

        self.realname = config.core.realname
        """The bot's 'real name', used in whois."""

        self.username = config.core.username
        """The bot's username ident used for logging into the server."""

        self.prefix = config.core.prefix
        """The command prefix the bot watches for."""

        self.static = os.path.join(config.homedir, "static")
        """The path to the bot's static file directory."""

        self.channels = tools.FulviaMemory()
        """
        A dictionary of all channels the bot is currently in and the users
        in them.
        """

        self.users = tools.FulviaMemory()
        """A dictionary of all users the bot is aware of."""

        self.memory = tools.FulviaMemory()
        """
        A thread-safe general purpose dictionary to be used by whatever
        modules need it.
        """

        self.db = db.FulviaDB(self.config)
        """
        A class with some basic interactions for the bot's sqlite3 database.
        """

        self._callables = {}
        """
        A dictionary containing all callable functions loaded from modules.
        Keys are the functions name.
        """

        self._hooks = []
        """
        A list containing all function names to be hooked with every message
        received.
        """

        self._commands = {}
        """
        A dictionary containing all commands to look for and the name of the
        function they call.
        """

        self.cmd_priv = {}
        """
        A dictionary with command names as keys and required privilege
        levels as values.
        """

        self.doc = {}
        """
        A dictionary of command names to their docstring and example, if
        declared.
        """

        self._times = {}
        """
        A dictionary full of times when certain functions were called for
        use with the @rate decorator. Keys are function names.
        """

        self.url_callbacks = {}
        """
        A dictionary of URL callbacks to allow other modules interact with
        the URL module.
        """

        self.load_modules()

    def load_modules(self):
        """
        Find and load all of our modules.

        A module that raises while loading is reported on stdout together
        with its traceback and counted as failed; the remaining modules are
        still loaded.
        """
        print("Loading modules...")
        # ensure they're empty
        self._callables, self._rules, self._commands, self.doc = {}, {}, {}, {}
        # The hook list has to be emptied as well, otherwise loading the
        # modules a second time would register (and call) every hook twice.
        self._hooks = []

        modules = loader.find_modules(self.config.homedir)
        loaded = 0
        failed = 0
        for name, path in modules.items():
            try:
                loader.load_module(self, path)
            except Exception as e:
                print(f"{name} failed to load.")
                print(e)
                traceback.print_exc()
                failed += 1
            else:
                loaded += 1

        print(f"Registered {loaded} modules,")
        print(f"{failed} modules failed to load")

    def register_callable(self, callables):
        """
        Registers all callable functions loaded from modules into one
        convenient table.

        `callables` is an iterable of functions carrying the attributes
        read below (`hook`, `rate`, `channel_rate`, `global_rate`, `_docs`
        and optionally `commands`, `priv` and `url_callback`).
        """
        for func in callables:
            name = func.__name__

            if hasattr(func, 'commands'):
                self._callables[name] = func
                for command in func.commands:
                    self._commands[command] = name
                    self.cmd_priv[command] = func.priv

            if func.hook:
                self._callables[name] = func
                self._hooks.append(name)

            if func.rate or func.channel_rate or func.global_rate:
                self._times[name] = {}

            if hasattr(func, 'url_callback'):
                for url in func.url_callback:
                    self.url_callbacks[url] = func

            for command, docs in func._docs.items():
                self.doc[command] = docs

    def unregister_callable(self, func):
        """
        Removes the provided function object from the internal list of
        module functions.

        Anything that is not callable, or that the loader does not consider
        triggerable, is ignored. Entries that are already missing are
        skipped instead of raising, so a function registered both as a
        command and as a hook can be removed cleanly.
        """
        if not callable(func) or not loader.is_triggerable(func):
            return

        name = func.__name__

        if hasattr(func, 'commands'):
            self._callables.pop(name, None)
            for command in func.commands:
                self._commands.pop(command, None)
                self.cmd_priv.pop(command, None)

        if func.hook:
            self._callables.pop(name, None)
            # drop every occurrence in case the hook was registered twice
            self._hooks = [hook for hook in self._hooks if hook != name]

        if func.rate or func.channel_rate or func.global_rate:
            self._times.pop(name, None)

        if hasattr(func, 'url_callback'):
            for url in func.url_callback:
                self.url_callbacks.pop(url, None)

        for command in func._docs:
            self.doc.pop(command, None)

    def stillConnected(self):
        """Returns true if the bot is still connected to the server."""
        return bool(self._heartbeat)

    def call(self, func, bot, trigger):
        """
        A wrapper for calling module functions so that we can have nicer
        error handling.

        Any exception raised by `func` is reported to the channel the
        trigger came from and its traceback is printed; it is not re-raised.
        """
        try:
            func(bot, trigger)
        except Exception as e:
            msg = type(e).__name__ + ": " + str(e)
            self.msg(trigger.channel, msg)
            traceback.print_exc()

    def _rate_limited(self, func, func_name, trigger):
        """
        Check the per-user, per-channel and global rate limits of `func`.

        Returns True if the call has to be dropped. Otherwise the time of
        this call is recorded for every limit the function declares and
        False is returned. Admins are never limited, but their calls are
        still recorded.
        """
        limits = (
            (func.rate, trigger.nick),
            (func.channel_rate, trigger.channel),
            (func.global_rate, "global"),
        )
        active = [(rate, key) for rate, key in limits if rate]
        if not active:
            return False

        times = self._times.setdefault(func_name, {})
        now = time.time()

        # Check every scope before recording anything, so that a call
        # blocked by one limit does not reset the timer of another one.
        if not trigger.admin:
            for rate, key in active:
                if now - times.get(key, 0) < rate:
                    return True

        for _, key in active:
            times[key] = now
        return False

    ## Actions involving the bot directly.
    ## These will get called automatically.

    def privmsg(self, user, channel, message):
        """
        Called when the bot receives a PRIVMSG, which can come from channels
        or users alike.

        If the message starts with the command prefix and names a known
        command, that command's function is called. Every hooked function
        is called afterwards, whether or not a command matched.
        """
        func_names = []
        if message.startswith(self.prefix):
            command = message.partition(" ")[0][len(self.prefix):]
            func_name = self._commands.get(command)
            if func_name:
                func_names.append(func_name)

        func_names += self._hooks

        # dict.fromkeys() keeps the order but drops duplicates, so that a
        # function which is both a command and a hook only runs once.
        for func_name in dict.fromkeys(func_names):
            func = self._callables.get(func_name)
            if func is None:
                continue

            trigger = Trigger(user, channel, message, "PRIVMSG", self.config)
            bot = FulviaWrapper(self, trigger)

            # A rate limited function must not prevent the remaining
            # functions from seeing the message.
            if self._rate_limited(func, func_name, trigger):
                continue

            if func.thread:
                t = threading.Thread(
                    target=self.call, args=(func, bot, trigger))
                t.start()
            else:
                self.call(func, bot, trigger)

    def joined(self, channel):
        """Called when the bot joins a new channel."""
        print(f"Joined {channel}")
        if channel not in self.channels:
            self.channels[channel] = tools.Channel(channel)

    def left(self, channel):
        """Called when the bot leaves a channel."""
        print(f"Parted {channel}")
        if channel in self.channels:
            self.channels.pop(channel)

    def modeChanged(self, user, channel, add_mode, modes, args):
        """
        Called when users or channel's modes are changed.

        `modes` holds one character per changed mode and `args` holds the
        matching argument for each of them. Only modes found in
        `tools.op_level` that carry an argument (the affected nick) are
        handled.
        """
        # TODO: channel modes
        if channel not in self.channels:
            # e.g. a mode change on something that is not a known channel
            return

        privileges = self.channels[channel].privileges
        for mode, name in zip(modes, args):
            if mode not in tools.op_level or not name:
                continue
            if add_mode:
                privileges[name] = tools.op_level[mode]
            else:
                # this is extremely lazy and won't work in a lot of cases,
                # e.g. when the user still holds another privilege
                privileges[name] = 0

    def signedOn(self):
        """Called when the bot successfully connects to the server."""
        print(f"Signed on as {self.nickname}")
        for channel in self.config.core.channels.split(","):
            # tolerate "#a, #b" as well as trailing commas in the config
            channel = channel.strip()
            if channel:
                self.join(channel)

    def kickedFrom(self, channel, kicker, message):
        """Called when the bot is kicked from a channel."""
        if channel in self.channels:
            self.channels.pop(channel)

    ## Actions the bot observes other users doing in the channel.
    ## These will get called automatically.

    def userJoined(self, user, channel):
        """Called when the bot sees another user join a channel."""
        if user not in self.users:
            self.users[user] = tools.User(user)
        self.channels[channel].add_user(self.users[user])

    def userLeft(self, user, channel):
        """Called when the bot sees another user leave a channel."""
        self.channels[channel].remove_user(user)

    def userQuit(self, user, quitMessage):
        """Called when the bot sees another user disconnect from the network."""
        if user not in self.users:
            return

        # copy the keys, removing the user changes the dictionary
        channels = list(self.users[user].channels.keys())
        for channel in channels:
            if channel in self.channels:
                self.channels[channel].remove_user(user)
        self.users.pop(user)

    def userKicked(self, kickee, channel, kicker, message):
        """
        Called when the bot sees another user getting kicked from the
        channel.
        """
        self.channels[channel].remove_user(kickee)

    def topicUpdated(self, user, channel, newTopic):
        """Called when the bot sees a user update the channel's topic."""
        self.channels[channel].topic = newTopic

    def userRenamed(self, oldname, newname):
        """Called when the bot sees a user change their nickname."""
        if oldname not in self.users:
            return

        user = self.users.pop(oldname)
        self.users[newname] = user
        for channel in user.channels.values():
            channel.rename_user(oldname, newname)
        user.nick = newname

    def namesReceived(self, channel, channel_type, nicklist):
        """
        Called when the bot receives a list of names in the channel from the
        server.

        A nick whose first character is a key of `tools.op_level` with a
        level above zero has that character stripped and the level stored
        as its privilege in the channel.
        """
        if channel not in self.channels:
            # names for a channel the bot is not tracking
            return

        chan = self.channels[channel]
        chan.channel_type = channel_type

        for nick in nicklist:
            op_level = tools.op_level.get(nick[0], 0)
            if op_level > 0:
                nick = nick[1:]

            # Keep the existing object for users we already know, it holds
            # their membership in the other channels.
            if nick not in self.users:
                self.users[nick] = tools.User(nick)
            chan.add_user(self.users[nick])
            chan.privileges[nick] = op_level

    ## User commands, from client->server

    def msg(self, user, message, length=None):
        """
        Send a message to a user or channel. See irc.IRCClient.msg for more
        information.

        Provided solely for documentation and clarity's sake.
        """
        irc.IRCClient.msg(self, user, message, length=length)

    def say(self, text, recipient, max_messages=None):
        """
        For compatibility with most of the sopel modules. Will phase it out
        in favor of self.msg eventually.

        `max_messages` is accepted but not used.
        """
        # TODO: change everything to use bot.msg()
        self.msg(recipient, text)

    def reply(self, text, dest, reply_to, notice=False):
        """
        For compatibility with most of the sopel modules. Will phase it out
        in favor of self.msg eventually.

        Sends `text` to `dest`, prefixed with "<reply_to>: ". `notice` is
        accepted but not used.
        """
        text = reply_to + ": " + text
        self.msg(dest, text)

    def quit(self, message=""):
        """
        Disconnects from the server.

        Quitting normally results in the factory reconnecting us, so we need
        to include shutting down the factory.
        """
        print("Quit from server")
        irc.IRCClient.quit(self, message)
        self.factory.stopTrying()
        # NOTE(review): this does not seem to end the process, presumably
        # because SystemExit is raised inside a reactor callback or a worker
        # thread -- confirm, stopping the reactor may be the right fix.
        sys.exit(0)

    ## Raw server->client messages

    def irc_RPL_NAMREPLY(self, prefix, params):
        """
        Called when we have received a list of names in the channel from the
        server.

        Reads the channel type symbol from params[1], the channel name from
        params[2] and the space separated nicks from params[3].
        """
        channel_types = {"@": "secret", "*": "private", "=": "public"}
        # fall back to "public" rather than crashing on an unknown symbol
        channel_type = channel_types.get(params[1], "public")
        channel = params[2]
        nicklist = params[3].split()
        self.namesReceived(channel, channel_type, nicklist)


class FulviaWrapper:
    """
    A wrapper class for Fulvia to provide default destinations for msg
    methods and so forth.

    Every attribute that is not defined here is read from, and written to,
    the wrapped bot instance.
    """

    def __init__(self, fulvia, trigger):
        """
        Wrap the bot `fulvia` for the message described by `trigger`.
        """
        # directly setting these values would go through our own
        # __setattr__ and end up on the wrapped bot (or recurse)
        object.__setattr__(self, '_bot', fulvia)
        object.__setattr__(self, '_trigger', trigger)

    def __getattr__(self, attr):
        """Redirects getting attributes to the master bot instance."""
        return getattr(self._bot, attr)

    def __setattr__(self, attr, value):
        """Redirects setting attributes to the master bot instance."""
        return setattr(self._bot, attr, value)

    def msg(self, user=None, message=None, length=None):
        """
        Send `message` to `user`.

        If only one argument is given it is taken to be the message, and it
        is sent to the channel the message was received on.

        Raises TypeError if called without any argument.
        """
        if user is None:
            raise TypeError("msg() missing 2 required positional "
                            + "arguments: 'user' and 'message'")

        if message is None:
            # assume the first argument is the message
            message = user
            user = self._trigger.channel

        self._bot.msg(user, message, length)

    def say(self, message, destination=None, max_messages=1):
        """
        If destination is None, it defaults to the channel the message was
        received on.
        """
        if destination is None:
            destination = self._trigger.channel
        self._bot.say(message, destination, max_messages)

    def reply(self, message, destination=None, reply_to=None, notice=False):
        """
        If destination is None, it defaults to the channel the message was
        received on.
        If reply_to is None, it defaults to the person who sent the message.
        """
        if destination is None:
            destination = self._trigger.channel
        if reply_to is None:
            reply_to = self._trigger.nick
        self._bot.reply(message, destination, reply_to, notice)


class FulviaFactory(protocol.ReconnectingClientFactory):
    """
    A reconnecting client factory that builds Fulvia instances from the
    stored config.
    """

    # black magic going on here
    # `protocol` is looked up on the instance, which turns it into a
    # callable that creates a Fulvia with this factory's config.
    protocol = property(lambda s: functools.partial(Fulvia, s.config))

    def __init__(self, config):
        """Remember `config` for the protocol instances built later."""
        self.config = config