initial commit

This commit is contained in:
iou1name 2018-03-16 03:13:43 -04:00
commit 67627a0740
59 changed files with 5503 additions and 0 deletions

10
.gitignore vendored Executable file
View File

@ -0,0 +1,10 @@
__pycache__/
*/__pycache__/
logs/
*.cfg
*.db
*.pid
*.dat
*.txt
*.swp
tourettes.py

11
README.md Executable file
View File

@ -0,0 +1,11 @@
NIGGER DICKS 2: Electric Boogaloo
It's like Sopel, except rewritten from scratch using Twisted as a base and over half the features ripped out.
Dependencies: `twisted, python-dateutil, wolfram, requests, bs4, pyenchant`
TODO:
Consider re-adding the following modules: `etymology, ip`
Consider logging
Change `bot.say` to `bot.msg`
Add CTCP responses
More complex versioning

464
bot.py Executable file
View File

@ -0,0 +1,464 @@
#!/usr/bin/env python3
"""
The core bot class for Fulvia.
"""
import os
import sys
import time
import functools
import threading
import traceback
from twisted.internet import protocol
from twisted.words.protocols import irc
import db
import tools
import loader
from trigger import Trigger
class Fulvia(irc.IRCClient):
def __init__(self, config):
self.config = config
"""The bot's config, loaded from file."""
self.nickname = config.core.nickname
self.nick = config.core.nickname
"""The bot's current nickname."""
self.realname = config.core.realname
"""The bot's 'real name', used in whois."""
self.username = config.core.username
"""The bot's username ident used for logging into the server."""
self.prefix = config.core.prefix
"""The command prefix the bot watches for."""
self.static = os.path.join(config.homedir, "static")
"""The path to the bot's static file directory."""
self.channels = tools.FulviaMemory()
"""
A dictionary of all channels the bot is currently in and the users
in them.
"""
self.users = tools.FulviaMemory()
"""A dictionary of all users the bot is aware of."""
self.memory = tools.FulviaMemory()
"""
A thread-safe general purpose dictionary to be used by whatever
modules need it.
"""
self.db = db.FulviaDB(self.config)
"""
A class with some basic interactions for the bot's sqlite3 databse.
"""
self._callables = {}
"""
A dictionary containing all callable functions loaded from
modules. Keys are the functions name.
"""
self._hooks = []
"""
A list containing all function names to be hooked with every message
received.
"""
self._commands = {}
"""
A dictionary containing all all commands to look for and the name
of the function they call.
"""
self.doc = {}
"""
A dictionary of command names to their docstring and example, if
declared.
"""
self._times = {}
"""
A dictionary full of times when certain functions were called for
use with the @rate decorator. Keys are function names.
"""
self.url_callbacks = {}
"""
A dictionary of URL callbacks to allow other modules interact with
the URL module.
"""
self.load_modules()
def load_modules(self):
"""
Find and load all of our modules.
"""
print(f"Loading modules...")
self._callables, self._rules, self._commands, self.doc = {}, {}, {}, {}
# ensure they're empty
modules = loader.find_modules(self.config.homedir)
loaded = 0
failed = 0
for name, path in modules.items():
try:
loader.load_module(self, path)
loaded += 1
except Exception as e:
print(f"{name} failed to load.")
print(e)
traceback.print_exc()
failed += 1
continue
print(f"Registered {loaded} modules,")
print(f"{failed} modules failed to load")
def register_callable(self, callables):
"""
Registers all callable functions loaded from modules into one
convenient table.
"""
for func in callables:
if hasattr(func, 'commands'):
self._callables[func.__name__] = func
for command in func.commands:
self._commands[command] = func.__name__
if func.hook:
self._callables[func.__name__] = func
self._hooks.append(func.__name__)
if func.rate or func.channel_rate or func.global_rate:
self._times[func.__name__] = {}
if hasattr(func, 'url_callback'):
for url in func.url_callback:
self.url_callbacks[url] = func
for command, docs in func._docs.items():
self.doc[command] = docs
def unregister_callable(self, func):
"""
Removes the provided function object from the internal list of
module functions.
"""
if not callable(func) or not loader.is_triggerable(func):
return
if hasattr(func, 'commands'):
self._callables.pop(func.__name__)
for command in func.commands:
self._commands.pop(command)
if func.hook:
self._callables.pop(func.__name__)
self._hooks.remove(func.__name__)
if func.rate or func.channel_rate or func.global_rate:
self._times.pop(func.__name__)
if hasattr(func, 'url_callback'):
for url in func.url_callback:
self.url_callbacks.pop(url)
for command, docs in func._docs.items():
self.doc.pop(command)
def stillConnected(self):
"""Returns true if the bot is still connected to the server."""
if self._heartbeat:
return True
else:
return False
def call(self, func, bot, trigger):
"""
A wrapper for calling module functions so that we can have nicer
error handling.
"""
try:
func(bot, trigger)
except Exception as e:
msg = type(e).__name__ + ": " + str(e)
self.msg(trigger.channel, msg)
traceback.print_exc()
## Actions involving the bot directly.
## These will get called automatically.
def privmsg(self, user, channel, message):
"""
Called when the bot receives a PRIVMSG, which can come from channels
or users alike.
"""
func_names = []
if message.startswith(self.prefix):
command, _, _ = message.partition(" ")
command = command.replace(self.prefix, "", 1)
func_name = self._commands.get(command)
if not func_name:
return
func_names.append(func_name)
func_names += self._hooks
for func_name in func_names:
func = self._callables[func_name]
trigger = Trigger(user, channel, message, "PRIVMSG", self.config)
bot = FulviaWrapper(self, trigger)
if func.rate:
t = self._times[func_name].get(trigger.nick, 0)
if time.time() - t < func.rate and not trigger.admin:
return
self._times[func_name][trigger.nick] = time.time()
if func.channel_rate:
t = self._times[func_name].get(trigger.channel, 0)
if time.time() - t < func.channel_rate and not trigger.admin:
return
self._times[func_name][trigger.channel] = time.time()
if func.global_rate:
t = self._times[func_name].get("global", 0)
if time.time() - t < func.channel_rate and not trigger.admin:
return
self._times[func_name]["global"] = time.time()
if func.thread == True:
t = threading.Thread(target=self.call, args=(func, bot, trigger))
t.start()
else:
self.call(func, bot, trigger)
def joined(self, channel):
"""Called when the bot joins a new channel."""
print(f"Joined {channel}")
if channel not in self.channels:
self.channels[channel] = tools.Channel(channel)
def left(self, channel):
"""Called when the leaves a channel."""
print(f"Parted {channel}")
self.channels.pop(channel)
def modeChanged(self, user, channel, add_mode, modes, args):
"""Called when users or channel's modes are changed."""
# TODO: do this function the right way
# TODO: channel modes
# modes[0] is lazy, but ought to work in most cases
if modes[0] in tools.op_level.keys():
for n, name in enumerate(args):
if add_mode:
mode = tools.op_level[modes[n]]
self.channels[channel].privileges[name] = mode
else:
# this is extremely lazy and won't work in a lot of cases
self.channels[channel].privileges[name] = 0
def signedOn(self):
"""Called when the bot successfully connects to the server."""
print(f"Signed on as {self.nickname}")
for channel in self.config.core.channels.split(","):
self.join(channel)
def kickedFrom(self, channel, kicker, message):
"""Called when the bot is kicked from a channel."""
self.channels.pop(channel)
## Actions the bot observes other users doing in the channel.
## These will get called automatically.
def userJoined(self, user, channel):
"""Called when the bot sees another user join a channel."""
if user not in self.users:
self.users[user] = tools.User(user)
self.channels[channel].add_user(self.users[user])
def userLeft(self, user, channel):
"""Called when the bot sees another user join a channel."""
self.channels[channel].remove_user(user)
def userQuit(self, user, quitMessage):
"""Called when the bot sees another user disconnect from the network."""
channels = list(self.users[user].channels.keys())
for channel in channels:
# self.users[user].channels[channel].remove_user
# channel.remove_user(user)
self.channels[channel].remove_user(user)
self.users.pop(user)
def userKicked(self, kickee, channel, kicker, message):
"""
Called when the bot sees another user getting kicked from the channel.
"""
self.channels[channel].remove_user(kickee)
def topicUpdated(self, user, channel, newTopic):
"""Called when the bot sees a user update the channel's topic."""
self.channels[channel].topic = newTopic
def userRenamed(self, oldname, newname):
"""Called wehn the bot sees a user change their nickname."""
user = self.users.pop(oldname)
self.users[newname] = user
for key, channel in user.channels.items():
channel.rename_user(oldname, newname)
user.nick = newname
def namesReceived(self, channel, channel_type, nicklist):
"""
Called when the bot receives a list of names in the channel from the
server.
"""
self.channels[channel].channel_type = channel_type
for nick in nicklist:
op_level = tools.op_level.get(nick[0], 0)
if op_level > 0:
nick = nick[1:]
self.users[nick] = tools.User(nick)
self.channels[channel].add_user(self.users[nick])
self.channels[channel].privileges[nick] = op_level
## User commands, from client->server
def msg(self, user, message, length=None):
"""
Send a message to a user or channel. See irc.IRCClient.msg for more
information.
Provided solely for documentation and clarity's sake.
"""
irc.IRCClient.msg(self, user, message, length=None)
def say(self, text, recipient, max_messages=None):
"""
For compatibility with most of the sopel modules. Will phase it out
in favor of self.msg eventually.
"""
# TODO: change everything to use bot.msg()
self.msg(recipient, text)
def reply(self, text, dest, reply_to, notice=False):
"""
For compatibility with most of the sopel modules. Will phase it out
in favor of self.msg eventually.
"""
text = reply_to + ": " + text
self.msg(dest, text)
def quit(self, message=""):
"""
Disconnects from quitting normally results in the factory
reconnecting us, so we need to include shutting down the factory.
"""
print("Quit from server")
irc.IRCClient.quit(self, message)
self.factory.stopTrying()
sys.exit(0) # why doesn't this work?
## Raw server->client messages
def irc_RPL_NAMREPLY(self, prefix, params):
"""
Called when we have received a list of names in the channel from the
server.
"""
channel_types = {"@": "secret", "*": "private", "=": "public"}
channel_type = channel_types[params[1]]
channel = params[2].lower()
nicklist = params[3].split()
self.namesReceived(channel, channel_type, nicklist)
class FulviaWrapper():
def __init__(self, fulvia, trigger):
"""
A wrapper class for Fulvia to provide default destinations for
msg methods and so forth.
"""
object.__setattr__(self, '_bot', fulvia)
object.__setattr__(self, '_trigger', trigger)
# directly setting these values would cause a recursion loop
# overflow
def __getattr__(self, attr):
"""Redirects getting attributes to the master bot instance."""
return getattr(self._bot, attr)
def __setattr__(self, attr, value):
"""Redirects setting attributes to the master bot instance."""
return setattr(self._bot, attr, value)
def msg(self, user=None, message=None, length=None):
"""
If user is None it will default to the channel the message was
received on.
"""
if user == None:
raise TypeError("msg() missing 2 required positional " \
+ "arguments: 'user' and 'message'")
if message == None:
# assume the first argument is the message
message = user
user = self._trigger.channel
self._bot.msg(user, message, length)
def say(self, message, destination=None, max_messages=1):
"""
If destination is None, it defaults to the channel the message
was received on.
"""
if destination is None:
destination = self._trigger.channel
self._bot.say(message, destination, max_messages)
def reply(self, message, destination=None, reply_to=None, notice=False):
"""
If destination is None, it defaults to the channel the message
was received on.
If reply_to is None, it defaults to the person who sent the
message.
"""
if destination is None:
destination = self._trigger.channel
if reply_to is None:
reply_to = self._trigger.nick
self._bot.reply(message, destination, reply_to, notice)
class FulviaFactory(protocol.ReconnectingClientFactory):
# black magic going on here
protocol = property(lambda s: functools.partial(Fulvia, s.config))
def __init__(self, config):
self.config = config

116
config.py Executable file
View File

@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""
For parsing and generating config files.
"""
from configparser import ConfigParser
class Config():
def __init__(self, filename):
"""
The bot's configuration.
The given filename will be associated with the configuration, and is
the file which will be written if write() is called. If load is not
given or True, the configuration object will load the attributes from
the file at filename.
A few default values will be set here if they are not defined in the
config file, or a config file is not loaded. They are documented below.
"""
self.filename = filename
"""The config object's associated file, as noted above."""
self.parser = ConfigParser(allow_no_value=True, interpolation=None)
self.parser.read(self.filename)
@property
def homedir(self):
"""An alias to config.core.homedir"""
# Technically it's the other way around, so we can bootstrap filename
# attributes in the core section, but whatever.
configured = None
if self.parser.has_option('core', 'homedir'):
configured = self.parser.get('core', 'homedir')
if configured:
return configured
else:
return os.path.dirname(self.filename)
def save(self):
"""Save all changes to the config file."""
with open(self.filename, 'w') as cfgfile:
self.parser.write(cfgfile)
def add_section(self, name):
"""
Add a section to the config file.
Returns ``False`` if already exists.
"""
try:
return self.parser.add_section(name)
except ConfigParser.DuplicateSectionError:
return False
def __getattr__(self, name):
"""Allows sections to be called like class attributes."""
if name in self.parser.sections():
items = self.parser.items(name)
section = ConfigSection(name, items, self) # Return a section
setattr(self, name, section)
return section
else:
raise AttributeError("%r object has no attribute %r"
% (type(self).__name__, name))
class ConfigSection(object):
"""
Represents a section of the config file.
Contains all keys in thesection as attributes.
"""
def __init__(self, name, items, parent):
object.__setattr__(self, '_name', name)
object.__setattr__(self, '_parent', parent)
for item in items:
value = item[1].strip()
if not value.lower() == 'none':
if value.lower() == 'false':
value = False
object.__setattr__(self, item[0], value)
def __getattr__(self, name):
return None
def __setattr__(self, name, value):
object.__setattr__(self, name, value)
if type(value) is list:
value = ','.join(value)
self._parent.parser.set(self._name, name, value)
def get_list(self, name):
value = getattr(self, name)
if not value:
return []
if isinstance(value, str):
value = value.split(',')
# Keep the split value, so we don't have to keep doing this
setattr(self, name, value)
return value
def readConfig(filename):
"""
Parses the provided filename and returns the config object.
"""
config = ConfigParser(allow_no_value=True, interpolation=None)
config.read(filename)
return config
def generateConfig(filename):
"""
Generates a blank config file with minimal defaults.
"""
pass

30
db.py Executable file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env python3
"""
The bot's database connection class.
"""
import os
import sqlite3
class FulviaDB(object):
"""
Defines a basic interface and some convenience functionsfor the bot's
database.
"""
def __init__(self, config):
path = config.core.db_filename
self.filename = path
def connect(self):
"""Return a raw database connection object."""
return sqlite3.connect(self.filename, timeout=10)
def execute(self, *args, **kwargs):
"""
Execute an arbitrary SQL query against the database.
Returns a cursor object, on which things like `.fetchall()` can be
called per PEP 249.
"""
with self.connect() as conn:
cur = conn.cursor()
return cur.execute(*args, **kwargs)

