fulvia/bot.py

658 lines
18 KiB
Python
Executable File

#!/usr/bin/env python3
"""
The core bot class for Fulvia.
"""
import os
import sys
import time
import functools
import threading
import traceback
from datetime import datetime
from twisted.internet import protocol
from twisted.words.protocols import irc
import db
import tools
import loader
from trigger import Trigger
class Fulvia(irc.IRCClient):
    """
    The core IRC client for Fulvia.

    Builds on twisted's irc.IRCClient and adds module loading, command and
    hook dispatch with rate limiting, channel/user bookkeeping and
    per-channel logging to disk.
    """

    def __init__(self, config):
        """
        Initializes the bot's state from 'config' and loads all modules.

        config - the bot's loaded configuration. This method reads
                 config.homedir and, from config.core: nickname, realname,
                 username, prefix and disabled_modules.
        """
        self.config = config
        """The bot's config, loaded from file."""

        self.nickname = config.core.nickname
        self.nick = config.core.nickname
        """The bot's current nickname."""

        self.realname = config.core.realname
        """The bot's 'real name', used in whois."""

        self.username = config.core.username
        """The bot's username ident used for logging into the server."""

        self.host = ""
        """The bot's host, virtual or otherwise. To be filled in later."""

        self.prefix = config.core.prefix
        """The command prefix the bot watches for."""

        self.static = os.path.join(config.homedir, "static")
        """The path to the bot's static file directory."""
        os.makedirs(self.static, exist_ok=True)

        self.log_path = os.path.join(config.homedir, "logs")
        """The path to the bot's log files."""
        # Create the log directory itself (previously the static directory
        # was created a second time here by mistake).
        os.makedirs(self.log_path, exist_ok=True)

        self._log_dump = tools.FulviaMemoryDefault(list)
        """
        Internal thread-safe dictionary containing log dumps yet to be
        written.
        """

        self._last_log_dump = 0
        """The last time logs were dumped."""

        self.channels = tools.FulviaMemory()
        """
        A dictionary of all channels the bot is currently in and the users
        in them.
        """

        self.users = tools.FulviaMemory()
        """A dictionary of all users the bot is aware of."""

        self.memory = tools.FulviaMemory()
        """
        A thread-safe general purpose dictionary to be used by whatever
        modules need it.
        """

        self.db = db.FulviaDB(self.config)
        """
        A class with some basic interactions for the bot's sqlite3 database.
        """

        self._callables = {}
        """
        A dictionary containing all callable functions loaded from
        modules. Keys are the functions name.
        """

        self._hooks = []
        """
        A list containing all function names to be hooked with every message
        received.
        """

        self.commands = {}
        """
        A dictionary containing all commands to look for and the name
        of the function they call.
        """

        self._times = {}
        """
        A dictionary full of times when certain functions were called for
        use with the @rate decorator. Keys are function names.
        """

        self.url_callbacks = {}
        """
        A dictionary of URL callbacks to allow other modules interact with
        the URL module.
        """

        self._disabled_modules = self.config.core.disabled_modules.split(",")
        """These modules will NOT be loaded when load_modules() is called."""

        self.load_modules()

    def load_modules(self):
        """
        Find and load all of our modules.

        Every registry filled by register_callable() is reset first, so
        calling this again performs a clean reload. Modules listed in
        self._disabled_modules are skipped; a module that raises while
        loading is reported on stdout and counted as failed.
        """
        print("Loading modules...")

        # ensure they're empty
        self._callables = {}
        self._hooks = []
        self.commands = {}
        self._times = {}
        self.url_callbacks = {}

        modules = loader.find_modules(self.config.homedir)
        loaded = 0
        failed = 0
        for name, path in modules.items():
            if name in self._disabled_modules:
                continue
            try:
                loader.load_module(self, path)
            except Exception as e:
                # One broken module must not prevent the rest from loading.
                print(f"{name} failed to load.")
                print(e)
                traceback.print_exc()
                failed += 1
            else:
                loaded += 1

        print(f"Registered {loaded} modules,")
        print(f"{failed} modules failed to load")

    def register_callable(self, func):
        """
        Registers all callable functions loaded from modules into one
        convenient table.

        func - a module function carrying the attributes read below
               (commands, aliases, priv, _docs, hook, rate, channel_rate,
               global_rate and optionally url_callback).
        """
        name = func.__name__

        if hasattr(func, 'commands'):
            self._callables[name] = func
            for cmd in func.commands:
                command = tools.Command(cmd)
                command._func_name = name
                command.priv = func.priv
                command.doc = func._docs
                if cmd in func.aliases:
                    command.canonical = False
                # Every other name this function answers to.
                command.aliases = [a for a in func.commands if a != cmd]
                self.commands[cmd] = command

        if func.hook:
            self._callables[name] = func
            self._hooks.append(name)

        if func.rate or func.channel_rate or func.global_rate:
            self._times[name] = {}

        if hasattr(func, 'url_callback'):
            for url in func.url_callback:
                self.url_callbacks[url] = func

    def unregister_callable(self, func):
        """
        Removes the provided function object from the internal list of
        module functions.

        Anything that is not callable or not triggerable is ignored.
        Entries that are already gone are skipped silently: a function
        that is both a command and a hook shares a single _callables
        entry, so removing it for both roles must not raise KeyError.
        """
        if not callable(func) or not loader.is_triggerable(func):
            return

        name = func.__name__

        if hasattr(func, 'commands'):
            self._callables.pop(name, None)
            for command in func.commands:
                self.commands.pop(command, None)

        if func.hook:
            self._callables.pop(name, None)
            if name in self._hooks:
                self._hooks.remove(name)

        if func.rate or func.channel_rate or func.global_rate:
            self._times.pop(name, None)

        if hasattr(func, 'url_callback'):
            for url in func.url_callback:
                self.url_callbacks.pop(url, None)

    def stillConnected(self):
        """Returns true if the bot is still connected to the server."""
        # NOTE(review): relies on the inherited _heartbeat attribute being
        # falsy while disconnected -- confirm against twisted's IRCClient.
        return bool(self._heartbeat)

    def call(self, func, bot, trigger):
        """
        A wrapper for calling module functions so that we can have nicer
        error handling.

        Any Exception raised by 'func' is reported to trigger.channel as
        "ExceptionName: message" and its traceback is printed.
        """
        try:
            func(bot, trigger)
        except Exception as e:
            msg = type(e).__name__ + ": " + str(e)
            self.msg(trigger.channel, msg)
            traceback.print_exc()

    def log(self, channel, text):
        """
        Logs text to file. Logging structure is
        {self.log_path}/{self.hostname}/{channel}/{date}.log

        Only permits log dumping once per second; lines logged in between
        are buffered in self._log_dump until the next dump.
        """
        # TODO: use time module instead of datetime
        now = time.time()
        stamp = datetime.fromtimestamp(now)
        timestamp = stamp.strftime(self.config.core.default_time_format)
        self._log_dump[channel].append(timestamp + " " + text)

        if now - self._last_log_dump > 1:
            self._dump_logs()

    def _dump_logs(self):
        """
        Writes every buffered log line to disk, empties the buffer and
        records the time of the dump.
        """
        now = time.time()
        fname = datetime.fromtimestamp(now).strftime("%Y-%m-%d") + ".log"
        for ch in list(self._log_dump.keys()):
            path = os.path.join(self.log_path, self.hostname, ch)
            os.makedirs(path, exist_ok=True)
            with open(os.path.join(path, fname), "a+") as file:
                file.write("\n".join(self._log_dump[ch]) + "\n")

        self._log_dump = tools.FulviaMemoryDefault(list)
        # Without this the once-per-second limit in log() never applies.
        self._last_log_dump = now

    ## Actions involving the bot directly.
    ## These will get called automatically.

    def privmsg(self, user, channel, message):
        """
        Called when the bot receives a PRIVMSG, which can come from channels
        or users alike.

        Logs the message, then runs the matching command (if the message
        starts with the prefix) followed by every registered hook, subject
        to each function's rate limits.
        """
        nick = user.partition("!")[0].partition("@")[0]
        if channel.startswith("#"):
            opSym = tools.getOpSym(self.channels[channel].privileges[nick])
        else:
            # Private message: the conversation is keyed by the sender.
            opSym = ""
            channel = nick
        line = "<" + opSym + nick + "> " + message
        self.log(channel, line)

        func_names = []
        if message.startswith(self.prefix) and message != self.prefix:
            command, _, _ = message.partition(" ")
            command = command.replace(self.prefix, "", 1)
            cmd = self.commands.get(command)
            if not cmd:
                # NOTE(review): an unknown command also skips every hook;
                # kept as-is, confirm that this is intended.
                return
            func_names.append(cmd._func_name)

        func_names += self._hooks

        for func_name in func_names:
            func = self._callables[func_name]
            trigger = Trigger(user, channel, message, "PRIVMSG", self.config)
            bot = FulviaWrapper(self, trigger)

            # A rate-limited function is skipped on its own; the remaining
            # functions (e.g. unrelated hooks) still get to run.
            if self._rate_limited(func, func_name, trigger):
                continue

            if func.thread:
                worker = threading.Thread(
                    target=self.call, args=(func, bot, trigger))
                worker.start()
            else:
                self.call(func, bot, trigger)

    def _rate_limited(self, func, func_name, trigger):
        """
        Returns True if 'func' may not run yet for this trigger.

        Checks the per-user, per-channel and global limits in that order
        and records the call time for each scope that passes. Admins are
        never limited. Scopes are stored under distinct keys so that a
        nick, a channel and the global slot can never share a timestamp
        (in a private message the channel is the sender's nick).
        """
        scopes = (
            (func.rate, ("user", trigger.nick)),
            (func.channel_rate, ("channel", trigger.channel)),
            (func.global_rate, ("global",)),
        )
        for limit, key in scopes:
            if not limit:
                continue
            last = self._times[func_name].get(key, 0)
            if time.time() - last < limit and not trigger.admin:
                return True
            self._times[func_name][key] = time.time()
        return False

    def joined(self, channel):
        """Called when the bot joins a new channel."""
        line = "-!- " + self.nickname + " " + "[" + self.username + "@"
        line += self.host + "] has joined " + channel
        self.log(channel, line)

        print(f"Joined {channel}")
        if channel not in self.channels:
            self.channels[channel] = tools.Channel(channel)

    def left(self, channel, reason=""):
        """Called when the bot leaves a channel."""
        line = "-!- " + self.nickname + " " + "[" + self.username + "@"
        line += self.host + "] has left " + channel + " [" + reason + "]"
        self.log(channel, line)

        print(f"Parted {channel}")
        self.channels.pop(channel)

    def noticed(self, user, channel, message):
        """Called when the bot receives a NOTICE from a user or channel."""
        # TODO: something?
        pass

    def modeChanged(self, user, channel, add_mode, modes, args):
        """
        Called when users or channel's modes are changed.

        user     - hostmask of whoever changed the mode
        channel  - the channel, or a non-"#" target for server level modes
        add_mode - True when the modes are being added, False when removed
        modes    - string of mode letters
        args     - one entry per mode letter; None for modes that take no
                   argument
        """
        line = "-!- mode/" + channel + " ["
        line += "+" if add_mode else "-"
        line += modes
        # Modes without an argument carry None, which str.join() rejects.
        shown_args = [arg for arg in args if arg]
        if shown_args:
            line += " " + " ".join(shown_args)
        line += "] by " + user.partition("!")[0].partition("@")[0]
        self.log(channel, line)

        if not channel.startswith("#"): # server level
            return

        for mode, arg in zip(modes, args):
            if arg is None: # channel mode
                if add_mode:
                    self.channels[channel].modes.add(mode)
                else:
                    self.channels[channel].modes.remove(mode)
            elif mode in tools.op_level: # user mode, op_level mode
                op_level = tools.op_level[mode]
                if add_mode:
                    self.channels[channel].privileges[arg] += op_level
                else:
                    self.channels[channel].privileges[arg] -= op_level
            # user mode, non-op_level mode: nothing to track

    def signedOn(self):
        """Called when the bot successfully connects to the server."""
        print(f"Signed on as {self.nickname}")
        # The reply is handled by whoisUser(), which fills in self.host.
        self.whois(self.nickname)

        line = "*** Signed onto " + self.hostname + " as "
        line += self.nickname + "!" + self.username + "@" + self.host
        self.log(self.hostname, line)

        for channel in self.config.core.channels.split(","):
            # Tolerate "a, b" style lists and an empty setting.
            channel = channel.strip()
            if channel:
                self.join(channel)

    def kickedFrom(self, channel, kicker, message):
        """Called when the bot is kicked from a channel."""
        line = "-!- " + self.nickname + " was kicked from " + channel
        line += " by " + kicker + " [" + message + "]"
        self.log(channel, line)

        self.channels.pop(channel)

    ## Actions the bot observes other users doing in the channel.
    ## These will get called automatically.

    def userJoined(self, user, channel):
        """Called when the bot sees another user join a channel."""
        nick, _, host = user.partition("!")
        line = "-!- " + nick + " " + "[" + host + "] has joined " + channel
        self.log(channel, line)

        if nick not in self.users:
            self.users[nick] = tools.User(nick)
        self.channels[channel].add_user(self.users[nick])

    def userLeft(self, user, channel, reason=""):
        """Called when the bot sees another user leave a channel."""
        nick, _, host = user.partition("!")
        line = "-!- " + nick + " " + "[" + host + "] has left "
        line += channel + " [" + reason + "]"
        self.log(channel, line)

        self.channels[channel].remove_user(nick)

    def userQuit(self, user, quitMessage):
        """Called when the bot sees another user disconnect from the network."""
        nick, _, host = user.partition("!")
        line = "-!- " + nick + " [" + host + "] has quit [" + quitMessage + "]"

        # Copy the keys: remove_user() may alter the user's channel list.
        channels = list(self.users[nick].channels.keys())
        for channel in channels:
            self.log(channel, line)
            self.channels[channel].remove_user(nick)
        self.users.pop(nick)

    def userKicked(self, kickee, channel, kicker, message):
        """
        Called when the bot sees another user getting kicked from the channel.
        """
        line = "-!- " + kickee + " was kicked from " + channel
        line += " by " + kicker + " [" + message + "]"
        self.log(channel, line)

        self.channels[channel].remove_user(kickee)

    def topicUpdated(self, user, channel, newTopic):
        """Called when the bot sees a user update the channel's topic."""
        line = "-!- " + user + " changed the topic of " + channel + " to: "
        line += newTopic
        self.log(channel, line)

        self.channels[channel].topic = newTopic

    def userRenamed(self, oldname, newname):
        """Called when the bot sees a user change their nickname."""
        line = "-!- " + oldname + " is now known as " + newname

        user = self.users.pop(oldname)
        self.users[newname] = user
        for key, channel in user.channels.items():
            self.log(key, line)
            channel.rename_user(oldname, newname)
        user.nick = newname

    def namesReceived(self, channel, channel_type, nicklist):
        """
        Called when the bot receives a list of names in the channel from the
        server.

        channel      - the channel the names belong to
        channel_type - "secret", "private" or "public"
        nicklist     - nicknames, each optionally led by a privilege symbol
        """
        self.channels[channel].channel_type = channel_type

        for nick in nicklist:
            symbol = nick[0]
            # Only a non-alphanumeric first character can be a privilege
            # symbol; this keeps a nick such as "oscar" intact should
            # tools.op_level also be keyed by mode letters.
            if symbol.isalnum():
                op_level = 0
            else:
                op_level = tools.op_level.get(symbol, 0)
            if op_level > 0:
                nick = nick[1:]

            # Reuse a known user so their membership of other channels
            # is not thrown away.
            if nick not in self.users:
                self.users[nick] = tools.User(nick)
            self.channels[channel].add_user(self.users[nick])
            self.channels[channel].privileges[nick] = op_level

    def whoisUser(self, nick, ident, host, realname):
        """
        Called when the bot receives a WHOIS about a particular user.

        Only a WHOIS about the bot itself is acted upon: it refreshes the
        bot's own username, host and realname.
        """
        if nick == self.nickname:
            self.username = ident
            self.host = host
            self.realname = realname

    def whoisIdle(self, nick, idle, signon):
        """
        Called when the bot receives a WHOIS about a particular user.

        nick - nickname of the user
        idle - seconds since last activity from user
        signon - unix timestamp when user signed on

        Fires and removes the callback stored for 'nick' under
        self.memory["idle_callbacks"], if there is one.
        """
        callbacks = self.memory.get("idle_callbacks")
        # A WHOIS nobody is waiting on (such as the bot's own at sign-on)
        # has no callback registered and is simply ignored.
        if not callbacks or nick not in callbacks:
            return
        callbacks[nick].callback((nick, idle, signon))
        callbacks.pop(nick)

    ## User commands, from client->server

    def msg(self, user, message, length=None):
        """
        Sends a message 'message' to 'user' (can be a user or a channel).
        If 'length' is specified, the message will be split into lengths
        of that size. If 'length' is not specified, the bot will determine
        an appropriate length automatically.
        """
        if user.startswith("#"):
            priv = self.channels[user].privileges[self.nickname]
            opSym = tools.getOpSym(priv)
        else:
            opSym = ""
        line = "<" + opSym + self.nickname + "> " + message
        self.log(user, line)

        # Forward the caller's length (it used to be discarded).
        irc.IRCClient.msg(self, user, message, length=length)

    def reply(self, text, dest, reply_to, notice=False):
        """
        Sends a message to 'dest' prefixed with 'reply_to' and a colon,
        ala "reply_to: text". More useful with the wrapper class.

        'notice' is accepted but currently has no effect; the reply is
        always sent through msg().
        """
        text = reply_to + ": " + text
        self.msg(dest, text)

    def quit(self, message=""):
        """
        Disconnects from the server. Quitting normally results in the
        factory reconnecting us, so we need to include shutting down the
        factory.
        """
        print("Quit from server")
        irc.IRCClient.quit(self, message)
        self.factory.stopTrying()
        # log() only writes once per second, so flush what is buffered.
        self._dump_logs()
        # NOTE(review): the original author observed that this does not
        # end the process. Presumably SystemExit is intercepted when it is
        # raised inside a reactor callback or a worker thread; stopping
        # the reactor may be what is needed -- confirm before changing.
        sys.exit(0)

    ## Raw server->client messages

    def irc_RPL_NAMREPLY(self, prefix, params):
        """
        Called when we have received a list of names in the channel from the
        server.
        """
        channel_types = {"@": "secret", "*": "private", "=": "public"}
        channel_type = channel_types[params[1]]
        channel = params[2]
        nicklist = params[3].split()
        self.namesReceived(channel, channel_type, nicklist)

    def irc_JOIN(self, prefix, params):
        """
        Called when a user joins a channel.
        """
        nick = prefix.split('!')[0]
        channel = params[-1]
        if nick == self.nickname:
            self.joined(channel)
        else:
            self.userJoined(prefix, channel)

    def irc_PART(self, prefix, params):
        """
        Called when a user leaves a channel.
        """
        nick = prefix.split('!')[0]
        channel = params[0]
        reason = params[1] if len(params) > 1 else ""

        if nick == self.nickname:
            self.left(channel, reason)
        else:
            self.userLeft(prefix, channel, reason)

    def irc_QUIT(self, prefix, params):
        """
        Called when a user has quit.
        """
        # The full hostmask is passed on; userQuit() splits it itself.
        self.userQuit(prefix, params[0])

    def irc_RPL_WHOISUSER(self, prefix, params):
        """
        Called when we receive the server's response from our "WHOIS [user]"
        query. Contains hostmask information.
        """
        _, nick, ident, host, _, realname = params
        self.whoisUser(nick, ident, host, realname)

    def irc_RPL_WHOISIDLE(self, prefix, params):
        """
        Called when we receive the server's response from our "WHOIS [user]"
        query. Contains idle and signon time information.
        """
        _, nick, idle, signon, _ = params
        self.whoisIdle(nick, idle, signon)