105
loader.py Executable file
View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Methods for loading modules.
"""
import os
import sys
import importlib
def load_module(bot, path):
"""
Loads a module from the provided path, cleans it up and registers
it with the bot's internal callable list.
"""
module = importlib.import_module(path)
if hasattr(module, 'setup'):
module.setup(bot)
relevant_parts = process_module(module, bot.config)
bot.register_callable(relevant_parts)
def unload_module(bot, name):
"""
Unloads a module and deletes references to it from the bot's memory.
"""
old_module = sys.modules[name]
# delete references to the module functions from the bot's memory
for obj_name, obj in vars(old_module).items():
bot.unregister_callable(obj)
del old_module
del sys.modules[name]
def find_modules(homedir):
"""
Searches through homedir/modules for python files and returns a dictionary
with the module name as the key and the path as the value.
"""
modules_dir = os.path.join(homedir, "modules")
modules = {}
for file in os.listdir(modules_dir):
if not file.endswith(".py"):
continue
name = file.replace(".py", "")
modules[name] = "modules" + "." + name
return modules
def process_module(module, config):
"""
Takes a module object and extracts relevant data objects out of it.
Returns all callables(read: functions) and shutdowns(?).
"""
callables = []
for key, obj in dict.items(vars(module)):
if callable(obj):
if is_triggerable(obj):
process_callable(obj, config)
callables.append(obj)
return callables
def is_triggerable(obj):
"""
Checks if the given function object is triggerable by Fulvia, eg. has
any of a few particular attributes or declarators defined.
"""
triggerable_attributes = ("commands", "hook", "url_callback")
return any(hasattr(obj, attr) for attr in triggerable_attributes)
def process_callable(func, config):
"""
Sets various helper atributes about a given function.
"""
prefix = config.core.prefix
doc = func.__doc__
if doc:
doc = doc.strip()
doc = doc.replace("\t", "")
doc = doc.replace("\n\n", "\x00")
doc = doc.replace("\n", " ")
doc = doc.replace("\x00", "\n")
func._docs = {}
func.example = getattr(func, "example", [(None, None)])
func.thread = getattr(func, 'thread', True)
func.hook = getattr(func, 'hook', False)
func.rate = getattr(func, 'rate', 0)
func.channel_rate = getattr(func, 'channel_rate', 0)
func.global_rate = getattr(func, 'global_rate', 0)
if hasattr(func, 'commands'):
if hasattr(func, 'example'):
for n, example in enumerate(func.example):
ex_input = example[0]
if not ex_input:
continue
if ex_input[0] != prefix:
ex_input = prefix + ex_input
func.example[n] = (ex_input, example[1])
if doc:
for command in func.commands:
func._docs[command] = (doc, func.example)

246
module.py Executable file
View File

@ -0,0 +1,246 @@
#!/usr/bin/env python3
"""
This contains decorators and tools for creating callable plugin functions.
"""
import functools
def hook(value=False):
"""
Decorate a function to be called every time a PRIVMSG is received.
Args:
value: Either True or False. If True the function is called every
time a PRIVMSG is received. If False it is not. Default is False.
PRIVMSGs are ordinary messages sent from either a channel or a user (a
private message). In a busy channel, this function will be called quite
a lot. Please consider this carefully before applying as it can have
significant performance implications.
Note: Do not use this with the commands decorator.
"""
def add_attribute(function):
function.hook = value
return function
return add_attribute
def thread(value=True):
"""
Decorate a function to specify if it should be run in a separate thread.
Functions run in a separate thread (as is the default) will not prevent the
bot from executing other functions at the same time. Functions not run in a
separate thread may be started while other functions are still running, but
additional functions will not start until it is completed.
Args:
value: Either True or False. If True the function is called in
a separate thread. If False from the main thread.
"""
def add_attribute(function):
function.thread = value
return function
return add_attribute
def commands(*command_list):
"""
Decorate a function to set one or more commands to trigger it.
This decorator can be used to add multiple commands to one callable in a
single line. The resulting match object will have the command as the first
group, rest of the line, excluding leading whitespace, as the second group.
Parameters 1 through 4, seperated by whitespace, will be groups 3-6.
Args:
command: A string, which can be a regular expression.
Returns:
A function with a new command appended to the commands
attribute. If there is no commands attribute, it is added.
Example:
@commands("hello"):
If the command prefix is "\.", this would trigger on lines starting
with ".hello".
@commands('j', 'join')
If the command prefix is "\.", this would trigger on lines starting
with either ".j" or ".join".
"""
def add_attribute(function):
if not hasattr(function, "commands"):
function.commands = []
function.commands.extend(command_list)
return function
return add_attribute
def rate(user=0, channel=0, server=0):
"""
Decorate a function to limit how often it can be triggered on a per-user
basis, in a channel, or across the server (bot). A value of zero means no
limit. If a function is given a rate of 20, that function may only be used
once every 20 seconds in the scope corresponding to the parameter.
Users on the admin list in Sopels configuration are exempted from rate
limits.
Rate-limited functions that use scheduled future commands should import
threading.Timer() instead of sched, or rate limiting will not work properly.
"""
def add_attribute(function):
function.rate = user
function.channel_rate = channel
function.global_rate = server
return function
return add_attribute
def require_privmsg(message=None):
"""
Decorate a function to only be triggerable from a private message.
If it is triggered in a channel message, `message` will be said if given.
"""
def actual_decorator(function):
@functools.wraps(function)
def _nop(*args, **kwargs):
# Assign trigger and bot for easy access later
bot, trigger = args[0:2]
if trigger.is_privmsg:
return function(*args, **kwargs)
else:
if message and not callable(message):
bot.say(message)
return _nop
# Hack to allow decorator without parens
if callable(message):
return actual_decorator(message)
return actual_decorator
def require_chanmsg(message=None):
"""
Decorate a function to only be triggerable from a channel message.
If it is triggered in a private message, `message` will be said if given.
"""
def actual_decorator(function):
@functools.wraps(function)
def _nop(*args, **kwargs):
# Assign trigger and bot for easy access later
bot, trigger = args[0:2]
if not trigger.is_privmsg:
return function(*args, **kwargs)
else:
if message and not callable(message):
bot.say(message)
return _nop
# Hack to allow decorator without parens
if callable(message):
return actual_decorator(message)
return actual_decorator
def require_privilege(level, message=None):
"""
Decorate a function to require at least the given channel permission.
`level` can be one of the privilege levels defined in this module. If the
user does not have the privilege, `message` will be said if given. If it is
a private message, no checking will be done.
"""
def actual_decorator(function):
@functools.wraps(function)
def guarded(bot, trigger, *args, **kwargs):
# If this is a privmsg, ignore privilege requirements
if trigger.is_privmsg or trigger.admin:
return function(bot, trigger, *args, **kwargs)
channel_privs = bot.privileges[trigger.channel]
allowed = channel_privs.get(trigger.nick, 0) >= level
if not allowed:
if message and not callable(message):
bot.say(message)
else:
return function(bot, trigger, *args, **kwargs)
return guarded
return actual_decorator
def require_admin(message=None):
"""
Decorate a function to require the triggering user to be a bot admin.
If they are not, `message` will be said if given.
"""
def actual_decorator(function):
@functools.wraps(function)
def guarded(bot, trigger, *args, **kwargs):
if not trigger.admin:
if message and not callable(message):
bot.say(message)
else:
return function(bot, trigger, *args, **kwargs)
return guarded
# Hack to allow decorator without parens
if callable(message):
return actual_decorator(message)
return actual_decorator
def require_owner(message=None):
"""
Decorate a function to require the triggering user to be the bot owner.
If they are not, `message` will be said if given.
"""
def actual_decorator(function):
@functools.wraps(function)
def guarded(bot, trigger, *args, **kwargs):
if not trigger.owner:
if message and not callable(message):
bot.say(message)
else:
return function(bot, trigger, *args, **kwargs)
return guarded
# Hack to allow decorator without parens
if callable(message):
return actual_decorator(message)
return actual_decorator
def example(ex_input, ex_output=None):
"""
Decorate a function with an example input, and optionally a sample output.
Examples are added to the bot.doc dictionary with the function name as
the key alongside it's calling command. The 'commands' decorator should
be used with it.
"""
def add_attribute(function):
if not hasattr(function, "example"):
function.example = []
function.example.append((ex_input, ex_output))
return function
return add_attribute
def url_callback(url):
"""
Decore a function with a callback to the URL module.
This URL will be added to the bot.url_callbacks dictionary in the bot's
memory which the URL module will compare it's URL's against. If a key in
the bot.url_callbacks dict is found inside the gathered URL, this
function will be called instead.
"""
def add_attribute(function):
if not hasattr(function, "url_callback"):
function.url_callback = []
function.url_callback.append(url)
return function
return add_attribute

32
modules/8ball.py Executable file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""
Classic 8ball.
"""
import random
from module import commands
@commands('8ball')
def eightball(bot, trigger):
"""Classic 8ball."""
response = [
"No",
"Nah",
"Probably not",
"Don't count on it",
"It'll never happen",
"Stop trying",
"Uncertain",
"Who knows~",
"Doubtful",
"I don't know",
"Maybe",
"It could happen",
"Yes",
"Fate is a curious mistress",
"Your life is meaningless",
"Who knows~",
"The winds of change are always blowing",
"The tides have turned"]
bot.reply(random.choice(response))

92
modules/admin.py Executable file
View File

@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""
Some administrative functions relating to the bot.
"""
import module
@module.require_admin
@module.commands('join')
@module.example('.join #example or .join #example key')
def join(bot, trigger):
"""Join the specified channel. This is an admin-only command."""
channel, key = trigger.group(3), trigger.group(4)
if not channel:
return
elif not key:
bot.join(channel)
else:
bot.join(channel, key)
@module.require_admin
@module.commands('part')
@module.example('.part #example')
def part(bot, trigger):
"""Part the specified channel. This is an admin-only command."""
channel, _, part_msg = trigger.group(2).partition(' ')
if not channel:
channel = trigger.channel
if part_msg:
bot.part(channel, part_msg)
else:
bot.part(channel)
@module.require_owner
@module.commands('quit')
def quit(bot, trigger):
"""Quit from the server. This is an owner-only command."""
quit_message = trigger.group(2)
if not quit_message:
quit_message = f"Quitting on command from {trigger.nick}"
bot.quit(quit_message)
@module.require_admin
@module.commands('msg')
@module.example('.msg #YourPants Does anyone else smell neurotoxin?')
def msg(bot, trigger):
"""
Send a message to a given channel or nick. Can only be done by an admin.
"""
if trigger.group(2) is None:
return
channel, _, message = trigger.group(2).partition(' ')
message = message.strip()
if not channel or not message:
return
bot.msg(channel, message)
@module.require_admin
@module.commands('me')
@module.example(".me #erp notices your bulge")
def me(bot, trigger):
"""
Send an ACTION (/me) to a given channel or nick. Can only be done by an
admin.
"""
if trigger.group(2) is None:
return
channel, _, action = trigger.group(2).partition(' ')
action = action.strip()
if not channel or not action:
return
# msg = '\x01ACTION %s\x01' % action
bot.describe(channel, action)
@module.require_admin
@module.commands('selfmode')
@module.example(".mode +B")
def self_mode(bot, trigger):
"""Set a user mode on Sopel. Can only be done in privmsg by an admin."""
mode = trigger.group(3)
add_mode = mode.startswith("+")
bot.mode(bot.nickname, add_mode, mode)

114
modules/adminchannel.py Executable file
View File

@ -0,0 +1,114 @@
#!/usr/bin/env python3
"""
Some administrative functions relating to the channel the bot is in. Note
that most of these will require the bot to have admin privileges in the
channel.
"""
import re
import module
from tools import op_level, configureHostMask
OP = op_level["op"]
HALFOP = op_level["halfop"]
@module.require_chanmsg
@module.require_privilege(OP, 'You are not a channel operator.')
@module.commands('kick')
@module.example(".kick faggot being a faggot")
def kick(bot, trigger):
"""
Kick a user from the channel.
"""
if bot.channels[trigger.channel].privileges[bot.nick] < HALFOP:
return bot.reply("I'm not a channel operator!")
if not trigger.group(2):
return bot.reply("Who do you want me to kick?")
target, _, reason = trigger.group(2).partition(" ")
if not reason:
reason = "Stop doing the bad."
if target == bot.nick:
return bot.reply("I can't let you do that.")
bot.kick(trigger.channel, target, reason)
@module.require_chanmsg
@module.require_privilege(OP, 'You are not a channel operator.')
@module.commands('ban')
@module.example(".ban faggot")
def ban(bot, trigger):
"""
This give admins the ability to ban a user.
The bot must be a Channel Operator for this command to work.
"""
if bot.channels[trigger.channel].privileges[bot.nick] < HALFOP:
return bot.reply("I'm not a channel operator!")
if not trigger.group(2):
return bot.reply("Who do you want me to ban?")
banmask = configureHostMask(trigger.group(2))
bot.mode(trigger.channel, True, "b", mask=banmask)
@module.require_chanmsg
@module.require_privilege(OP, 'You are not a channel operator.')
@module.commands('unban')
@module.example(".unban faggot")
def unban(bot, trigger):
"""
This give admins the ability to unban a user.
The bot must be a Channel Operator for this command to work.
"""
if bot.channels[trigger.channel].privileges[bot.nick] < HALFOP:
return bot.reply("I'm not a channel operator!")
if not trigger.group(2):
return bot.reply("Who do you want me to ban?")
banmask = configureHostMask(trigger.group(2))
bot.mode(trigger.channel, False, "b", mask=banmask)
@module.require_chanmsg
@module.require_privilege(OP, 'You are not a channel operator.')
@module.commands('kickban', 'kb')
def kickban(bot, trigger):
"""
This gives admins the ability to kickban a user.
The bot must be a Channel Operator for this command to work.
.kickban [#chan] user1 user!*@module.* get out of here
"""
if bot.channels[trigger.channel].privileges[bot.nick] < HALFOP:
return bot.reply("I'm not a channel operator!")
if not trigger.group(2):
return bot.reply("Who do you want me to ban?")
target, _, reason = trigger.group(2).partition(" ")
if not reason:
reason = "Stop doing the bad."
if target == bot.nick:
return bot.reply("I can't let you do that.")
banmask = configureHostMask(trigger.group(2).strip())
bot.mode(trigger.channel, False, "b", mask=banmask)
bot.kick(trigger.channel, target, reason)
@module.require_chanmsg
@module.require_privilege(OP, 'You are not a channel operator.')
@module.commands('settopic')
@module.example(".settopic We're discussing penises, would you like to join?")
def settopic(bot, trigger):
"""
This gives ops the ability to change the topic.
The bot must be a Channel Operator for this command to work.
"""
if bot.channels[trigger.channel].privileges[bot.nick] < HALFOP:
return bot.reply("I'm not a channel operator!")
if not trigger.group(2):
return bot.reply("What do you want the topic set to?")
bot.topic(trigger.channel, trigger.group(2).strip())

19
modules/announce.py Executable file
View File

@ -0,0 +1,19 @@
#!/usr/bin/env python3
"""
Sends a message to all channels the bot is currently in.
"""
from module import commands, example
@commands('announce')
@example('.announce Some important message here')
def announce(bot, trigger):
"""
Send an announcement to all channels the bot is in.
"""
if not trigger.admin:
bot.reply("Sorry, I can't let you do that")
return
for channel in bot.channels:
bot.msg(channel, f"[ANNOUNCEMENT] {trigger.group(2)}")
bot.reply('Announce complete.')

398
modules/ascii.py Executable file
View File

@ -0,0 +1,398 @@
#!/usr/bin/env python3
"""
ASCII
"""
from io import BytesIO
import argparse
import requests
from PIL import Image, ImageFont, ImageDraw
#import imageio
import numpy as np
import numpngw
import module
ASCII_CHARS = "$@%#*+=-:. "
BRAIL_CHARS = "⠿⠾⠼⠸⠰⠠ "
HEADERS = {'User-Agent': 'Gimme ascii.'}
def scale_image(image, maxDim=(100,100)):
"""
Resizes an image while preserving the aspect ratio. Chooses the
dimension to scale by based on whichever is larger, ensuring that
neither width or height is ever larger than the maxDim argument.
Because text characters are typically pretty close to a 1:2 rectangle,
we weight the width twice as much.
"""
original_width, original_height = image.size
original_width = original_width * 2
if original_width <= maxDim[0] and original_height <= maxDim[1]:
new_width, new_height = image.size
elif original_width > original_height:
new_width = maxDim[0]
aspect_ratio = original_height/float(original_width)
new_height = int(aspect_ratio * new_width)
else:
new_height = maxDim[1]
aspect_ratio = original_width/float(original_height)
new_width = int(aspect_ratio * new_height)
image = image.resize((new_width, new_height))
return image
def pixels_to_chars(image, scale="ascii", color_code=None):
"""
Maps each pixel to an ascii char based on where it falls in the range
0-255 normalized to the length of the chosen scale.
"""
scales = {"ascii": ASCII_CHARS,
"ascii_reverse": "".join(reversed(ASCII_CHARS)),
"brail": BRAIL_CHARS,
"brail_reverse": "".join(reversed(BRAIL_CHARS))}
color_prefix = {"irc": "\03", "ansi":"\033"}
range_width = int(255 / len(scales[scale])) + (255 % len(scales[scale]) > 0)
pixels = list(image.getdata())
pixels = [pixels[i:i + image.size[0]] for i in range(0, len(pixels),
image.size[0])]
chars = []
for row in pixels:
new_row = ""
for pixel in row:
R, G, B = pixel
L = R * 299/1000 + G * 587/1000 + B * 114/1000
index = int(L/range_width)
char = scales[scale][index]
if color_code and char is not " ":
prefix = color_prefix[color_code] + char_color(pixel,color_code)
char = prefix + char
new_row += char
chars.append(new_row)
return "\n".join(chars)
def char_color(pixel, code="irc"):
"""
Maps a color to a character based on the original pixel color.
Calculates the distance from the provided color to each of the 16
colors in the IRC and ANSI color codes and selects the closest match.
"""
colors_index = ["white", "black", "blue", "green", "red", "brown", "purple",
"orange", "yellow", "light green", "teal", "cyan", "light blue", "pink",
"grey", "silver"]
colors_rgb = {"white": (0,0,0), "black": (255,255,255), "blue": (0,0,255),
"green": (0,255,0), "red": (255,0,0), "brown": (150,75,0),
"purple": (128,0,128), "orange": (255,128,0), "yellow": (255,255,0),
"light green": (191,255,0), "teal": (0,128,128), "cyan": (0,255,255),
"light blue": (65,105,225), "pink":(255,192,203), "grey": (128,128,128),
"silver": (192,192,192)}
colors_irc = {"white": "0", "black": "1", "blue": "2", "green": "3",
"red": "4", "brown": "5", "purple": "6", "orange": "7", "yellow": "8",
"light green": "9", "teal": "10", "cyan": "11", "light blue": "12",
"pink": "13", "grey": "14", "silver": "15"}
colors_ansi = {"white": "[1;37m", "black": "[0;30m", "blue": "[0;34m",
"green": "[0;32m", "red": "[0;31m", "brown": "[0;33m",
"purple": "[0;35m", "orange": "[1;31m", "yellow": "[1;33m",
"light green": "[1;32m", "teal": "[0;36m", "cyan": "[1;36m",
"light blue": "[1;34m", "pink": "[1;35m", "grey": "[1;30m",
"silver": "[0;37m"}
dist = [(abs(pixel[0] - colors_rgb[color][0])**2
+ abs(pixel[1] - colors_rgb[color][1])**2
+ abs(pixel[2] - colors_rgb[color][2])**2)**0.5
for color in colors_index]
color = colors_index[dist.index(min(dist))]
if code == "irc":
return colors_irc[color]
elif code == "ansi":
return colors_ansi[color]
def open_image(imagePath):
"""
Opens the image at the supplied file path in PIL. If an internet URL
is supplied, it will download the image and then open it. Returns a
PIL image object.
"""
try:
if imagePath.startswith("http"):
res = requests.get(imagePath, headers=HEADERS, verify=True,
timeout=20)
if res.status_code == 404:
return "404: file not found."
res.raise_for_status()
image = Image.open(BytesIO(res.content))
else:
image = Image.open(imagePath)
except FileNotFoundError:
return f"File not found: {imagePath}"
except Exception as e:
return(f"Error opening image: {imagePath}\n{e}")
return image
def alpha_composite(image, color=(255, 255, 255)):
"""
Alpha composite an RGBA Image with a specified color.
Source: http://stackoverflow.com/a/9166671/284318
"""
image.load() # needed for split()
background = Image.new('RGB', image.size, color)
background.paste(image, mask=image.split()[3]) # 3 is the alpha channel
return background
def image_to_ascii(image=None, reverse=False, brail=False, color=None,**kwargs):
"""
Reads an image file and converts it to ascii art. Returns a
newline-delineated string. If reverse is True, the ascii scale is
reversed.
"""
if not image:
image = open_image(kwargs["imagePath"])
if image.mode == "P":
image = image.convert(image.palette.mode)
if image.mode == "RGBA":
image = alpha_composite(image).convert("RGB")
if image.mode == "L":
image = image.convert("RGB")
image = scale_image(image)
if brail:
scale = "brail"
else:
scale = "ascii"
if reverse:
scale += "_reverse"
chars = pixels_to_chars(image, scale, color)
image.close()
del(image)
return chars
def ascii_to_image(image_ascii):
"""
Creates a plain image and draws text on it.
"""
# TODO: make font type, size and color non-fixed
width = len(image_ascii[:image_ascii.index("\n")]) * 8
height = (image_ascii.count("\n")+1) * 12 + 4
font = ImageFont.truetype("LiberationMono-Regular.ttf", 14)
image = Image.new("RGB", (width, height), (255,255,255))
draw = ImageDraw.Draw(image)
draw.text((0,0), image_ascii, (0,0,0), font=font, spacing=0)
return image
def handle_gif(imagePath, **kwargs):
"""
Handle gifs seperately.
"""
image = open_image(imagePath)
ascii_seq = []
new_image = ascii_to_image(image_to_ascii(image, **kwargs))
image.seek(1)
while True:
try:
im = ascii_to_image(image_to_ascii(image, **kwargs))
ascii_seq.append(im)
image.seek(image.tell()+1)
except EOFError:
break # end of sequence
# new_image.save(output, save_all=True, append_images=ascii_seq,
# duration=60, loop=0, optimize=True)
ascii_seq = [new_image] + ascii_seq
np_ascii_seq = [np.array(im) for im in ascii_seq]
with open(kwargs["output"], "wb") as file:
numpngw.write_apng(file, np_ascii_seq)
@module.rate(user=60)
@module.require_chanmsg(message="It's impolite to whisper.")
@module.commands('ascii')
@module.example('.ascii [-rc] https://www.freshports.org/images/freshports.jpg')
def ascii(bot, trigger):
"""
Downloads an image and converts it to ascii.
"""
if not trigger.group(2):
return bot.say()
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("imagePath")
parser.add_argument("-r", "--reverse", action="store_true", help="Reverse.")
parser.add_argument("-c", "--color", action="store_true")
parser.add_argument("-b", "--brail", action="store_true")
parser.add_argument("-B", "--brail2", action="store_true")
parser.add_argument("-a", "--animated", action="store_true")
parser.add_argument("-h", "--help", action="store_true")
args = parser.parse_args(trigger.group(2).split())
if args.help:
return bot.say(parser.print_help())
if args.color:
args.color = "irc"
if not args.imagePath.startswith("http"):
bot.reply("Internet requests only.")
return
if args.animated:
args.output = "temp.png"
handle_gif(**vars(args))
file = {"file": open("temp.png", "rb")}
res = requests.post("https://uguu.se/api.php?d=upload-tool", files=file)
bot.say(res.text)
elif args.brail2:
image = open_image(args.imagePath)
image_ascii = image_to_brail(image)
image_ascii = image_ascii.replace(""," ")
bot.say(image_ascii)
else:
image_ascii = image_to_ascii(None, **vars(args))
bot.say(image_ascii)
def brail_char(chunk, threshold):
"""
Accepts a numpy matrix and spits out a brail character.
"""
chunk = np.array_split(chunk, 3, axis=0)
chunk = np.concatenate(chunk, axis=1)
chunk = np.array_split(chunk, 6, axis=1)
dots = ""
for sub_chunk in chunk:
if np.mean(sub_chunk) < threshold:
dots += "1"
else:
dots += "0"
char = chr(int(dots, base=2)+10240)
return char
def image_to_brail(image, fontSize=(8,15)):
"""
An alternative method of generating brail ascii art.
"""
if not image:
image = open_image(kwargs["imagePath"])
if image.mode == "P":
image = image.convert(image.palette.mode)
if image.mode == "RGBA":
image = alpha_composite(image).convert("RGB")
image = image.convert("L")
matSize = (image.size[0] // fontSize[0], image.size[1] // fontSize[1])
if image.size[0] > fontSize[0]*100 or image.size[1] > fontSize[1]*100:
image = scale_image(image, (fontSize[0]*100, fontSize[1]*100))
image = image.crop((0, 0, matSize[0]*fontSize[0], matSize[1]*fontSize[1]))
threshold = np.mean(image)
grid = np.array(image)
grid = np.split(grid, matSize[1], axis=0)
grid = np.concatenate(grid, axis=1)
grid = np.split(grid, matSize[0]*matSize[1], axis=1)
for n, chunk in enumerate(grid):
char = brail_char(chunk, threshold)
grid[n] = char
grid = "".join(grid)
grid = [grid[n : n + matSize[0]] for n in range(0, len(grid), matSize[0])]
return "\n".join(grid)
if __name__=='__main__':
import argparse
parser = argparse.ArgumentParser(
description="Converts an image file to ascii art.")
parser.add_argument(
"imagePath",
help="The path to the image file. May be a local path or internet URL.")
parser.add_argument(
"-r",
"--reverse",
action="store_true",
help="Reverses the ascii scale.")
parser.add_argument(
"-o",
"--output",
help="Outputs the ascii art into a file at the specified path.")
parser.add_argument(
"-i",
"--image",
dest="drawImage",
action="store_true",
help="Outputs the ascii art as an image rather than text. Requires \
--output.")
parser.add_argument(
"-a",
"--animated",
action="store_true",
help="Handles animated GIFs. Includes --image.")
parser.add_argument(
"-c",
"--color",
type=str,
help="Colorizes the ascii matrix. Currently supported modes are 'irc' \
and 'ansi' for generating color codes compliant with those standards.")
parser.add_argument(
"--ansi",
dest="color",
action="store_const",
const="ansi",
help="Shortcut for '--color ansi'.")
parser.add_argument(
"--irc",
dest="color",
action="store_const",
const="irc",
help="Shortcut for '--color irc'.")
parser.add_argument(
"-b",
"--brail",
action="store_true",
help="Uses brail unicode characters instead of ascii characters.")
args = parser.parse_args()
if args.animated: # --animated includes --image
args.drawImage = True
if args.drawImage: # --image requires --output
if not args.output:
parser.error("--image requires --output")
if args.animated:
handle_gif(**vars(args))
exit()
image_ascii = image_to_ascii(None, **vars(args))
if args.drawImage:
image = ascii_to_image(image_ascii)
image.save(args.output, "PNG")
elif args.output:
with open(args.output, "w+") as file:
file.write(image_ascii)
else:
print(image_ascii)

46
modules/away.py Executable file
View File

@ -0,0 +1,46 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Marks a user as away with an optional message and informs anyone who attempt
to ping them of their away status. Goes away when the user talks again.
"""
from module import commands, example, hook
def setup(bot):
bot.memory['away'] = {}
@commands('away')
@example('.away commiting sudoku')
def away(bot, trigger):
"""
Stores in the user's name and away message in memory.
"""
if not trigger.group(2):
bot.memory['away'][trigger.nick] = ""
else:
bot.memory['away'][trigger.nick] = trigger.group(2)
@hook(True)
def message(bot, trigger):
"""
If an away users name is said, print their away message.
"""
name = trigger.group(1)
if name.endswith(":") or name.endswith(","):
name = name[:-1]
if name in bot.memory["away"]:
print(True)
msg = f"\x0308{name}\x03 is away: \x0311{bot.memory['away'][name]}"
bot.say(msg)
@hook(True)
def notAway(bot, trigger):
"""
If an away user says something, remove them from the away dict.
"""
if not trigger.group(0).startswith(".away"):
if trigger.nick in bot.memory["away"]:
bot.memory["away"].pop(trigger.nick)

73
modules/banhe.py Executable file
View File

@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""
ban he
ban he
ban he
"""
import time
from module import commands, example, require_admin
from tools import configureHostMask
@commands('banhe')
@example('.banhe assfaggot 30m')
def banhe(bot, trigger):
"""
Bans he for a set period of time. Admins may set the period of time,
non-admins only get 20 second bans.
"""
banhee, period = trigger.group(3), trigger.group(4)
if not trigger.admin:
period = 20
else:
conv = {'s':1, 'm':60, 'h':3600, 'd':86400}
try:
period = conv[period[-1]] * int(period[:-1])
except (KeyError, ValueError, TypeError):
period = 0
banmask = configureHostMask(banhee)
bot.mode(trigger.channel, True, "b", mask=banmask)
if period > 2592000:
bot.reply("It's too big, Onii-chan.")
if not period or period > 2592000:
return bot.say(f"Banned \x0304{banhee}\x03 for \x0309∞\x03 seconds.")
bot.say(f"Banned \x0304{banhee}\x03 for \x0309{str(period)}\x03 seconds.")
time.sleep(period)
bot.mode(trigger.channel, False, "b", mask=banmask)
bot.say(f"Unbanned \x0304{banhee}\x03")
@require_admin
@commands("banheall")
def banheall(bot, trigger):
"""
Ban them all, Johnny.
"""
period = trigger.group(2)
conv = {'s':1, 'm':60, 'h':3600, 'd':86400}
try:
period = conv[period[-1]] * int(period[:-1])
except (IndexError, KeyError, ValueError, TypeError):
period = 0
for nick in bot.channels[trigger.channel].users:
banmask = configureHostMask(nick)
bot.mode(trigger.channel, True, "b", mask=banmask)
if period > 2592000:
bot.reply("It's too big, Onii-chan.")
if not period or period > 2592000:
return bot.say("Banned \x0304them all\x03 for \x0309∞\x03 seconds.")
bot.say(f"Banned \x0304them all\x03 for \x0309{str(period)}\x03 seconds.")
time.sleep(period)
for nick in bot.channels[trigger.channel].users:
banmask = configureHostMask(nick)
bot.mode(trigger.channel, False, "b", mask=banmask)
bot.say("Unbanned \x0304them all\x03")

17
modules/bq.py Executable file
View File

@ -0,0 +1,17 @@
#!/usr/bin/env python3
"""
Various things related to Banished Quest.
"""
from module import commands
from tools.time import relativeTime
@commands('bq')
def BQstatus(bot, trigger):
"""
Displays the current status of BQ.
"""
status = "\x0304DEAD"
deathdate = "[2017-02-16 00:19:00]"
msg = "Banished Quest status: " + status + "\nTime since death: "
msg += relativeTime(bot.config, datetime.now(), deathdate) + " ago"
bot.say(msg)

48
modules/calc.py Executable file
View File

@ -0,0 +1,48 @@
#!/usr/bin/env python3
"""
A basic calculator and python interpreter application.
"""
import sys
import requests
from module import commands, example
from tools.calculation import eval_equation
BASE_TUMBOLIA_URI = 'https://tumbolia-two.appspot.com/'
@commands('c', 'calc')
@example('.c 5 + 3', '8')
def c(bot, trigger):
"""Evaluate some calculation."""
if not trigger.group(2):
return bot.reply("Nothing to calculate.")
# Account for the silly non-Anglophones and their silly radix point.
eqn = trigger.group(2).replace(',', '.')
try:
result = eval_equation(eqn)
result = "{:.10g}".format(result)
except ZeroDivisionError:
result = "Division by zero is not supported in this universe."
except Exception as e:
result = "{error}: {msg}".format(error=type(e), msg=e)
bot.reply(result)
@commands('py')
@example('.py len([1,2,3])', '3')
def py(bot, trigger):
"""Evaluate a Python expression."""
if not trigger.group(2):
return bot.say("Need an expression to evaluate")
query = trigger.group(2)
uri = BASE_TUMBOLIA_URI + 'py/'
res = requests.get(uri + query)
res.raise_for_status()
answer = res.text
if answer:
#bot.say can potentially lead to 3rd party commands triggering.
bot.say(answer)
else:
bot.reply('Sorry, no result.')

32
modules/countdown.py Executable file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""
Provides a countdown to some particular date.
"""
from datetime import datetime
from module import commands, example
from tools.time import relativeTime
@commands("countdown")
@example(".countdown 2012 12 21")
def generic_countdown(bot, trigger):
"""
.countdown <year> <month> <day> - displays a countdown to a given date.
"""
text = trigger.group(2)
if not text:
return bot.say("Please use correct format: .countdown 2012 12 21")
text = text.split()
if (len(text) != 3 or not text[0].isdigit() or not text[1].isdigit()
or not text[2].isdigit()):
return bot.say("Please use correct format: .countdown 2012 12 21")
try:
date = datetime(int(text[0]), int(text[1]), int(text[2]))
except:
return bot.say("Please use correct format: .countdown 2012 12 21")
msg = relativeTime(bot.config, datetime.now(), date)
msg += " until " + trigger.group(2)
bot.say(msg)

70
modules/currency.py Executable file
View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
"""
Currency conversions. BTC is handled separately from country currencies.
Other crypto coins to be added someday.
"""
import requests
from module import commands, example
CUR_URI = "https://v3.exchangerate-api.com/bulk/{API_KEY}/{CUR_FROM}"
BTC_URI = "https://api.coindesk.com/v1/bpi/currentprice/{CUR_TO}.json"
@commands('cur', 'currency', 'exchange')
@example('.cur 20 EUR to USD')
def exchange(bot, trigger):
"""Show the exchange rate between two currencies"""
amount = trigger.group(3)
cur_from = trigger.group(4)
cur_to = trigger.group(5)
if cur_to == "to":
cur_to = trigger.group(6)
if not all((amount, cur_to, cur_from)):
return bot.reply("I didn't understand that. Try: .cur 20 EUR to USD")
try:
amount = float(amount)
except ValueError:
return bot.reply("Invalid amount. Must be number.")
cur_to = cur_to.upper()
cur_from = cur_from.upper()
api_key = bot.config["currency"].get("api_key")
url = URI.format(**{"API_KEY": api_key, "CUR_FROM": cur_from})
res = requests.get(url, verify=True)
res.raise_for_status()
data = res.json()
if data["result"] == "failed":
return bot.reply("Invalid input currency. Must be ISO 4217 compliant.")
rate = data["rates"].get(cur_to)
if not rate:
return bot.reply("Invalid output currency. Must be ISO 4217 compliant.")
new_amount = round(rate*amount, 2)
msg = f"\x0310{amount} {cur_from}\x03 = \x0312{new_amount} {cur_to}"
bot.msg(msg)
@commands('btc', 'bitcoin')
@example('.btc EUR')
def bitcoin(bot, trigger):
"""
Show the current bitcoin value in USD. Optional parameter allows non-USD
conversion.
"""
cur_to = trigger.group(3)
if not cur_to:
cur_to = "USD"
cur_to = cur_to.upper()
url = BTC_URI.format(**{"CUR_TO": cur_to})
res = requests.get(url, verify=True)
if res.text.startswith("Sorry"):
return bot.reply("Invalid currency type. Must be ISO 4217 compliant.")
data = res.json()
rate = data["bpi"][cur_to]["rate_float"]
msg = f"\x03101 BTC\x03 = \x0312{rate} {cur_to}"
bot.msg(msg)

244
modules/dice.py Executable file
View File

@ -0,0 +1,244 @@
#!/usr/bin/env python3
"""
Dice rolling, the core function of any IRC bot.
"""
import random
import re
import operator
import module
from tools.calculation import eval_equation
class DicePouch:
def __init__(self, num_of_die, type_of_die, addition):
"""
Initialize dice pouch and roll the dice.
Args:
num_of_die: number of dice in the pouch.
type_of_die: how many faces the dice have.
addition: how much is added to the result of the dice.
"""
self.num = num_of_die
self.type = type_of_die
self.addition = addition
self.dice = {}
self.dropped = {}
self.roll_dice()
def roll_dice(self):
"""Roll all the dice in the pouch."""
self.dice = {}
self.dropped = {}
for __ in range(self.num):
number = random.randint(1, self.type)
count = self.dice.setdefault(number, 0)
self.dice[number] = count + 1
def drop_lowest(self, n):
"""
Drop n lowest dice from the result.
Args:
n: the number of dice to drop.
"""
sorted_x = sorted(self.dice.items(), key=operator.itemgetter(0))
for i, count in sorted_x:
count = self.dice[i]
if n == 0:
break
elif n < count:
self.dice[i] = count - n
self.dropped[i] = n
break
else:
self.dice[i] = 0
self.dropped[i] = count
n = n - count
for i, count in self.dropped.items():
if self.dice[i] == 0:
del self.dice[i]
def get_simple_string(self):
"""Return the values of the dice like (2+2+2[+1+1])+1."""
dice = self.dice.items()
faces = ("+".join([str(face)] * times) for face, times in dice)
dice_str = "+".join(faces)
dropped_str = ""
if self.dropped:
dropped = self.dropped.items()
dfaces = ("+".join([str(face)] * times) for face, times in dropped)
dropped_str = "[+%s]" % ("+".join(dfaces),)
plus_str = ""
if self.addition:
plus_str = "{:+d}".format(self.addition)
return "(%s%s)%s" % (dice_str, dropped_str, plus_str)
def get_compressed_string(self):
"""Return the values of the dice like (3x2[+2x1])+1."""
dice = self.dice.items()
faces = ("%dx%d" % (times, face) for face, times in dice)
dice_str = "+".join(faces)
dropped_str = ""
if self.dropped:
dropped = self.dropped.items()
dfaces = ("%dx%d" % (times, face) for face, times in dropped)
dropped_str = "[+%s]" % ("+".join(dfaces),)
plus_str = ""
if self.addition:
plus_str = "{:+d}".format(self.addition)
return "(%s%s)%s" % (dice_str, dropped_str, plus_str)
def get_sum(self):
"""Get the sum of non-dropped dice and the addition."""
result = self.addition
for face, times in self.dice.items():
result += face * times
return result
def get_number_of_faces(self):
"""
Returns sum of different faces for dropped and not dropped dice
This can be used to estimate, whether the result can be shown in
compressed form in a reasonable amount of space.
"""
return len(self.dice) + len(self.dropped)
def _roll_dice(bot, dice_expression):
result = re.search(
r"""
(?P<dice_num>-?\d*)
d
(?P<dice_type>-?\d+)
(v(?P<drop_lowest>-?\d+))?
$""",
dice_expression,
re.IGNORECASE | re.VERBOSE)
dice_num = int(result.group('dice_num') or 1)
dice_type = int(result.group('dice_type'))
# Dice can't have zero or a negative number of sides.
if dice_type <= 0:
bot.reply("I don't have any dice with %d sides. =(" % dice_type)
return None # Signal there was a problem
# Can't roll a negative number of dice.
if dice_num < 0:
bot.reply("I'd rather not roll a negative amount of dice. =(")
return None # Signal there was a problem
# Upper limit for dice should be at most a million. Creating a dict with
# more than a million elements already takes a noticeable amount of time
# on a fast computer and ~55kB of memory.
if dice_num > 1000:
bot.reply('I only have 1000 dice. =(')
return None # Signal there was a problem
dice = DicePouch(dice_num, dice_type, 0)
if result.group('drop_lowest'):
drop = int(result.group('drop_lowest'))
if drop >= 0:
dice.drop_lowest(drop)
else:
bot.reply("I can't drop the lowest %d dice. =(" % drop)
return dice
@module.commands("roll", "dice", "d")
@module.example(".roll 3d1+1", "You roll 3d1+1: (1+1+1)+1 = 4")
def roll(bot, trigger):
"""
.dice XdY[vZ][+N], rolls dice and reports the result.
X is the number of dice. Y is the number of faces in the dice. Z is the
number of lowest dice to be dropped from the result. N is the constant to
be applied to the end result.
"""
# This regexp is only allowed to have one captured group, because having
# more would alter the output of re.findall.
dice_regexp = r"-?\d*[dD]-?\d+(?:[vV]-?\d+)?"
# Get a list of all dice expressions, evaluate them and then replace the
# expressions in the original string with the results. Replacing is done
# using string formatting, so %-characters must be escaped.
if not trigger.group(2):
return bot.reply("No dice to roll.")
arg_str = trigger.group(2)
dice_expressions = re.findall(dice_regexp, arg_str)
arg_str = arg_str.replace("%", "%%")
arg_str = re.sub(dice_regexp, "%s", arg_str)
f = lambda dice_expr: _roll_dice(bot, dice_expr)
dice = list(map(f, dice_expressions))
if None in dice:
# Stop computing roll if there was a problem rolling dice.
return
def _get_eval_str(dice):
return "(%d)" % (dice.get_sum(),)
def _get_pretty_str(dice):
if dice.num <= 10:
return dice.get_simple_string()
elif dice.get_number_of_faces() <= 10:
return dice.get_compressed_string()
else:
return "(...)"
eval_str = arg_str % (tuple(map(_get_eval_str, dice)))
pretty_str = arg_str % (tuple(map(_get_pretty_str, dice)))
# Showing the actual error will hopefully give a better hint of what is
# wrong with the syntax than a generic error message.
try:
result = eval_equation(eval_str)
except Exception as e:
bot.reply("SyntaxError, eval(%s), %s" % (eval_str, e))
return
bot.reply("You roll %s: %s = %d" % (
trigger.group(2), pretty_str, result))
@module.commands("choice")
@module.commands("ch")
@module.commands("choose")
@module.example(".choose opt1,opt2,opt3")
def choose(bot, trigger):
"""
.choice option1|option2|option3 - Makes a difficult choice easy.
"""
if not trigger.group(2):
return bot.reply('I\'d choose an option, but you didn\'t give me any.')
choices = [trigger.group(2)]
for delim in '|\\/,':
choices = trigger.group(2).split(delim)
if len(choices) > 1:
break
# Use a different delimiter in the output, to prevent ambiguity.
for show_delim in ',|/\\':
if show_delim not in trigger.group(2):
show_delim += ' '
break
pick = random.choice(choices)
msg = f"Your options: {show_delim.join(choices)}. My choice: {pick}"
bot.reply(msg)

12
modules/echo.py Executable file
View File

@ -0,0 +1,12 @@
#!/usr/bin/env python3
"""
Echo.
"""
from module import commands, example
@commands('echo')
@example('.echo balloons')
def echo(bot, trigger):
"""Echos the given string."""
if trigger.group(2):
bot.say(trigger.group(2))

27
modules/grog.py Executable file
View File

@ -0,0 +1,27 @@
#!/usr/bin/env python3
"""
Selects a random Grog of Substantial Whimsy effect.
"""
import os
import random
from module import commands
@commands("grog")
def grog(bot, trigger):
"""
Picks a random status effect from Grog of Substantial Whimsy effect.
"""
path = os.path.join(bot.config["core"].get("homedir"), "static", "grog.txt")
with open(path, "r") as file:
data = file.read().split("\n")
num = 0
if trigger.group(2):
try:
num = int(trigger.group(2)) - 1
except:
pass
if num and num < len(data):
bot.say(data[num])
else:
bot.say(random.choice(data))

91
modules/hangman.py Executable file
View File

@ -0,0 +1,91 @@
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Hangman.
"""
import random
import module
class Hangman():
def __init__(self):
self.running = False
def newgame(self):
self.running = True
self.tries = 8
self.word = self._PickWord()
self.working = [x for x in self.word]
self.blanks = list('_' * len(self.word))
for n,char in enumerate(self.word):
if char == ' ':
self.blanks[n] = ' '
def _PickWord(self):
with open("/home/iou1name/.sopel/wordlist.txt",'r') as file:
lines = file.readlines()
wrd = list(lines[ random.randint(0, len(lines))-1 ].strip())
return wrd
def solve(self, guess):
if list(guess) == self.word:
self.running = False
self.blanks = self.word
return 'win'
elif guess in self.word and len(guess) == 1:
while guess in self.working:
index = self.working.index(guess)
self.blanks[index] = guess
self.working[index] = '_'
return 'correct'
else:
self.tries = self.tries - 1
if self.tries == 0:
self.running = False
self.blanks = self.word
return 'lose'
else:
return 'incorrect'
def return_blanks(self):
return ''.join(self.blanks)
hangman = Hangman()
@module.commands('hangman')
@module.example('.hangman')
def hangman_start(bot, trigger):
"""Starts a game of hangman."""
if hangman.running:
bot.reply("There is already a game running.")
return
hangman.newgame()
bot.say(trigger.nick + " has started a game of hangman! Type .guess to guess a letter or the entire phrase.")
bot.say(hangman.return_blanks())
@module.commands('guess')
@module.example('.guess a')
@module.example('.guess anus')
def guess(bot, trigger):
"""Makes a guess in hangman. May either guess a single letter or the entire word/phrase."""
if not hangman.running:
bot.reply('There is no game currently running. Use .hangman to start one')
return
response = hangman.solve(trigger.group(2))
if response == 'win':
bot.say(trigger.nick + " has won!")
elif response == 'correct':
pass
elif response == 'lose':
bot.say("Game over.")
elif response == 'incorrect':
bot.reply("incorrect.")
bot.say( str(hangman.tries) + " tries left." )
else:
bot.say('Fuck.')
bot.say(hangman.return_blanks())

33
modules/help.py Executable file
View File

@ -0,0 +1,33 @@
#!/usr/bin/env python3
"""
Displays help docs and examples for commands, as well as lists all commands
available.
"""
import random
from module import commands, example
@commands('help', 'commands')
@example('.help tell')
def help(bot, trigger):
"""Shows a command's documentation, and possibly an example."""
if trigger.group(2):
name = trigger.group(2)
name = name.lower()
if name not in bot.doc:
return
doc = bot.doc[name]
docstring, examples = doc
if examples:
ex = random.choice(examples)
bot.msg(docstring)
if ex:
bot.msg("Ex. In: " + ex[0])
if ex[1]:
bot.msg("Ex. Out: " + ex[1])
else:
cmds = sorted(bot.doc.keys())
msg = "Available commands: " + ", ".join(cmds)
bot.msg(msg)

65
modules/iot.py Executable file
View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""
Long live the Internet of Things!
"""
import time
import requests
import module
@module.require_admin
@module.commands('lamp')
def lampToggle(bot, trigger):
"""
Turns my lamp on and off. The glory of IoT!
"""
try:
res = requests.get("http://192.168.1.12/gpio?0=toggle", timeout=10)
except requests.exceptions.ReadTimeout:
return bot.say("Connection error. Timeout reached.")
except requests.exceptions.ConnectionError:
return bot.say("Connection error. Is the unit dead?")
if res.text[32] == 'L':
bot.say("Lamp is now OFF.")
elif res.text[32] == 'H':
bot.say("Lamp is now ON.")
#@module.require_admin
@module.commands('roomtemp')
def roomTemp(bot, trigger):
"""
Gets the temperature of my room.
"""
try:
res = requests.get("http://192.168.1.25/", timeout=10)
del res
time.sleep(1.5)
res = requests.get("http://192.168.1.25/", timeout=10)
except requests.exceptions.ReadTimeout:
return bot.say("Connection error. Timeout reached.")
except requests.exceptions.ConnectionError:
return bot.say("Connection error. Is the unit dead?")
bot.say(res.text)
@module.require_admin
@module.commands('inkwrite')
def inkWrite(bot, trigger):
"""
Writes shit to my e-ink screen.
"""
text = trigger.replace(".inkwrite ", "")
if not text:
return bot.say("Need something to write.")
try:
res = requests.get(f"http://192.168.1.125:8000/?text={text}",
timeout=10)
except requests.exceptions.ReadTimeout:
return bot.say("Connection error. Timeout reached.")
except requests.exceptions.ConnectionError:
return bot.say("Connection error. Is the unit dead?")
bot.say("Wrote: " + res.text)

31
modules/ipython.py Executable file
View File

@ -0,0 +1,31 @@
#!/usr/bin/env python3
"""
An iPython interactive console for debugging purposes.
"""
from IPython.terminal.embed import InteractiveShellEmbed
import module
def setup(bot):
bot.memory["iconsole_running"] = False
@module.require_admin("Only admins can start the interactive console")
@module.commands('console')
def interactive_shell(bot, trigger):
"""
Starts an interactive IPython console
"""
if bot.memory['iconsole_running']:
return bot.say('Console already running')
banner1 = 'Sopel interactive shell (embedded IPython)'
banner2 = '`bot` and `trigger` are available. To exit, type exit'
exitmsg = 'Interactive shell closed'
console = InteractiveShellEmbed(banner1=banner1, banner2=banner2,
exit_msg=exitmsg)
bot.memory['iconsole_running'] = True
bot.say('console started')
console()
bot.memory['iconsole_running'] = False

44
modules/isup.py Executable file
View File

@ -0,0 +1,44 @@
#!/usr/bin/env python3
"""
Checks if a website is up by sending a HEAD request to it.
"""
import requests
from module import commands, require_chanmsg
@require_chanmsg(message="It's not polite to whisper.")
@commands('isup')
def isup(bot, trigger):
"""Queries the given url to check if it's up or not."""
url = trigger.group(2)
if not url:
return bot.reply("What URL do you want to check?")
if url.startswith("192") and not trigger.owner:
return bot.reply("Do not violate the LAN.")
if not url.startswith("http"):
url = "http://" + url
try:
res = requests.head(url, verify=True)
except (requests.exceptions.MissingSchema,
requests.exceptions.InvalidSchema):
return bot.say("Missing or invalid schema. Check the URL.")
except requests.exceptions.ConnectionError:
return bot.say("Connection error. Are you sure this is a real website?")
except requests.exceptions.InvalidURL:
return bot.say("Invalid URL.")
except Exception as e:
print(e)
return bot.say("Listen buddy. I don't know what you're doing, but \
you're not doing it right.")
try:
res.raise_for_status()
return bot.say(url + " appears to be working from here.")
except requests.exceptions.HTTPError:
return bot.say(url + " looks down from here.")

12
modules/lmgtfy.py Executable file
View File

@ -0,0 +1,12 @@
#!/usr/bin/env python3
"""
Let me google that for you.
"""
from module import commands
@commands('lmgtfy')
def googleit(bot, trigger):
"""Let me just... google that for you."""
if not trigger.group(2):
return bot.say('http://google.com/')
bot.say('http://lmgtfy.com/?q=' + trigger.group(2).replace(' ', '+'))

174
modules/movie.py Executable file
View File

@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""
This module exracts various information from imbd.
It also contains functionality for the local movie database.
"""
import os
import random
import threading
from sqlite3 import IntegrityError, OperationalError
import bs4
import requests
from module import commands, example, require_admin
def setup(bot):
bot.memory['movie_lock'] = threading.Lock()
con = bot.db.connect()
cur = con.cursor()
try:
cur.execute("SELECT * FROM movie").fetchone()
except OperationalError:
cur.execute("CREATE TABLE movie("
"movie_title TEXT NOT NULL PRIMARY KEY,"
"added_by text DEFAULT 'UNKNOWN',"
"added_date INTEGER DEFAULT (STRFTIME('%s', 'now')),"
"times_watched INTEGER DEFAULT 0,"
"first_watched TEXT DEFAULT 'NA',"
"shitpost INTEGER DEFAULT 0,"
"theater_release_date TEXT,"
"bluray_release_date TEXT"
")")
con.commit()
con.close()
@commands('movie', 'tmdb')
@example('.movie ThisTitleDoesNotExist', '[MOVIE] Movie not found!')
@example('.movie Citizen Kane', '[MOVIE] Title: Citizen Kane | Year: \
1941 | Rating: 8.4 | Genre: Drama, Mystery | IMDB Link: \
http://imdb.com/title/tt0033467')
def movieInfo(bot, trigger):
"""
Returns some information about a movie, like Title, Year, Rating,
Genre and IMDB Link.
"""
word = trigger.group(2)
if not word:
return bot.reply("What movie?")
word = word.replace(" ", "+")
api_key = bot.config.movie.tmdb_api_key
uri = "https://api.themoviedb.org/3/search/movie?" + \
f"api_key={api_key}&query={word}"
data = requests.get(uri, timeout=30, verify=True).json()
try:
data = data['results'][0]
except IndexError:
return bot.reply("No results found.")
except KeyError:
print(data)
return bot.reply("An error. Please notify an adult.")
uri = "https://api.themoviedb.org/3/genre/movie/list?" + \
f"api_key={api_key}&language=en-US"
genres = requests.get(uri, timeout=30, verify=True).json()
try:
genres = genres['genres']
except KeyError:
return bot.reply("An error. Please notify an adult.")
movieGenres = []
for genre in genres:
if genre['id'] in data['genre_ids']:
movieGenres.append(genre['name'])
msg = "[\x0304MOVIE\x03] \x0310Title\x03: \x0312" + data['title'] + \
"\x03 | \x0310Year\x03: \x0308" + data['release_date'][:4] + \
"\x03 | \x0310Rating\x03: \x0312" + str(data['vote_average']) + \
"\x03 | \x0310Genre\x03: \x0312" + ", ".join(movieGenres) + \
"\x03 | \x0310TMDb Link\x03: \x0307" + \
"https://www.themoviedb.org/movie/" + str(data['id'])
msg += "\n\x0310Theater release date\x03: \x0308" + data['release_date'] + \
"\x03 | \x0310Physical release date\x03: \x0308" + \
phyiscalRelease(word, data['id'], api_key)
msg += "\n\x0310Overview\x03: " + data['overview']
bot.say(msg)
def phyiscalRelease(word, tmdb_id=None, api_key=None):
"""
Attempts to find a physical US release from TMDb. Failing that, it will
find a date on www.dvdreleasedates.com.
"""
if not tmdb_id:
return "Feature not yet implemented."
uri = f"https://api.themoviedb.org/3/movie/{tmdb_id}/release_dates?" + \
f"api_key={api_key}"
res = requests.get(uri, timeout=30, verify=True)
res.raise_for_status()
try:
releases = res.json()['results']
except KeyError:
return "No results found."
for release in releases:
if release["iso_3166_1"] != "US":
continue
for date in release["release_dates"]:
if date["type"] == 5:
return date["release_date"][:10]
#return "No physical US release found."
return dvdReleaseDates(word)
def dvdReleaseDates(word):
"""
Scrapes www.dvdsreleasedates.com for physical release dates.
"""
uri = f"http://www.dvdsreleasedates.com/search.php?searchStr={word}"
res = requests.get(uri, timeout=30, verify=True)
soup = bs4.BeautifulSoup(res.text, "html.parser")
rDate = soup.title.text[soup.title.text.rfind("Date")+4:]
if not rDate:
rDate = "Not announced."
elif rDate.startswith("rch results for"):
rDate = "Not found."
return rDate.strip()
@commands('pickmovie', 'getmovie')
@example('.pickmovie', 'Commandos')
def pickMovie(bot, trigger):
"""
Picks a random movie title out of the database.
"""
bot.memory['movie_lock'].acquire()
cur = bot.db.execute("SELECT movie_title FROM movie WHERE " + \
"times_watched < 1 AND shitpost = 0 ORDER BY RANDOM() LIMIT 1;")
movie = cur.fetchone()
bot.memory['movie_lock'].release()
if not movie:
return bot.reply("Movie database is empty!")
else:
bot.reply(movie[0])
if trigger.group(2) == "-m":
trigger.set_group(f".movie {movie}")
movieInfo(bot, trigger)
@require_admin
@commands('addmovie')
@example('.addmovie Gay Niggers From Outer Space')
def addMovie(bot, trigger):
"""
Adds the specified movie to the movie database.
"""
bot.memory['movie_lock'].acquire()
movie = trigger.group(2)
try:
bot.db.execute("INSERT INTO movie (movie_title, added_by) VALUES(?,?)",
(movie, trigger.nick))
confirm = f"Added movie: {movie}"
except IntegrityError:
confirm = f"Error: {movie} is already in the database."
bot.memory['movie_lock'].release()
bot.say(confirm)

14
modules/pingall.py Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env python3
"""
Pings everyone in the channel.
"""
from module import commands
@commands('pingall', 'names')
def pingAll(bot, trigger):
"""
Says the nick of everyone in the channel. Great way to get thier
attention, or just annoy them.
"""
msg = " ".join(bot.channels[trigger.channel].users)
bot.say(msg)

38
modules/rand.py Executable file
View File

@ -0,0 +1,38 @@
#!/usr/bin/env python3
"""
Pick some random numbers.
"""
import sys
import random
from module import commands, example
@commands("rand")
@example(".rand 2", "random(0, 2) = 1")
@example(".rand -1 -1", "random(-1, -1) = -1")
@example(".rand", "random(0, 56) = 13")
@example(".rand 99 10", "random(10, 99) = 53")
@example(".rand 10 99", "random(10, 99) = 29")
def rand(bot, trigger):
"""Replies with a random number between first and second argument."""
arg1 = trigger.group(3)
arg2 = trigger.group(4)
try:
if arg2 is not None:
low = int(arg1)
high = int(arg2)
elif arg1 is not None:
low = 0
high = int(arg1)
else:
low = 0
high = sys.maxsize
except (ValueError, TypeError):
return bot.reply("Arguments must be of integer type")
if low > high:
low, high = high, low
number = random.randint(low, high)
bot.reply(f"random({low}, {high}) = {number}")

75
modules/reload.py Executable file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env python3
"""
Loads, reloads and unloads modules on the fly.
"""
import sys
import loader
import module
@module.require_admin
@module.commands("reload")
@module.thread(False)
def f_reload(bot, trigger):
"""Reloads a module, for use by admins only."""
name = trigger.group(2)
if not name or name == "*" or name.upper() == "ALL THE THINGS":
bot.load_modules()
return bot.msg("done")
if name not in sys.modules:
name = "modules." + name
if name not in sys.modules:
return bot.msg(f"Module '{name}' not loaded, try the 'load' command.")
loader.unload_module(bot, name)
loader.load_module(bot, name)
bot.msg(f"Module '{name}' reloaded.")
@module.require_admin
@module.commands("load")
@module.thread(False)
def f_load(bot, trigger):
"""Loads a module, for use by admins only."""
name = trigger.group(2)
if not name:
return bot.msg('Load what?')
if name in sys.modules:
return bot.msg('Module already loaded, use reload.')
try:
loader.load_module(bot, name)
except ModuleNotFoundError:
if name.startswith("modules."):
return bot.msg(f"Module not found: '{name}'")
name = "modules." + name
try:
loader.load_module(bot, name)
except ModuleNotFoundError:
return bot.msg(f"Module not found: '{name}'")
bot.msg(f"Module '{name}' loaded.")
@module.require_admin
@module.commands("unload")
@module.thread(False)
def f_unload(bot, trigger):
"""Unloads a module, for use by admins only."""
name = trigger.group(2)
if not name:
return bot.msg('Unload what?')
if name not in sys.modules:
name = "modules." + name
if name not in sys.modules:
return bot.msg(f"Module '{name}' not loaded, try the 'load' command.")
loader.unload_module(bot, name)
bot.msg(f"Module '{name}' unloaded.")

260
modules/remind.py Executable file
View File

@ -0,0 +1,260 @@
#!/usr/bin/env python3
"""
Reminds of things.
"""
import os
import re
import time
import sqlite3
import datetime
import threading
import collections
from module import commands, example
class MonitorThread(threading.Thread):
"""
A custom custom thread class for monitoring the time to announce
reminders. It allows itself to be stopped when there are no reminders
to look for.
"""
def __init__(self, bot):
threading.Thread.__init__(self)
self._bot = bot
self.stop = threading.Event()
def run(self):
# while not self._bot.channels.keys():
while not self._bot.stillConnected():
time.sleep(1)
# don't try to say anything if we're not fully connected yet
while not self.stop.is_set():
now = int(time.time())
unixtimes = [int(key) for key in self._bot.memory["remind"].keys()]
oldtimes = [t for t in unixtimes if t <= now]
if oldtimes:
for oldtime in oldtimes:
for reminder in self._bot.memory["remind"][oldtime]:
channel, nick, message = reminder
if message:
self._bot.msg(channel, nick + ': ' + message)
else:
self._bot.msg(channel, nick + '!')
del self._bot.memory["remind"][oldtime]
delete_reminder(self._bot, oldtime)
if not self._bot.memory["remind"] or not self._bot.stillConnected():
self.stop.set()
time.sleep(2.5)
del self._bot.memory["remind_monitor"]
def start_monitor(bot):
"""
Starts the monitor thread. Does nothing if one is already running.
"""
if bot.memory.get("remind_monitor"):
return
t = MonitorThread(bot)
t.start()
bot.memory["remind_monitor"] = t
def load_database(bot):
"""
Loads all entries from the 'remind' table in the bot's database and
stores them in memory
"""
data = {}
reminds = bot.db.execute("SELECT * FROM remind").fetchall()
for remind in reminds:
unixtime, channel, nick, message = remind
reminder = (channel, nick, message)
try:
data[unixtime].append(reminder)
except KeyError:
data[unixtime] = [reminder]
return data
def insert_reminder(bot, unixtime, reminder):
"""
Inserts a new reminder into the 'remind' table in the bot's database.
reminder - a tuple containing (channel, nick, message)
"""
bot.db.execute("INSERT INTO remind (unixtime, channel, nick, message) "
"VALUES(?,?,?,?)", (unixtime,) + reminder)
def delete_reminder(bot, unixtime):
"""
Deletes a reminder from the 'remind' table in the bot's database, using
unixtime as the key.
"""
bot.db.execute("DELETE FROM remind WHERE unixtime = ?", (unixtime,))
def setup(bot):
con = bot.db.connect()
cur = con.cursor()
try:
cur.execute("SELECT * FROM remind").fetchone()
except sqlite3.OperationalError:
cur.execute("CREATE TABLE remind("
"unixtime INTEGER DEFAULT (STRFTIME('%s', 'now')),"
"channel TEXT,"
"nick TEXT,"
"message TEXT)")
con.commit()
con.close()
bot.memory["remind"] = load_database(bot)
start_monitor(bot)
scaling = collections.OrderedDict([
('years', 365.25 * 24 * 3600),
('year', 365.25 * 24 * 3600),
('yrs', 365.25 * 24 * 3600),
('y', 365.25 * 24 * 3600),
('months', 29.53059 * 24 * 3600),
('month', 29.53059 * 24 * 3600),
('mo', 29.53059 * 24 * 3600),
('weeks', 7 * 24 * 3600),
('week', 7 * 24 * 3600),
('wks', 7 * 24 * 3600),
('wk', 7 * 24 * 3600),
('w', 7 * 24 * 3600),
('days', 24 * 3600),
('day', 24 * 3600),
('d', 24 * 3600),
('hours', 3600),
('hour', 3600),
('hrs', 3600),
('hr', 3600),
('h', 3600),
('minutes', 60),
('minute', 60),
('mins', 60),
('min', 60),
('m', 60),
('seconds', 1),
('second', 1),
('secs', 1),
('sec', 1),
('s', 1),
])
periods = '|'.join(scaling.keys())
@commands('remind')
@example('.remind 3h45m Go to class')
def remind(bot, trigger):
"""Gives you a reminder in the given amount of time."""
if not trigger.group(2):
return bot.say("Missing arguments for reminder command.")
if trigger.group(3) and not trigger.group(4):
return bot.say("No message given for reminder.")
duration = 0
message = filter(None, re.split(f"(\d+(?:\.\d+)? ?(?:(?i) {periods})) ?",
trigger.group(2))[1:])
reminder = ''
stop = False
for piece in message:
grp = re.match('(\d+(?:\.\d+)?) ?(.*) ?', piece)
if grp and not stop:
length = float(grp.group(1))
factor = scaling.get(grp.group(2).lower(), 60)
duration += length * factor
else:
reminder = reminder + piece
stop = True
if duration == 0:
return bot.reply("Sorry, didn't understand the input.")
if duration % 1:
duration = int(duration) + 1
else:
duration = int(duration)
create_reminder(bot, trigger, duration, reminder)
@commands('at')
@example('.at 13:47 Do your homework!')
@example('.at 16:30UTC-5 Do cocaine')
@example('.at 14:45:45 Remove dick from oven')
def at(bot, trigger):
"""
Gives you a reminder at the given time. Takes hh:mm:ssUTC+/-##
message. Timezone, if provided, must be in UTC format. 24 hour
clock format only.
"""
if not trigger.group(2):
return bot.say("No arguments given for reminder command.")
if trigger.group(3) and not trigger.group(4):
return bot.say("No message given for reminder.")
regex = re.compile(r"(\d+):(\d+)(?::(\d+))?(?:UTC([+-]\d+))? (.*)")
match = regex.match(trigger.group(2))
if not match:
return bot.reply("Sorry, but I didn't understand your input.")
hour, minute, second, tz, message = match.groups()
if not second:
second = '0'
if tz:
try:
tz = int(tz.replace("UTC", ""))
except ValueError:
bot.say("Invalid timezone. Using the bot's current timezone.")
tz = None
if tz:
timezone = datetime.timezone(datetime.timedelta(hours=tz))
else:
timezone = datetime.datetime.now().astimezone().tzinfo
# current timezone the bot is in
now = datetime.datetime.now(timezone)
at_time = datetime.datetime(now.year, now.month, now.day,
int(hour), int(minute), int(second),
tzinfo=timezone)
timediff = at_time - now
duration = timediff.seconds
if duration < 0:
duration += 86400
create_reminder(bot, trigger, duration, message)
def create_reminder(bot, trigger, duration, message):
"""
Inserts the reminder into the bot's memory and database so it can
eventually announce for it.
"""
t = int(time.time()) + duration
reminder = (trigger.channel, trigger.nick, message)
try:
bot.memory["remind"][t].append(reminder)
except KeyError:
bot.memory["remind"][t] = [reminder]
start_monitor(bot)
insert_reminder(bot, t, reminder)
if duration >= 60:
remind_at = datetime.datetime.fromtimestamp(t)
t_format = bot.config.core.default_time_format
timef = datetime.datetime.strftime(remind_at, t_format)
bot.reply('Okay, will remind at %s' % timef)
else:
bot.reply('Okay, will remind in %s secs' % duration)

166
modules/resistor.py Executable file
View File

@ -0,0 +1,166 @@
#!/usr/bin/env python3
"""
Resistor color band codes.
"""
import re
import argparse
from module import commands, example
suffix = {"k": 1000, "m": 10000000}
sigfig = {"black": 0,
"brown": 1,
"red": 2,
"orange": 3,
"yellow": 4,
"green": 5,
"blue": 6,
"violet": 7,
"grey": 8,
"white": 9}
sigfig_inverse = {val: key for key, val in sigfig.items()}
multiplier = {"black": 1,
"brown": 10,
"red": 100,
"orange": 1000,
"yellow": 10000,
"green": 100000,
"blue": 1000000,
"violet": 10000000,
"grey": 100000000,
"white": 1000000000,
"gold": 0.1,
"silver": 0.01}
multiplier_inverse = {val: key for key, val in multiplier.items()}
tolerance = {"brown": "±1%",
"red": "±2%",
"green": "±0.5%",
"blue": "±0.25%",
"violet": "±0.1%",
"grey": "±0.05%",
"gold": "±5%",
"silver": "±10%",
"none": "±20%"}
temp_coeff = {"black": "250 ppm",
"brown": "100 ppm",
"red": "50 ppm",
"orange": "15 ppm",
"yellow": "25 ppm",
"blue": "10 ppm",
"violet": "5 ppm"}
@commands("resist")
@example(".resist 10k", "brown black orange gold")
def resist(bot, trigger):
"""
Displays the color band code of a resistor for the given resistance.
"""
if not trigger.group(2):
return bot.say("Please specify a value")
parser = argparse.ArgumentParser()
parser.add_argument("value", nargs="+")
parser.add_argument("-r", "--reverse", action="store_true")
parser.add_argument("-n", "--num_bands", type=int, choices=[3,4,5,6], default=4)
args = parser.parse_args(trigger.group(2).split())
if args.reverse: # bands-to-value
bot.say(bands_to_value(" ".join(args.value)))
else: # value-to-band
if len(args.value) > 1:
return bot.say("Too many values.")
value = args.value[0].lower()
mul = 1
if value[-1] in ["k", "m"]:
mul = suffix[value[-1]]
value = value[:-1]
try:
value = float(value) * mul
except ValueError:
return bot.say("Invalid input.")
return bot.say(value_to_band(value, args.num_bands))
def value_to_band(value, num_bands=4):
"""
Converts a given resistance value to a color band code.
"""
if value < 1:
return "Value too small. Maybe this will be fixed in the future."
else:
if num_bands > 4:
value = float(format(value, ".3g"))
else:
value = float(format(value, ".2g"))
value = re.sub("\.0$", "", str(value))
bands = []
mul = ""
if "." in value:
if value[-2] == ".":
mul = 0.1
elif value[-3] == ".":
mul = 0.01
else:
return "Error with sigfigs."
value = value.replace(".", "")
val1 = int(value[0])
val2 = int(value[1])
bands.append(sigfig_inverse[val1])
bands.append(sigfig_inverse[val2])
if num_bands > 4:
value = value.ljust(4,"0")
val3 = int(value[2])
bands.append(sigfig_inverse[val3])
if not mul:
mul = 10**(len(value) - len(value.rstrip("0")))
bands.append(multiplier_inverse[mul])
# TODO: better tolerance
bands.append("gold")
if num_bands == 3:
return " ".join(bands)
# TODO: better temp coeff
if num_bands == 6:
bands.append("red")
return " ".join(bands)
def bands_to_value(bands):
"""
Converts the given color band code into a resistance value.
"""
bands = bands.lower().split()
ret = []
if len(bands) > 4:
value = bands[:3]
bands = bands[3:]
else:
value = bands[:2]
bands = bands[2:]
value = [sigfig[v] for v in value]
prod = ""
for x in value:
prod += str(x)
prod = float(prod) * multiplier[bands[0]]
if len(bands) == 1:
return " ".join([str(prod), tolerance["none"]])
if len(bands) == 2:
return " ".join([str(prod), tolerance[bands[1]]])
if len(bands) == 3:
return " ".join([str(prod), tolerance[bands[1]], temp_coeff[bands[2]]])

22
modules/rundown.py Executable file
View File

@ -0,0 +1,22 @@
#!/usr/bin/env python3
"""
Redpill on the Bogdanovs.
"""
import os
import random
from module import commands, example
@commands("rundown")
@example(".rundown")
def grog(bot, trigger):
"""
Provides rundown on demand.
"""
if trigger.group(2) in ["-c", "--cabal"]:
with open(os.path.join(bot.static, "cabaldown.txt"), "r") as file:
data = file.read()
else:
with open(os.path.join(bot.static, "rundown.txt"), "r") as file:
data = file.read()
bot.say(data)

79
modules/scramble.py Executable file
View File

@ -0,0 +1,79 @@
#!/usr/bin/env python3
"""
Scramble.
"""
import random
import module
class Scramble():
def __init__(self):
self.running = False
def newgame(self):
self.running = True
self.word = self._PickWord()
self.shuffled = [x for x in self.word]
random.shuffle(self.shuffled)
def _PickWord(self):
with open("/home/iou1name/.sopel/words6.txt",'r') as file:
lines = file.readlines()
wrd = list(lines[ random.randint(0, len(lines))-1 ].strip())
return wrd
def gameover(self):
self.running = False
self.shuffled = self.word
def isAnagram(givenWord, givenGuess):
word = [x for x in givenWord]
guess = [x for x in givenGuess]
with open('/home/iou1name/.sopel/words6.txt', 'r') as file:
words = file.readlines()
if not ''.join(guess)+'\n' in words:
return "notaword"
del words
for char in word:
if char in guess:
guess.pop(guess.index(char))
else:
return 'incorrect'
return 'correct'
scramble = Scramble()
@module.commands('scramble')
@module.example('.scramble')
def scramble_start(bot, trigger):
"""Starts a game of scramble."""
if scramble.running:
bot.reply("There is already a game running.")
return
scramble.newgame()
bot.say(trigger.nick + " has started a game of scramble! Type .sc to guess the solution.")
bot.say(''.join(scramble.shuffled))
@module.commands('sc')
@module.example('.sc anus')
def guess(bot, trigger):
"""Makes a guess in scramble."""
if not scramble.running:
bot.reply('There is no game currently running. Use .scramble to start one')
return
response = isAnagram(scramble.word, list(trigger.group(2)))
if response == 'correct':
bot.say(trigger.nick + " has won!")
scramble.gameover()
elif response == 'notaword':
bot.say("I don't recognize that word.")
elif response == 'incorrect':
bot.reply("incorrect.")
bot.say(''.join(scramble.shuffled))

142
modules/sed.py Executable file
View File

@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
Fulvia Spelling correction module
This module will fix spelling errors if someone corrects them
using the sed notation (s///) commonly found in vi/vim.
"""
import re
from module import hook
from tools import FulviaMemory
def setup(bot):
bot.memory['find_lines'] = FulviaMemory()
@hook(True)
def collectlines(bot, trigger):
"""Create a temporary log of what people say"""
# Don't log things in PM
if trigger.is_privmsg:
return
# Add a log for the channel and nick, if there isn't already one
if trigger.channel not in bot.memory['find_lines']:
bot.memory['find_lines'][trigger.channel] = FulviaMemory()
if trigger.nick not in bot.memory['find_lines'][trigger.channel]:
bot.memory['find_lines'][trigger.channel][trigger.nick] = list()
# Create a temporary list of the user's lines in a channel
templist = bot.memory['find_lines'][trigger.channel][trigger.nick]
line = trigger.group()
if line.startswith("s/"): # Don't remember substitutions
return
elif line.startswith("\x01ACTION"): # For /me messages
line = line[:-1]
templist.append(line)
else:
templist.append(line)
del templist[:-10] # Keep the log to 10 lines per person
bot.memory['find_lines'][trigger.channel][trigger.nick] = templist
#Match nick, s/find/replace/flags. Flags and nick are optional, nick can be
#followed by comma or colon, anything after the first space after the third
#slash is ignored, you can escape slashes with backslashes, and if you want to
#search for an actual backslash followed by an actual slash, you're shit out of
#luck because this is the fucking regex of death as it is.
# @rule(r"""(?:
# (\S+) # Catch a nick in group 1
# [:,]\s+)? # Followed by colon/comma and whitespace, if given
# s/ # The literal s/
# ( # Group 2 is the thing to find
# (?:\\/ | [^/])+ # One or more non-slashes or escaped slashes
# )/( # Group 3 is what to replace with
# (?:\\/ | [^/])* # One or more non-slashes or escaped slashes
# )
# (?:/(\S+))? # Optional slash, followed by group 4 (flags)
# """)
@hook(True)
def findandreplace(bot, trigger):
# Don't bother in PM
if trigger.is_privmsg:
return
rule = re.compile(r"(?:(\S+)[:,]\s+)?s\/((?:\\\/|[^/])+)\/((?:\\\/|[^/])*)"\
+ r"(?:\/(\S+))?")
group = rule.search(trigger.group(0))
if not group:
return
g = (trigger.group(0),) + group.groups()
trigger.set_group(g, bot.config)
# Correcting other person vs self.
rnick = (trigger.group(1) or trigger.nick)
search_dict = bot.memory['find_lines']
# only do something if there is conversation to work with
if trigger.channel not in search_dict:
return
if rnick not in search_dict[trigger.channel]:
return
#TODO rest[0] is find, rest[1] is replace. These should be made variables of
#their own at some point.
rest = [trigger.group(2), trigger.group(3)]
rest[0] = rest[0].replace(r'\/', '/')
rest[1] = rest[1].replace(r'\/', '/')
me = False # /me command
flags = (trigger.group(4) or '')
print(flags)
# If g flag is given, replace all. Otherwise, replace once.
if 'g' in flags:
count = 0
else:
count = 1
# repl is a lambda function which performs the substitution. i flag turns
# off case sensitivity. re.U turns on unicode replacement.
if 'i' in flags:
regex = re.compile(re.escape(rest[0]), re.U | re.I)
repl = lambda s: re.sub(regex, rest[1], s, count == 1)
else:
repl = lambda s: re.sub(rest[0], rest[1], s, count)
# Look back through the user's lines in the channel until you find a line
# where the replacement works
new_phrase = None
for line in reversed(search_dict[trigger.channel][rnick]):
if line.startswith("\x01ACTION"):
me = True # /me command
line = line[8:]
else:
me = False
new_phrase = repl(line)
if new_phrase != line: # we are done
break
if not new_phrase or new_phrase == line:
return # Didn't find anything
# Save the new "edited" message.
action = (me and '\x01ACTION ') or '' # If /me message, prepend \x01ACTION
templist = search_dict[trigger.channel][rnick]
templist.append(action + new_phrase)
search_dict[trigger.channel][rnick] = templist
bot.memory['find_lines'] = search_dict
# output
if not me:
new_phrase = f"\x02meant\x0f to say: {new_phrase}"
if trigger.group(1):
phrase = f"{trigger.nick} thinks {rnick} {new_phrase}"
else:
phrase = f"{trigger.nick} {new_phrase}"
bot.say(phrase)

106
modules/seen.py Executable file
View File

@ -0,0 +1,106 @@
#!/usr/bin/env python3
"""
When was this user last seen.
"""
import time
import threading
from datetime import datetime
from sqlite3 import OperationalError
from tools.time import relativeTime
from module import commands, example, hook, require_chanmsg
def load_database(bot):
"""
Loads all entries from the 'seen' table in the bot's database and
returns them.
"""
data = {}
seens = bot.db.execute("SELECT * FROM seen").fetchall()
for seen in seens:
nick, timestamp, channel, message = seen
seen = (timestamp, channel, message)
data[nick] = seen
return data
def setup(bot):
bot.memory["seen_lock"] = threading.Lock()
bot.memory["seen"] = load_database(bot)
bot.memory["seen_last_dump"] = time.time()
con = bot.db.connect()
cur = con.cursor()
try:
cur.execute("SELECT * FROM seen").fetchone()
except OperationalError:
cur.execute("CREATE TABLE seen("
"nick TEXT PRIMARY KEY,"
"timestamp INTEGER,"
"channel TEXT,"
"message TEXT)")
con.commit()
con.close()
@commands('seen')
@example(".seen Nigger -l", "Last heard from Nigger at [1997-03-12 16:30:00] "\
+"with \"Just going to the store for some smokes babe I'll be right back\"")
@example(".seen Soma_QM", "I haven't seen Soma_QM")
def seen(bot, trigger):
"""Reports when and where the user was last seen."""
nick = trigger.group(3)
last = False
if nick == "-l" or nick == "--last":
last = True
nick = trigger.group(4)
if not nick:
return bot.say("Seen who?")
if nick == bot.nick:
return bot.reply("I'm right here!")
if nick in bot.memory["seen"]:
timestamp, channel, message = bot.memory["seen"][nick]
else:
return bot.msg(f"I haven't seen \x0308{nick}")
timestamp = datetime.fromtimestamp(timestamp)
t_format = bot.config.core.default_time_format
timestamp = datetime.strftime(timestamp, t_format)
reltime = relativeTime(bot.config, datetime.now(), timestamp)
msg = f"Last heard from \x0308{nick}\x03 at {timestamp} " \
+ f"(\x0312{reltime} ago\x03) in \x0312{channel}"
if last:
msg += f'\x03 with "\x0308{message}\x03"'
bot.say(msg)
def dump_seen_db(bot):
"""
Dumps the seen database into the bot's database.
"""
bot.memory["seen_lock"].acquire()
for nick, seen in bot.memory["seen"].items():
bot.db.execute("INSERT OR REPLACE INTO seen "
"(nick, timestamp, channel, message) VALUES (?, ?, ?, ?)",
(nick,) + seen)
bot.memory["seen_lock"].release()
@hook(True)
@require_chanmsg
def seen_hook(bot, trigger):
seen = (time.time(), trigger.channel, trigger.group(0))
bot.memory["seen"][trigger.nick] = seen
if time.time() - bot.memory["seen_last_dump"] > 60:
# only dump once a minute at most
dump_seen_db(bot)
bot.memory["seen_last_dump"] = time.time()

32
modules/spellcheck.py Executable file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""
Spell checking. Relies on the pyenchant module.
"""
import enchant
from module import commands, example
@commands('spellcheck', 'spell')
@example('.spellcheck stuff')
def spellcheck(bot, trigger):
"""
Says whether the given word is spelled correctly, and gives suggestions if
it's not.
"""
if not trigger.group(2):
return bot.reply("What word?")
word = trigger.group(2)
if " " in word:
return bot.say("One word at a time, please")
dictionary = enchant.Dict("en_US")
if dictionary.check(word):
bot.say(word + " is spelled correctly")
else:
msg = f"{word} is not spelled correctly. Maybe you want one of " \
+ "these spellings: "
sugWords = []
for suggested_word in dictionary.suggest(word):
sugWords.append(suggested_word)
msg += ", ".join(sugWords)
bot.msg(msg)

121
modules/tell.py Executable file
View File

@ -0,0 +1,121 @@
#!/usr/bin/env python3
"""
Leave a message for someone.
"""
import os
import sys
import time
import threading
from datetime import datetime
from sqlite3 import OperationalError
from tools.time import relativeTime
from module import commands, example, hook
def load_database(bot):
"""
Loads all entries from the 'tell' table in the bot's database and
stores them in memory
"""
data = {}
tells = bot.db.execute("SELECT * FROM tell").fetchall()
for tell in tells:
tellee, teller, unixtime, message = tell
tell = (teller, unixtime, message)
try:
data[tellee].append(tell)
except KeyError:
data[tellee] = [tell]
return data
def insert_tell(bot, tellee, teller, unixtime, message):
"""
Inserts a new tell into the 'tell' table in the bot's database.
"""
bot.db.execute("INSERT INTO tell (tellee, teller, unixtime, message) "
"VALUES(?,?,?,?)", (tellee, teller, unixtime, message))
def delete_tell(bot, tellee):
"""
Deletes a tell from the 'tell' table in the bot's database, using
tellee as the key.
"""
bot.db.execute("DELETE FROM tell WHERE tellee = ?", (tellee,))
def setup(bot):
con = bot.db.connect()
cur = con.cursor()
try:
cur.execute("SELECT * FROM tell").fetchone()
except OperationalError:
cur.execute("CREATE TABLE tell("
"tellee TEXT, "
"teller TEXT,"
"unixtime INTEGER,"
"message TEXT)")
con.commit()
con.close()
bot.memory["tell"] = load_database(bot)
@commands('tell')
@example('.tell iou1name you broke something again.')
def tell(bot, trigger):
"""Give someone a message the next time they're seen"""
if not trigger.group(3):
return bot.reply("Tell whom?")
teller = trigger.nick
tellee = trigger.group(3).rstrip('.,:;')
message = trigger.group(2).replace(tellee, "", 1).strip()
if not message:
return bot.reply(f"Tell {tellee} what?")
if tellee == bot.nick:
return bot.reply("I'm here now, you can tell me whatever you want!")
if tellee == teller or tellee == "me":
return bot.reply("You can tell yourself that.")
unixtime = time.time()
if not tellee in bot.memory['tell']:
bot.memory['tell'][tellee] = [(teller, unixtime, message)]
else:
bot.memory['tell'][tellee].append((teller, unixtime, message))
insert_tell(bot, tellee, teller, unixtime, message)
response = f"I'll pass that on when {tellee} is around."
bot.reply(response)
@hook(True)
def tell_hook(bot, trigger):
"""
Hooks every line to see if a tellee has said something. If they have,
it gives them all of their tells.
"""
if not trigger.nick in bot.memory["tell"]:
return
tellee = trigger.nick
tells = []
for tell in bot.memory["tell"][tellee]:
teller, unixtime, message = tell
telldate = datetime.fromtimestamp(unixtime)
reltime = relativeTime(bot.config, datetime.now(), telldate)
t_format = bot.config.core.default_time_format
telldate = datetime.strftime(telldate, t_format)
msg = f"{tellee}: \x0310{message}\x03 (\x0308{teller}\x03) {telldate}" \
+ f" [\x0312{reltime} ago\x03]"
tells.append(msg)
for tell in tells:
bot.msg(tell)
bot.memory["tell"].pop(tellee)
delete_tell(bot, tellee)

51
modules/tld.py Executable file
View File

@ -0,0 +1,51 @@
#!/usr/bin/env python3
"""
Scrapes and shows information about Top Level Domains.
"""
import bs4
import requests
from module import commands, example
URI = 'https://en.wikipedia.org/wiki/List_of_Internet_top-level_domains'
@commands('tld')
@example('.tld me')
def gettld(bot, trigger):
"""Show information about the given Top Level Domain."""
word = trigger.group(2).strip()
if " " in word:
return bot.reply("One TLD at a time.")
if not word.startswith("."):
word = "." + word
res = requests.get(URI, verify=True)
res.raise_for_status()
soup = bs4.BeautifulSoup(res.text, "html.parser")
td = soup.find("td", string=word)
if not td:
return bot.say(f"Unable to find data for TLD: {word}")
table_headers = [th.string for th in td.parent.parent.find_all("th")]
if None in table_headers:
n = table_headers.index(None)
table_headers[n] = "Administrator"
# this should be the only header on the page where th.string fails
table_entries = []
for td in td.parent.find_all("td"):
string = td.text.strip()
if not string:
try:
string = td.a.string
except AttributeError:
string = ""
table_entries.append(string)
msg = "[\x0304TLD\x03] "
for n in range(len(table_headers)):
msg += f"\x0310{table_headers[n]}\x03: "
msg += f"\x0312{table_entries[n]}\x03 | "
msg = msg[:-3]
bot.msg(msg)

62
modules/topic.py Executable file
View File

@ -0,0 +1,62 @@
#!/usr/bin/env python3
"""
This module allows you to add topics to a list in the database and cycle
through them.
"""
import os
import threading
import random
from sqlite3 import IntegrityError, OperationalError
from module import commands, example
def setup(bot):
"""
Attempts to create the table in the database if it's not found.
"""
bot.memory['topic_lock'] = threading.Lock()
con = bot.db.connect()
cur = con.cursor()
try:
cur.execute("SELECT * FROM topic").fetchone()
except OperationalError:
cur.execute("CREATE TABLE topic("
"topic TEXT PRIMARY KEY,"
"added_by TEXT,"
"added_date INTEGER DEFAULT (STRFTIME('%s', 'now')))")
con.commit()
con.close()
@commands('topic')
def topic(bot, trigger):
"""
Picks a random topic from the database and applies it.
"""
topic = bot.db.execute("SELECT topic FROM topic " \
"ORDER BY RANDOM() LIMIT 1;").fetchone()
if not topic:
return bot.reply("Topic database is empty!")
else:
topic = topic[0]
bot.topic(trigger.channel, topic)
@commands('addtopic')
@example('.addtopic Daily reminder to kill all cia niggers on site.')
def addTopic(bot, trigger):
"""
Adds the specified topic to the topic database.
"""
topic = trigger.group(2)
if not topic:
return bot.say("Please be providing a topic sir.")
bot.memory['topic_lock'].acquire()
try:
bot.db.execute("INSERT INTO topic (topic, added_by) VALUES(?,?)",
(topic, trigger.nick))
confirm = "Added topic: " + topic
except IntegrityError:
confirm = "Error: " + topic + " is already in the database."
bot.memory['topic_lock'].release()
bot.say(confirm)

73
modules/translate.py Executable file
View File

@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""
Google translate that shit.
"""
import random
import argparse
import requests
from module import commands, example
def translate(text, in_lang='auto', out_lang='en'):
"""
Queries Google Translate.
"""
headers = {"User-Agent": "do you no de wae?"}
query = {
"client": "gtx",
"sl": in_lang,
"tl": out_lang,
"dt": "t",
"q": text,
}
url = "http://translate.googleapis.com/translate_a/single"
res = requests.get(url, params=query, timeout=40, headers=headers,
verify=True)
res.raise_for_status()
data = res.json()
return data[0][0][0], data[2]
@commands('translate', 'tr')
@example('.tr :en :fr my dog', '"mon chien" (en to fr, translate.google.com)')
@example('.tr היי', '"Hey" (iw to en, translate.google.com)')
@example('.tr mon chien', '"my dog" (fr to en, translate.google.com)')
def tr2(bot, trigger):
"""Translates a phrase, with an optional language hint."""
if not trigger.group(2):
return bot.reply("Translate what?")
parser = argparse.ArgumentParser()
parser.add_argument("text", nargs=argparse.REMAINDER)
parser.add_argument("-i", "--inlang", default="auto")
parser.add_argument("-o", "--outlang", default="en")
args = parser.parse_args(trigger.group(2).split())
args.text = " ".join(args.text)
tr_text, in_lang = translate(args.text, in_lang=args.inlang,
out_lang=args.outlang)
bot.say(f'"{tr_text}" ({in_lang} to {args.outlang})')
@commands('mangle')
def mangle(bot, trigger):
"""Repeatedly translate the input until it makes absolutely no sense."""
if not trigger.group(2):
return bot.reply("Mangle what?")
tr_text = trigger.group(2)
long_lang_list = ['fr', 'de', 'es', 'it', 'no', 'he', 'la', 'ja', 'cy',
'ar', 'yi', 'zh', 'nl', 'ru', 'fi', 'hi', 'af', 'jw', 'mr', 'ceb',
'cs', 'ga', 'sv', 'eo', 'el', 'ms', 'lv']
lang_list = []
for __ in range(0, 8):
lang_list.append(random.choice(long_lang_list))
for lang in lang_list:
tr_text, _ = translate(tr_text, "auto", lang)
tr_text, _ = translate(tr_text, "auto", "en")
bot.msg(tr_text)

41
modules/unicode_info.py Executable file
View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""
Displays information about unicode endpoints.
"""
import unicodedata
from module import commands, example
@commands('u')
@example('.u ‽', 'U+203D INTERROBANG (‽)')
@example('.u 203D', 'U+203D INTERROBANG (‽)')
def codepoint(bot, trigger):
"""Looks up unicode information."""
arg = trigger.group(2)
if not arg:
return bot.reply('What code point do you want me to look up?')
stripped = arg.strip()
if len(stripped) > 0:
arg = stripped
if len(arg) > 1:
if arg.startswith('U+'):
arg = arg[2:]
try:
arg = chr(int(arg, 16))
except:
return bot.reply("That's not a valid code point.")
# Get the hex value for the code point, and drop the 0x from the front
point = str(hex(ord(u'' + arg)))[2:]
# Make the hex 4 characters long with preceding 0s, and all upper case
point = point.rjust(4, str('0')).upper()
try:
name = unicodedata.name(arg)
except ValueError:
return 'U+%s (No name found)' % point
if not unicodedata.combining(arg):
template = 'U+%s %s (%s)'
else:
template = 'U+%s %s (\xe2\x97\x8c%s)'
bot.say(template % (point, name, arg))

185
modules/units.py Executable file
View File

@ -0,0 +1,185 @@
#!/usr/bin/env python3
"""
Unit version.
"""
import re
from module import commands, example
find_temp = re.compile('(-?[0-9]*\.?[0-9]*)[ °]*(K|C|F)', re.IGNORECASE)
length = r"([0-9]*\.?[0-9]*)[ ]*(mile[s]?|mi|inch|in|foot|feet|ft|yard[s]?|yd|"\
+ r"(?:milli|centi|kilo|)meter[s]?|[mkc]?m|ly|light-year[s]?|au|astronomic"\
+ r"al unit[s]?|parsec[s]?|pc)"
find_length = re.compile(length, re.IGNORECASE)
mass = r"([0-9]*\.?[0-9]*)[ ]*(lb|lbm|pound[s]?|ounce|oz|(?:kilo|)gram(?:me|)["\
+ r"s]?|[k]?g)"
find_mass = re.compile(mass, re.IGNORECASE)
def f_to_c(temp):
return (float(temp) - 32) * 5 / 9
def c_to_k(temp):
return temp + 273.15
def c_to_f(temp):
return (9.0 / 5.0 * temp + 32)
def k_to_c(temp):
return temp - 273.15
@commands('temp')
@example('.temp 100F', '37.78°C = 100.00°F = 310.93K')
@example('.temp 100C', '100.00°C = 212.00°F = 373.15K')
@example('.temp 100K', '-173.15°C = -279.67°F = 100.00K')
def temperature(bot, trigger):
"""
Convert temperatures
"""
try:
source = find_temp.match(trigger.group(2)).groups()
except (AttributeError, TypeError):
return bot.reply("That's not a valid temperature.")
unit = source[1].upper()
numeric = float(source[0])
celsius = 0
if unit == 'C':
celsius = numeric
elif unit == 'F':
celsius = f_to_c(numeric)
elif unit == 'K':
celsius = k_to_c(numeric)
kelvin = c_to_k(celsius)
fahrenheit = c_to_f(celsius)
bot.reply("{:.2f}°C = {:.2f}°F = {:.2f}K".format(celsius,fahrenheit,kelvin))
@commands('length', 'distance')
@example('.distance 3m', '3.00m = 9 feet, 10.11 inches')
@example('.distance 3km', '3.00km = 1.86 miles')
@example('.distance 3 miles', '4.83km = 3.00 miles')
@example('.distance 3 inch', '7.62cm = 3.00 inches')
@example('.distance 3 feet', '91.44cm = 3 feet, 0.00 inches')
@example('.distance 3 yards', '2.74m = 9 feet, 0.00 inches')
@example('.distance 155cm', '1.55m = 5 feet, 1.02 inches')
@example('.length 3 ly', '28382191417742.40km = 17635876112814.77 miles')
@example('.length 3 au', '448793612.10km = 278867421.71 miles')
@example('.length 3 parsec', '92570329129020.20km = 57520535754731.61 miles')
def distance(bot, trigger):
"""
Convert distances
"""
try:
source = find_length.match(trigger.group(2)).groups()
except (AttributeError, TypeError):
return bot.reply("That's not a valid length unit.")
unit = source[1].lower()
numeric = float(source[0])
meter = 0
if unit in ("meters", "meter", "m"):
meter = numeric
elif unit in ("millimeters", "millimeter", "mm"):
meter = numeric / 1000
elif unit in ("kilometers", "kilometer", "km"):
meter = numeric * 1000
elif unit in ("miles", "mile", "mi"):
meter = numeric / 0.00062137
elif unit in ("inch", "in"):
meter = numeric / 39.370
elif unit in ("centimeters", "centimeter", "cm"):
meter = numeric / 100
elif unit in ("feet", "foot", "ft"):
meter = numeric / 3.2808
elif unit in ("yards", "yard", "yd"):
meter = numeric / (3.2808 / 3)
elif unit in ("light-year", "light-years", "ly"):
meter = numeric * 9460730472580800
elif unit in ("astronomical unit", "astronomical units", "au"):
meter = numeric * 149597870700
elif unit in ("parsec", "parsecs", "pc"):
meter = numeric * 30856776376340068
if meter >= 1000:
metric_part = '{:.2f}km'.format(meter / 1000)
elif meter < 0.01:
metric_part = '{:.2f}mm'.format(meter * 1000)
elif meter < 1:
metric_part = '{:.2f}cm'.format(meter * 100)
else:
metric_part = '{:.2f}m'.format(meter)
# Shit like this makes me hate being an American.
inch = meter * 39.37
foot = int(inch) // 12
inch = inch - (foot * 12)
yard = foot // 3
mile = meter * 0.000621371192
if yard > 500:
stupid_part = '{:.2f} miles'.format(mile)
else:
parts = []
if yard >= 100:
parts.append('{} yards'.format(yard))
foot -= (yard * 3)
if foot == 1:
parts.append('1 foot')
elif foot != 0:
parts.append('{:.0f} feet'.format(foot))
parts.append('{:.2f} inches'.format(inch))
stupid_part = ', '.join(parts)
bot.reply('{} = {}'.format(metric_part, stupid_part))
@commands('weight', 'mass')
@example(".weight 56 g", "56.00g = 1.98 oz")
@example(".mass 78lb", "35.38kg = 78 pounds")
@example("mass 72kg", "72.00kg = 158 pounds 11.73 ounces")
def mass(bot, trigger):
"""
Convert mass
"""
try:
source = find_mass.match(trigger.group(2)).groups()
except (AttributeError, TypeError):
return bot.reply("That's not a valid mass unit.")
unit = source[1].lower()
numeric = float(source[0])
metric = 0
if unit in ("gram", "grams", "gramme", "grammes", "g"):
metric = numeric
elif unit in ("kilogram", "kilograms", "kilogramme", "kilogrammes", "kg"):
metric = numeric * 1000
elif unit in ("lb", "lbm", "pound", "pounds"):
metric = numeric * 453.59237
elif unit in ("oz", "ounce"):
metric = numeric * 28.35
if metric >= 1000:
metric_part = '{:.2f}kg'.format(metric / 1000)
else:
metric_part = '{:.2f}g'.format(metric)
ounce = metric * .035274
pound = int(ounce) // 16
ounce = ounce - (pound * 16)
if pound > 1:
stupid_part = '{} pounds'.format(pound)
if ounce > 0.01:
stupid_part += ' {:.2f} ounces'.format(ounce)
else:
stupid_part = '{:.2f} oz'.format(ounce)
bot.reply('{} = {}'.format(metric_part, stupid_part))

20
modules/uptime.py Executable file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env python3
"""
How long the bot has been running.
"""
import datetime
from module import commands
def setup(bot):
if "uptime" not in bot.memory:
bot.memory["uptime"] = datetime.datetime.now()
@commands('uptime')
def uptime(bot, trigger):
""".uptime - Returns the uptime of Sopel."""
delta = datetime.timedelta(seconds=round((datetime.datetime.now() -
bot.memory["uptime"]).total_seconds()))
bot.say(f"I've been sitting here for {delta} and I keep going!")

53
modules/url.py Executable file
View File

@ -0,0 +1,53 @@
#!/usr/bin/env python3
"""
URL parsing.
"""
import re
from urllib.parse import urlparse
from html.parser import HTMLParser
import requests
from module import hook
HEADERS = {"User-Agent": "bix nood gimme the title", "Range": "bytes=0-4096"}
@hook(True)
def title_auto(bot, trigger):
"""
Automatically show titles for URLs. For shortened URLs/redirects, find
where the URL redirects to and show the title for that.
"""
if "http" not in trigger.group(0):
return
url_finder = re.compile(r"((?:http|https)(?::\/\/\S+))", re.IGNORECASE)
urls = re.findall(url_finder, trigger.group(0))
if len(urls) == 0:
return
for url in urls:
broken = False
for key in bot.url_callbacks:
if key in url:
bot.url_callbacks[key](bot, url)
broken = True
if broken:
continue
try:
res = requests.get(url, headers=HEADERS, verify=True)
except requests.exceptions.ConnectionError:
continue
try:
res.raise_for_status()
except:
continue
if not res.headers["Content-Type"].startswith("text/html"):
continue
if res.text.find("<title>") == -1:
continue
title = res.text[res.text.find("<title>")+7:res.text.find("</title>")]
title = HTMLParser().unescape(title)
title = title.replace("\n","").strip()
hostname = urlparse(url).hostname
bot.say(f"[ \x0310{title} \x03] - \x0304{hostname}")

10
modules/version.py Executable file
View File

@ -0,0 +1,10 @@
#!/usr/bin/env python3
"""
The bot's version number.
"""
from module import commands
@commands('version')
def version(bot, trigger):
"""Displays the current version of Fulvia running."""
bot.reply("Fulvia v1.0.0")

190
modules/watcher.py Executable file
View File

@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""
A thread watcher module for 4chan.
"""
import time
import threading
import requests
from module import commands, example
def setup(bot):
"""
Establishes the bot's dictionary of watched threads.
"""
if not bot.memory.get("watcher"):
bot.memory["watcher"] = {}
con = bot.db.connect()
cur = con.cursor()
try:
watching = cur.execute("SELECT * FROM watcher").fetchall()
except:
cur.execute("CREATE TABLE watcher("
"api_url TEXT PRIMARY KEY,"
"name TEXT DEFAULT 'Anonymous',"
"last_post INTEGER,"
"time_since TEXT)")
con.commit()
else:
for thread in watching:
if get_thread_url(thread[0]) in bot.memory["watcher"].keys():
continue
t = WatcherThread(bot, thread[0], thread[1], thread[2], thread[3])
t.start()
bot.memory["watcher"][get_thread_url(thread[0])] = t
con.close()
def get_time():
"""
Returns the current time formatted in If-Modified-Since notation.
"""
return time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
def get_num_posts(thread, name):
"""
Gets the number of OP posts from the thread JSON.
"""
num = 0
for post in thread["posts"]:
if post.get("name") == name:
num += 1
return num
def get_last_post(thread, name):
"""
Gets the last post made by name.
"""
for post in reversed(thread["posts"]):
if post.get("name") == name:
return post.get("no")
def get_api_url(url):
"""
Returns the API url for the provided thread url.
"""
return url.replace("boards.4chan", "a.4cdn") + ".json"
def get_thread_url(api_url):
"""
Returns the normal thread url for the provided API url.
"""
return api_url.replace("a.4cdn", "boards.4chan").replace(".json", "")
@commands("watch")
@example(".watch https://boards.4chan.org/qst/thread/2")
def watch(bot, trigger):
"""
A thread watcher for 4chan.
"""
url = trigger.group(3)
name = trigger.group(4)
if not name:
name = "Anonymous"
if url in bot.memory["watcher"].keys():
return bot.say("Error: I'm already watching that thread.")
api_url = get_api_url(url)
res = requests.get(api_url, verify=True)
if res.status_code == 404:
return bot.say("404: thread not found")
thread = res.json()
last_post = get_last_post(thread, name)
time_since = get_time()
t = WatcherThread(bot, api_url, name, last_post, time_since)
t.start()
bot.memory["watcher"][url] = t
bot.db.execute("INSERT INTO watcher(api_url, name, last_post, time_since)"
" VALUES(?,?,?,?,?)", (api_url, name, last_post, time_since))
bot.say("[\x0304Watcher\x03] Watching thread: \x0307" + url)
@commands("unwatch")
@example(".unwatch https://boards.4chan.org/qst/thread/2")
def unwatch(bot, trigger):
"""
Stops the thread watcher thread for that thread.
"""
url = trigger.group(2)
try:
bot.memory["watcher"][url].stop.set()
bot.memory["watcher"].pop(url)
except KeyError:
return bot.say("Error: I'm not watching that thread.")
removeThread(bot, get_api_url(url),)
bot.say("[\x0304Watcher\x03] No longer watching: \x0307" + url)
def removeThread(bot, url):
"""
Removes the provided thread from the database. This should be the API url.
"""
bot.db.execute("DELETE FROM watcher WHERE api_url = ?", (url,))
class WatcherThread(threading.Thread):
def __init__(self, bot, api_url, name, last_post, time_since):
threading.Thread.__init__(self)
self.stop = threading.Event()
self.period = 20
self._bot = bot
self.api_url = api_url
self.name = name
self.last_post = last_post
self.time_since = time_since
def run(self):
while not self.stop.is_set():
self.stop.wait(self.period)
headers = {"If-Modified-Since": self.time_since}
try:
res = requests.get(self.api_url, headers=headers, verify=True)
self.time_since = get_time()
except urllib3.exceptions.NewConnectionError:
print(f"Watcher: Thread {self.api_url}: Connection error")
continue
if res.status_code == 404:
msg = "[\x0304Watcher\x03] Thread deleted: " \
+ f"\x0307{get_thread_url(self.api_url)}"
self._bot.msg(msg)
removeThread(self.bot, api_url)
self.stop.set()
continue
if res.status_code == 304:
continue
thread = res.json()
if thread["posts"][0].get("closed"):
msg = "[\x0304Watcher\x03] Thread closed: " \
+ f"\x0307{get_thread_url(self.api_url)}"
self._bot.msg(msg)
removeThread(self.bot, api_url)
self.stop.set()
continue
new_last_post = get_last_post(thread, self.name)
if new_last_post > self.last_post:
self.last_post = new_last_post
msg = "[\x0304Watcher\x03] New post from \x0308" \
+ f"{self.name}\x03: \x0307{get_thread_url(self.api_url)}"
+ f"#{self.last_post}"
self._bot.msg(msg)

144
modules/weather.py Executable file
View File

@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""
The weather man.
"""
import requests
import xmltodict
from module import commands, example
URI = "https://query.yahooapis.com/v1/public/yql?{QUERY}"
def woeid_search(query):
"""
Find the first Where On Earth ID for the given query. Result is the etree
node for the result, so that location data can still be retrieved. Returns
None if there is no result, or the woeid field is empty.
"""
query = f'q=select * from geo.places where text="{query}"'
res = requests.get(URI.format(**{"QUERY": query}), verify=True)
body = res.content
parsed = xmltodict.parse(body).get('query')
results = parsed.get('results')
if results is None or results.get('place') is None:
return None
if type(results.get('place')) is list:
return results.get('place')[0]
return results.get('place')
def get_cover(parsed):
try:
condition = parsed['channel']['item']['yweather:condition']
except KeyError:
return 'unknown'
text = condition['@text']
# code = int(condition['code'])
# TODO parse code to get those little icon thingies.
return text
def get_temp(parsed):
try:
condition = parsed['channel']['item']['yweather:condition']
temp = int(condition['@temp'])
except (KeyError, ValueError):
return 'unknown'
f = round((temp * 1.8) + 32, 2)
return (u'%d\u00B0C (%d\u00B0F)' % (temp, f))
def get_humidity(parsed):
try:
humidity = parsed['channel']['yweather:atmosphere']['@humidity']
except (KeyError, ValueError):
return 'unknown'
return "Humidity: %s%%" % humidity
def get_wind(parsed):
try:
wind_data = parsed['channel']['yweather:wind']
kph = float(wind_data['@speed'])
m_s = float(round(kph / 3.6, 1))
speed = int(round(kph / 1.852, 0))
degrees = int(wind_data['@direction'])
except (KeyError, ValueError):
return 'unknown'
if speed < 1:
description = 'Calm'
elif speed < 4:
description = 'Light air'
elif speed < 7:
description = 'Light breeze'
elif speed < 11:
description = 'Gentle breeze'
elif speed < 16:
description = 'Moderate breeze'
elif speed < 22:
description = 'Fresh breeze'
elif speed < 28:
description = 'Strong breeze'
elif speed < 34:
description = 'Near gale'
elif speed < 41:
description = 'Gale'
elif speed < 48:
description = 'Strong gale'
elif speed < 56:
description = 'Storm'
elif speed < 64:
description = 'Violent storm'
else:
description = 'Hurricane'
if (degrees <= 22.5) or (degrees > 337.5):
degrees = u'\u2193'
elif (degrees > 22.5) and (degrees <= 67.5):
degrees = u'\u2199'
elif (degrees > 67.5) and (degrees <= 112.5):
degrees = u'\u2190'
elif (degrees > 112.5) and (degrees <= 157.5):
degrees = u'\u2196'
elif (degrees > 157.5) and (degrees <= 202.5):
degrees = u'\u2191'
elif (degrees > 202.5) and (degrees <= 247.5):
degrees = u'\u2197'
elif (degrees > 247.5) and (degrees <= 292.5):
degrees = u'\u2192'
elif (degrees > 292.5) and (degrees <= 337.5):
degrees = u'\u2198'
return description + ' ' + str(m_s) + 'm/s (' + degrees + ')'
@commands("weather")
@example('.weather London')
def weather(bot, trigger):
""".weather location - Show the weather at the given location."""
if not trigger.group(2):
return bot.reply("Weather where?")
location = trigger.group(2)
location = location.strip()
first_result = woeid_search(location)
woeid = None
if first_result is not None:
woeid = first_result.get('woeid')
if not woeid:
return bot.reply("I don't know where that is.")
query =f"q=select * from weather.forecast where woeid=\"{woeid}\" and u='c'"
res = requests.get(URI.format(**{"QUERY": query}), verify=True)
body = res.content
parsed = xmltodict.parse(body).get('query')
results = parsed.get('results')
if results is None:
return bot.reply("No forecast available. Try a more specific location.")
location = results.get('channel').get('title')
cover = get_cover(results)
temp = get_temp(results)
humidity = get_humidity(results)
wind = get_wind(results)
bot.say(u'%s: %s, %s, %s, %s' % (location, cover, temp, humidity, wind))

88
modules/wikipedia.py Executable file
View File

@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""
Display excerpts from wikipedia.
"""
import requests
from module import commands, example, url_callback
def wiki_search(query, num):
"""
Searches en.wikipedia for the given query, and returns the specified
number of results.
"""
search_url = "http://en.wikipedia.org/w/api.php?format=json&action=query" \
+ f"&list=search&srlimit={num}&srprop=timestamp&srwhat=text" \
+ "&srsearch="
search_url += query
res = requests.get(search_url, verify=True)
res.raise_for_status()
data = res.json()
if 'query' in data:
data = data['query']['search']
return [r['title'] for r in data]
else:
return None
def say_snippet(bot, query, show_url=True):
page_name = query.replace('_', ' ')
query = query.replace(' ', '_')
snippet = wiki_snippet(query)
msg = f'[\x0304WIKIPEDIA\x03] \x0310{page_name}\x03 | \x0312"{snippet}"'
if show_url:
msg = msg + f"\x03 | \x0307https://en.wikipedia.org/wiki/{query}"
bot.say(msg)
def wiki_snippet(query):
"""
Retrives a snippet of the specified length from the given page on
en.wikipedia.
"""
snippet_url = "https://en.wikipedia.org/w/api.php?format=json" \
+ "&action=query&prop=extracts&exintro&explaintext" \
+ "&exchars=300&redirects&titles="
snippet_url += query
res = requests.get(snippet_url, verify=True)
res.raise_for_status()
snippet = res.json()
snippet = snippet['query']['pages']
# For some reason, the API gives the page *number* as the key, so we just
# grab the first page number in the results.
snippet = snippet[list(snippet.keys())[0]]
return snippet['extract']
@url_callback(".wikipedia.org/wiki/")
def wiki_info(bot, url):
"""
Retrives a snippet of the specified length from the given page on the given
server.
"""
_, _, query = url.partition("wiki/")
if not query:
return
say_snippet(bot, query, False)
@commands('wikipedia', 'wiki')
@example('.wiki San Francisco')
def wikipedia(bot, trigger):
"""Search wikipedia and return a snippet of the results."""
if trigger.group(2) is None:
return bot.reply("What do you want me to look up?")
query = trigger.group(2)
if not query:
return bot.reply("What do you want me to look up?")
data = wiki_search(query, 1)
if not data:
return bot.reply("I can't find any results for that.")
else:
data = data[0]
say_snippet(bot, data)

95
modules/wiktionary.py Executable file
View File

@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
Pull definitions from Wikitionary.org.
"""
import re
import requests
from module import commands, example
uri = 'http://en.wiktionary.org/w/index.php?title={}&printable=yes'
r_tag = re.compile(r'<[^>]+>')
r_ul = re.compile(r'(?ims)<ul>.*?</ul>')
def text(html):
text = r_tag.sub('', html).strip()
text = text.replace('\n', ' ')
text = text.replace('\r', '')
text = text.replace('(intransitive', '(intr.')
text = text.replace('(transitive', '(trans.')
return text
def wikt(word):
res = requests.get(uri.format(word))
# bytes = r_ul.sub('', bytes)
res = res.text
mode = None
etymology = None
definitions = {}
for line in res.splitlines():
if 'id="Etymology"' in line:
mode = 'etymology'
elif 'id="Noun"' in line:
mode = 'noun'
elif 'id="Verb"' in line:
mode = 'verb'
elif 'id="Adjective"' in line:
mode = 'adjective'
elif 'id="Adverb"' in line:
mode = 'adverb'
elif 'id="Interjection"' in line:
mode = 'interjection'
elif 'id="Particle"' in line:
mode = 'particle'
elif 'id="Preposition"' in line:
mode = 'preposition'
elif 'id="' in line:
mode = None
elif (mode == 'etmyology') and ('<p>' in line):
etymology = text(line)
elif (mode is not None) and ('<li>' in line):
definitions.setdefault(mode, []).append(text(line))
if '<hr' in line:
break
return etymology, definitions
parts = ('preposition', 'particle', 'noun', 'verb',
'adjective', 'adverb', 'interjection')
def format(result, definitions, number=2):
for part in parts:
if part in definitions:
defs = definitions[part][:number]
result += u'{}: '.format(part)
n = ['%s. %s' % (i + 1, e.strip(' .')) for i, e in enumerate(defs)]
result += ', '.join(n)
return result.strip(' .,')
@commands('define', 'dict')
@example('.dict bailiwick')
def wiktionary(bot, trigger):
"""Look up a word on Wiktionary."""
word = trigger.group(2)
if word is None:
return bot.reply('You must tell me what to look up!')
_, definitions = wikt(word)
if not definitions:
return bot.say(f"Couldn't get any definitions for {word}.")
result = format(word, definitions)
if len(result) < 150:
result = format(word, definitions, 3)
if len(result) < 150:
result = format(word, definitions, 5)
if len(result) > 300:
result = result[:295] + '[...]'
bot.say(result)

18
modules/willilike.py Executable file
View File

@ -0,0 +1,18 @@
#!/usr/bin/env python3
"""
Will I like this?
"""
from module import commands, example
@commands('willilike')
@example('.willilike Banished Quest')
def willilike(bot, trigger):
"""An advanced AI that will determine if you like something."""
if trigger.group(2):
bot.reply("No.")
@commands('upvote')
@example('.willilike Banished Quest')
def upvote(bot, trigger):
"""An advanced AI that will determine if you like something."""
bot.say(trigger.nick + " upvoted this post!")

73
modules/wolfram.py Executable file
View File

@ -0,0 +1,73 @@
#!/usr/bin/env python3
"""
Querying Wolfram Alpha.
"""
import wolframalpha
from module import commands, example
@commands('wa', 'wolfram')
@example('.wa 2+2', '[W|A] 2+2 = 4')
@example(".wa python language release date",
"[W|A] Python | date introduced = 1991")
def wa_command(bot, trigger):
"""
Queries WolframAlpha.
"""
if not trigger.group(2):
return bot.reply("You must provide a query.")
if not bot.config.wolfram.app_id:
bot.reply("Wolfram|Alpha API app ID not configured.")
query = trigger.group(2).strip()
app_id = bot.config.wolfram.app_id
units = bot.config.wolfram.units
res = wa_query(query, app_id, units)
bot.say(f"[\x0304Wolfram\x03] {res}")
def wa_query(query, app_id, units='nonmetric'):
"""Queries Wolfram."""
if not app_id:
return "Wolfram|Alpha API app ID not provided."
client = wolframalpha.Client(app_id)
params = (
('format', 'plaintext'),
('units', units),
)
try:
res = client.query(input=query, params=params)
except AssertionError:
return "Temporary API issue. Try again in a moment."
except Exception as e:
print(e)
return "Error connecting to API. Please find an adult."
if int(res['@numpods']) == 0:
return "No results found."
texts = []
for pod in res.pods:
try:
texts.append(pod.text)
except AttributeError:
pass # pod with no text; skip it
except Exception as e:
print(e)
return "Weird result. Please find an adult."
if len(texts) >= 2:
break
# We only need two results (input and output)
try:
query, output = texts[0], texts[1]
except IndexError:
return "No text-representable result found."
output = output.replace('\n', ' | ')
if not output:
return query
return f"\x0310{query} \x03= \x0312{output}"

85
modules/xkcd.py Executable file
View File

@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""
Search for and pull information about XKCD comics.
"""
import random
import requests
from module import commands, url_callback
def get_info(number=None):
if number:
url = f"https://xkcd.com/{number}/info.0.json"
else:
url = "https://xkcd.com/info.0.json"
res = requests.get(url, verify=True)
res.raise_for_status()
data = res.json()
data['url'] = f"https://xkcd.com/{data['num']}"
return data
def validate_num(num, max_int):
"""Ensures that the given number is a valid comic id."""
if num < 0: # nth latest
num += max_int
if num < 0:
num = 1
elif num == 0:
num = 1
elif num > max_int:
num = max_int
return num
def parse_data(data):
"""Parses the retrieved data into a proper message for the bot."""
msg = f"[\x0304XKCD\x03] \x0307{data['url']}\x03 | " \
+ f"\x0310{data['safe_title']}\x03 | " \
+ f"\x0310Alt-text: \x0312{data['alt']}"
return msg
@commands('xkcd')
def xkcd(bot, trigger):
"""
.xkcd - Finds an xkcd comic strip.
If no input is provided it will return a random comic
If numeric input is provided it will return that comic, or the nth-latest
comic if the number is non-positive
"""
latest = get_info()
max_int = latest['num']
if trigger.group(2):
try:
num = int(trigger.group(2))
except ValueError:
return bot.reply("Invalid input.")
num = validate_num(num, max_int)
data = get_info(num)
else:
num = random.randint(1, max_int)
data = get_info(num)
msg = parse_data(data)
bot.msg(msg)
@url_callback('xkcd.com/')
def get_url(bot, url):
"""Callback for the URL module."""
_, _, num = url.partition("xkcd.com/")
try:
num = int(num)
except ValueError:
return
latest = get_info()
max_int = latest['num']
num = validate_num(num, max_int)
data = get_info(num)
msg = parse_data(data)
bot.msg(msg)

32
run.py Executable file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""
Initializes the bot.
"""
import os
import sys
from twisted.internet import reactor
from bot import FulviaFactory
from config import Config
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Fulvia IRC Bot")
parser.add_argument(
"-c",
"--config",
help="Use a specific config file.")
args = parser.parse_args()
if not args.config:
args.config = "default.cfg"
config = Config(args.config)
server = config.core.server
port = int(config.core.port)
reactor.connectTCP(server, port, FulviaFactory(config))
reactor.run()

157
tools/__init__.py Executable file
View File

@ -0,0 +1,157 @@
#!/usr/bin/env python3
"""
Some helper functions and other tools.
"""
import re
import threading
op_level = {
"voice": 1,
"v": 1,
"+": 1,
"halfop": 2,
"h": 2,
"%": 2,
"op": 4,
"o": 4,
"@": 4,
"admin": 8,
"a": 8,
"&": 8,
"owner": 16,
"q": 16,
"~": 16
}
class FulviaMemory(dict):
"""
A simple thread-safe dict implementation.
In order to prevent exceptions when iterating over the values and changing
them at the same time from different threads, we use a blocking lock on
``__setitem__`` and ``__contains__``.
"""
def __init__(self, *args):
dict.__init__(self, *args)
self.lock = threading.Lock()
def __setitem__(self, key, value):
self.lock.acquire()
result = dict.__setitem__(self, key, value)
self.lock.release()
return result
def __contains__(self, key):
"""
Check if a key is in the dict.
It locks it for writes when doing so.
"""
self.lock.acquire()
result = dict.__contains__(self, key)
self.lock.release()
return result
class User(object):
"""A representation of a user Fulvia is aware of."""
def __init__(self, nick):
self.nick = nick
"""The user's nickname."""
self.ident = ""
self.user = self.ident
"""The user's local username/ident."""
self.host = ""
"""The user's hostname."""
self.channels = {}
"""The channels the user is in."""
self.away = None
"""Whether the user is marked as away."""
hostmask = property(lambda self: '{}!{}@{}'.format(self.nick, self.user,
self.host))
"""The user's full hostmask."""
class Channel(object):
"""A representation of a channel Fulvia is in."""
def __init__(self, name):
self.name = name
"""The name of the channel."""
self.type = ""
"""
The type of channel this is. Options are 'secret', 'private' or
'public' per RFC 2812.
"""
self.topic = ""
"""The topic of the channel."""
self.users = {}
"""The users in the channel. A set to ensure there are no duplicates."""
self.privileges = {}
"""The op levels of the users in the channel."""
self.modes = ""
"""The mode of the channel."""
# NOTE: this doesn't work yet
def remove_user(self, nick):
"""
Removes a user from the channel.
"""
user = self.users.pop(nick, None)
self.privileges.pop(nick, None)
if user != None:
user.channels.pop(self.name, None)
def add_user(self, user):
"""
Adds a user to the channel.
"""
assert isinstance(user, User)
self.users[user.nick] = user
self.privileges[user.nick] = 0
user.channels[self.name] = self
def rename_user(self, old, new):
"""
Renames the user.
"""
if old in self.users:
self.users[new] = self.users.pop(old)
if old in self.privileges:
self.privileges[new] = self.privileges.pop(old)
def configureHostMask(mask):
"""
Returns a valid hostmask based on user input.
"""
if mask == '*!*@*':
return mask
if re.match('^[^.@!/]+$', mask) is not None:
return '%s!*@*' % mask
if re.match('^[^@!]+$', mask) is not None:
return '*!*@%s' % mask
m = re.match('^([^!@]+)@$', mask)
if m is not None:
return '*!%s@*' % m.group(1)
m = re.match('^([^!@]+)@([^@!]+)$', mask)
if m is not None:
return '*!%s@%s' % (m.group(1), m.group(2))
m = re.match('^([^!@]+)!(^[!@]+)@?$', mask)
if m is not None:
return '%s!%s@*' % (m.group(1), m.group(2))
return ''

197
tools/calculation.py Executable file
View File

@ -0,0 +1,197 @@
#!/usr/bin/env python3
"""
Tools to help safely do calculations from user input.
"""
import time
import numbers
import operator
import ast
__all__ = ['eval_equation']
class ExpressionEvaluator:
"""
A generic class for evaluating limited forms of Python expressions.
Instances can overwrite binary_ops and unary_ops attributes with dicts of
the form {ast.Node, function}. When the ast.Node being used as key is
found, it will be evaluated using the given function.
"""
class Error(Exception):
pass
def __init__(self, bin_ops=None, unary_ops=None):
self.binary_ops = bin_ops or {}
self.unary_ops = unary_ops or {}
def __call__(self, expression_str, timeout=5.0):
"""
Evaluate a python expression and return the result.
Raises:
SyntaxError: If the given expression_str is not a valid python
statement.
ExpressionEvaluator.Error: If the instance of ExpressionEvaluator
does not have a handler for the ast.Node.
"""
ast_expression = ast.parse(expression_str, mode='eval')
return self._eval_node(ast_expression.body, time.time() + timeout)
def _eval_node(self, node, timeout):
"""
Recursively evaluate the given ast.Node.
Uses self.binary_ops and self.unary_ops for the implementation.
A subclass could overwrite this to handle more nodes, calling it only
for nodes it does not implement it self.
Raises:
ExpressionEvaluator.Error: If it can't handle the ast.Node.
"""
if isinstance(node, ast.Num):
return node.n
elif (isinstance(node, ast.BinOp) and
type(node.op) in self.binary_ops):
left = self._eval_node(node.left, timeout)
right = self._eval_node(node.right, timeout)
if time.time() > timeout:
raise ExpressionEvaluator.Error(
"Time for evaluating expression ran out.")
return self.binary_ops[type(node.op)](left, right)
elif (isinstance(node, ast.UnaryOp) and
type(node.op) in self.unary_ops):
operand = self._eval_node(node.operand, timeout)
if time.time() > timeout:
raise ExpressionEvaluator.Error(
"Time for evaluating expression ran out.")
return self.unary_ops[type(node.op)](operand)
raise ExpressionEvaluator.Error(
"Ast.Node '%s' not implemented." % (type(node).__name__,))
def guarded_mul(left, right):
"""Decorate a function to raise an error for values > limit."""
# Only handle ints because floats will overflow anyway.
if not isinstance(left, numbers.Integral):
pass
elif not isinstance(right, numbers.Integral):
pass
elif left in (0, 1) or right in (0, 1):
# Ignore trivial cases.
pass
elif left.bit_length() + right.bit_length() > 664386:
# 664386 is the number of bits (10**100000)**2 has, which is instant on
# my laptop, while (10**1000000)**2 has a noticeable delay. It could
# certainly be improved.
raise ValueError(
"Value is too large to be handled in limited time and memory.")
return operator.mul(left, right)
def pow_complexity(num, exp):
"""
Estimate the worst case time pow(num, exp) takes to calculate.
This function is based on experimetal data from the time it takes to
calculate "num**exp" on laptop with i7-2670QM processor on a 32 bit
CPython 2.7.6 interpreter on Windows.
It tries to implement this surface: x=exp, y=num
1e5 2e5 3e5 4e5 5e5 6e5 7e5 8e5 9e5
e1 0.03 0.09 0.16 0.25 0.35 0.46 0.60 0.73 0.88
e2 0.08 0.24 0.46 0.73 1.03 1.40 1.80 2.21 2.63
e3 0.15 0.46 0.87 1.39 1.99 2.63 3.35 4.18 5.15
e4 0.24 0.73 1.39 2.20 3.11 4.18 5.39 6.59 7.88
e5 0.34 1.03 2.00 3.12 4.48 5.97 7.56 9.37 11.34
e6 0.46 1.39 2.62 4.16 5.97 7.86 10.09 12.56 15.39
e7 0.60 1.79 3.34 5.39 7.60 10.16 13.00 16.23 19.44
e8 0.73 2.20 4.18 6.60 9.37 12.60 16.26 19.83 23.70
e9 0.87 2.62 5.15 7.93 11.34 15.44 19.40 23.66 28.58
For powers of 2 it tries to implement this surface:
1e7 2e7 3e7 4e7 5e7 6e7 7e7 8e7 9e7
1 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00 0.00
2 0.21 0.44 0.71 0.92 1.20 1.49 1.66 1.95 2.23
4 0.43 0.91 1.49 1.96 2.50 3.13 3.54 4.10 4.77
8 0.70 1.50 2.24 3.16 3.83 4.66 5.58 6.56 7.67
The function number were selected by starting with the theoretical
complexity of exp * log2(num)**2 and fiddling with the exponents
untill it more or less matched with the table.
Because this function is based on a limited set of data it might
not give accurate results outside these boundaries. The results
derived from large num and exp were quite accurate for small num
and very large exp though, except when num was a power of 2.
"""
if num in (0, 1) or exp in (0, 1):
return 0
elif (num & (num - 1)) == 0:
# For powers of 2 the scaling is a bit different.
return exp ** 1.092 * num.bit_length() ** 1.65 / 623212911.121
else:
return exp ** 1.590 * num.bit_length() ** 1.73 / 36864057619.3
def guarded_pow(left, right):
# Only handle ints because floats will overflow anyway.
if not isinstance(left, numbers.Integral):
pass
elif not isinstance(right, numbers.Integral):
pass
elif pow_complexity(left, right) < 0.5:
# Value 0.5 is arbitary and based on a estimated runtime of 0.5s
# on a fairly decent laptop processor.
pass
else:
raise ValueError("Pow expression too complex to calculate.")
return operator.pow(left, right)
class EquationEvaluator(ExpressionEvaluator):
__bin_ops = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: guarded_mul,
ast.Div: operator.truediv,
ast.Pow: guarded_pow,
ast.Mod: operator.mod,
ast.FloorDiv: operator.floordiv,
ast.BitXor: guarded_pow
}
__unary_ops = {
ast.USub: operator.neg,
ast.UAdd: operator.pos,
}
def __init__(self):
ExpressionEvaluator.__init__(
self,
bin_ops=self.__bin_ops,
unary_ops=self.__unary_ops
)
def __call__(self, expression_str):
result = ExpressionEvaluator.__call__(self, expression_str)
# This wrapper is here so additional sanity checks could be done
# on the result of the eval, but currently none are done.
return result
eval_equation = EquationEvaluator()
"""
Evaluates a Python equation expression and returns the result.
Supports addition (+), subtraction (-), multiplication (*), division (/),
power (**) and modulo (%).
"""