class FulviaWrapper():
    """
    A wrapper class for Fulvia to provide default destinations for msg
    methods and so forth.

    Attribute reads and writes that the wrapper does not handle itself are
    redirected to the wrapped bot, so module code can treat the wrapper as
    if it were the bot.
    """

    def __init__(self, fulvia, trigger):
        """
        fulvia  - the bot instance to wrap
        trigger - the trigger this wrapper takes its defaults from
        """
        # directly setting these values would cause a recursion loop
        # overflow, because __setattr__ below redirects to self._bot
        object.__setattr__(self, '_bot', fulvia)
        object.__setattr__(self, '_trigger', trigger)

    def __getattr__(self, attr):
        """Redirects getting attributes to the master bot instance."""
        # __getattr__ only runs when normal lookup fails. If the wrapper's
        # own slots are the ones missing (instance built without __init__),
        # delegating would look up self._bot again and recurse forever.
        if attr in ('_bot', '_trigger'):
            raise AttributeError(attr)
        return getattr(self._bot, attr)

    def __setattr__(self, attr, value):
        """Redirects setting attributes to the master bot instance."""
        return setattr(self._bot, attr, value)

    def msg(self, user=None, message=None, length=None):
        """
        Sends 'message' to 'user' through the wrapped bot.

        If only one positional argument is given it is taken to be the
        message, and the destination defaults to the channel the trigger
        was received on.

        Raises TypeError when called without any argument.
        """
        if user is None:
            raise TypeError("msg() missing 2 required positional "
                            "arguments: 'user' and 'message'")

        if message is None:
            # assume the first argument is the message
            message = user
            user = self._trigger.channel

        self._bot.msg(user, message, length)

    def reply(self, message, destination=None, reply_to=None, notice=False):
        """
        If destination is None, it defaults to the channel the message
        was received on.
        If reply_to is None, it defaults to the person who sent the
        message.
        """
        if destination is None:
            destination = self._trigger.channel
        if reply_to is None:
            reply_to = self._trigger.nick

        self._bot.reply(message, destination, reply_to, notice)
class FulviaFactory(protocol.ReconnectingClientFactory):
    """
    A reconnecting client factory that hands its config to every Fulvia
    instance it creates.
    """

    def __init__(self, config):
        self.config = config

    @property
    def protocol(self):
        """
        A zero-argument callable that builds a Fulvia bound to this
        factory's config.
        """
        # NOTE(review): presumably twisted's buildProtocol() instantiates
        # self.protocol() without arguments, which is why the config is
        # bound ahead of time with functools.partial -- confirm.
        return functools.partial(Fulvia, self.config)