63
tools/time.py Executable file
View File

@ -0,0 +1,63 @@
#!/usr/bin/env python3
"""
Tools for working with time.
"""
from datetime import datetime
from dateutil.relativedelta import relativedelta
def relativeTime(config, time_1, time_2):
"""
Returns the relative time difference between 'time_1' and 'time_2'.
If either 'time_1' or 'time_2' is a string, it will be converted to a
datetime object according to the 'default_time_format' variable in the
config.
"""
t_format = config.core.default_time_format
if type(time_1) == str:
time_1 = datetime.strptime(time_1, t_format)
if type(time_2) == str:
time_2 = datetime.strptime(time_2, t_format)
msg = []
diff = relativedelta(time_1, time_2)
if diff.years:
if diff.years > 1:
msg.append(f"{diff.years} years")
else:
msg.append(f"{diff.years} year")
if diff.months:
if diff.months > 1:
msg.append(f"{diff.months} months")
else:
msg.append(f"{diff.months} month")
if diff.days:
if diff.days > 1:
msg.append(f"{diff.days} days")
else:
msg.append(f"{diff.days} day")
if not msg:
if diff.hours:
if diff.hours > 1:
msg.append(f"{diff.hours} hours")
else:
msg.append(f"{diff.hours} hour")
if diff.minutes:
if diff.minutes > 1:
msg.append(f"{diff.minutes} minutes")
else:
msg.append(f"{diff.minutes} minute")
if not diff.hours:
if diff.seconds > 1:
msg.append(f"{diff.seconds} seconds")
else:
msg.append(f"{diff.seconds} second")
msg = ", ".join(msg)
return msg

155
trigger.py Executable file
View File

@ -0,0 +1,155 @@
#!/usr/bin/env python3
"""
The trigger abstraction layer.
"""
import datetime
def split_user(user):
"""
Splits a user hostmask into <nick>!<ident>@<host>
"""
nick, _, host = user.partition("!")
ident, _, host = host.partition("@")
if not host:
host = ident
ident = ""
return nick, ident, host
class Group(list):
"""
Custom list class that permits calling it like a function so as to
emulate a re.group instance.
"""
def __init__(self, message, config):
"""
Initializes the group class. If 'message' is a string, we split
it into groups according to the usual trigger.group structure.
Otherwise we assume it's already been split appropriately.
"""
if type(message) == str:
message = self.split_group(message, config)
list.__init__(self, message)
def __call__(self, n=0):
"""
Allows you to call the instance like a function. Or a re.group
instance ;^).
If calling would result in an index error, None is returned instead.
"""
try:
item = list.__getitem__(self, n)
except IndexError:
item = None
return item
def split_group(self, message, config):
"""
Splits the message by spaces.
group(0) is always the entire message.
group(1) is always the first word of the message minus the prefix, if
present. This is usually just the command.
group(2) is always the entire message after the first word.
group(3+) is always every individual word after the first word.
"""
prefix = config.core.prefix
group = []
group.append(message)
words = message.split()
group.append(words[0].replace(prefix, "", 1))
group.append(" ".join(words[1:]))
group += words[1:]
return group
class Trigger():
def __init__(self, user, channel, message, event, config):
self.channel = channel
"""
The channel from which the message was sent.
In a private message, this is the nick that sent the message.
"""
self.time = datetime.datetime.now()
"""
A datetime object at which the message was received by the IRC server.
If the server does not support server-time, then `time` will be the time
that the message was received by Sopel.
"""
self.raw = ""
"""
The entire message, as sent from the server. This includes the CTCP
\\x01 bytes and command, if they were included.
"""
self.is_privmsg = not channel.startswith("#")
"""True if the trigger is from a user, False if it's from a channel."""
self.hostmask = user
"""
Entire hostmask of the person who sent the message.
eg. <nick>!<ident>@<host>
"""
nick, ident, host = split_user(user)
self.nick = nick
"""Nick of person who sent the message."""
self.ident = ident
self.user = ident
"""Local username (AKA ident) of the person who sent the message."""
self.host = host
"""The hostname of the person who sent the message"""
self.event = event
"""
The IRC event (e.g. ``PRIVMSG`` or ``MODE``) which triggered the
message.
"""
self.group = Group(message, config)
"""The ``group`` function of the ``match`` attribute.
See Python :mod:`re` documentation for details."""
self.args = ()
"""
A tuple containing each of the arguments to an event. These are the
strings passed between the event name and the colon. For example,
setting ``mode -m`` on the channel ``#example``, args would be
``('#example', '-m')``
"""
admins = config.core.admins.split(",") + [config.core.owner]
self.admin = any([self.compare_hostmask(admin) for admin in admins])
"""
True if the nick which triggered the command is one of the bot's
admins.
"""
self.owner = self.compare_hostmask(config.core.owner)
"""True if the nick which triggered the command is the bot's owner."""
def compare_hostmask(self, compare_against):
"""
Compares the current hostmask against the given one. If ident is not
None, it uses that, otherwise it only uses <nick>@<host>.
"""
if self.ident:
return compare_against == self.hostmask
else:
return compare_against == "@".join(self.nick, self.host)
def set_group(self, line, config):
"""
Allows a you to easily change the current group to a new Group
instance.
"""
self.group = Group(line, config)