254 lines
5.7 KiB
Python
Executable File
254 lines
5.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Some helper functions and other tools.
|
|
"""
|
|
import re
|
|
import argparse
|
|
import calendar
|
|
import threading
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
# Maps every accepted spelling of a channel privilege (long name, single
# letter and symbol) to its numeric weight. Higher weight means more
# privilege; the weights are powers of two.
op_level = {
    alias: weight
    for weight, aliases in (
        (1, ("voice", "v", "+")),
        (2, ("halfop", "h", "%")),
        (4, ("op", "o", "@")),
        (8, ("admin", "a", "&")),
        (16, ("owner", "q", "~")),
    )
    for alias in aliases
}
|
|
|
|
class FulviaMemory(dict):
    """
    A simple thread-safe dict implementation.

    In order to prevent exceptions when iterating over the values and changing
    them at the same time from different threads, item assignment, item
    lookup and membership tests are serialised through ``self.lock``.

    The lock is always taken with a ``with`` statement so that it is released
    even when the underlying dict operation raises (``KeyError`` for a
    missing key, ``TypeError`` for an unhashable key). Otherwise a single
    failed lookup would leave the lock held and block every later access.
    """
    def __init__(self, *args):
        dict.__init__(self, *args)
        self.lock = threading.Lock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        with self.lock:
            return dict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        Raises KeyError if 'key' is not present.
        """
        with self.lock:
            return dict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return dict.__contains__(self, key)
|
|
|
|
|
|
class FulviaMemoryDefault(defaultdict):
    """
    A simple thread-safe defaultdict implementation.

    In order to prevent exceptions when iterating over the values and changing
    them at the same time from different threads, item assignment, item
    lookup and membership tests are serialised through ``self.lock``.

    The lock is re-entrant (``threading.RLock``). Looking up a missing key
    makes ``defaultdict`` build the default and store it through this class's
    own ``__setitem__`` while ``__getitem__`` is still running on the same
    thread. A plain ``Lock`` would block on itself at that point.
    """
    def __init__(self, *args):
        defaultdict.__init__(self, *args)
        self.lock = threading.RLock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        # ``with`` guarantees the lock is released if the assignment raises.
        with self.lock:
            return defaultdict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        A missing key is created from the default factory; if no factory was
        given, KeyError is raised.
        """
        # Holding the lock across the lookup makes "create default and store
        # it" a single step with respect to other threads.
        with self.lock:
            return defaultdict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return defaultdict.__contains__(self, key)
|
|
|
|
|
|
class User:
    """A representation of a user in a channel."""
    def __init__(self, nick):
        # The user's nickname.
        self.nick = nick

        # The user's local username/ident. Both attributes start out as the
        # same empty string; they are independent attributes afterwards.
        self.ident = self.user = ""

        # The user's hostname.
        self.host = ""

        # The user's realname.
        self.realname = ""

        # Whether the user is marked as away (None until set).
        self.away = None

        # The user's op level in this channel.
        self.op_level = ''

    def hostmask(self):
        """Build the full hostmask from the nick, user and host attributes."""
        return f"{self.nick}!{self.user}@{self.host}"
|
|
|
|
|
|
class Channel:
    """A representation of a channel Fulvia is in."""
    def __init__(self, name):
        # The name of the channel.
        self.name = name

        # The type of channel this is. Options are 'secret', 'private' or
        # 'public' per RFC 2812.
        self.type = ""

        # The topic of the channel.
        self.topic = ""

        # The users in the channel.
        self.users = dict()

        # The current modes on the channel.
        self.modes = set()
|
|
|
|
|
|
def configureHostMask(mask):
    """
    Returns a valid hostmask based on user input.

    Accepted forms and what they expand to:

    - ``*!*@*``           -> returned unchanged
    - ``nick``            -> ``nick!*@*``
    - ``host.name``       -> ``*!*@host.name``
    - ``user@``           -> ``*!user@*``
    - ``user@host``       -> ``*!user@host``
    - ``nick!user`` / ``nick!user@`` -> ``nick!user@*``
    - ``nick!user@host``  -> returned unchanged

    Anything else yields an empty string.
    """
    if mask == '*!*@*':
        return mask
    # A bare word with no '.', '@', '!' or '/' is treated as a nickname.
    if re.match(r'^[^.@!/]+$', mask) is not None:
        return '%s!*@*' % mask
    # Anything else without '@' or '!' is treated as a hostname.
    if re.match(r'^[^@!]+$', mask) is not None:
        return '*!*@%s' % mask

    m = re.match(r'^([^!@]+)@$', mask)
    if m is not None:
        return '*!%s@*' % m.group(1)

    m = re.match(r'^([^!@]+)@([^@!]+)$', mask)
    if m is not None:
        return '*!%s@%s' % (m.group(1), m.group(2))

    # The user part must be a negated character class; the original pattern
    # had the caret outside the brackets and therefore could never match.
    m = re.match(r'^([^!@]+)!([^!@]+)@?$', mask)
    if m is not None:
        return '%s!%s@*' % (m.group(1), m.group(2))

    # A complete nick!user@host mask is already valid.
    m = re.match(r'^([^!@]+)!([^!@]+)@([^!@]+)$', mask)
    if m is not None:
        return '%s!%s@%s' % (m.group(1), m.group(2), m.group(3))
    return ''
|
|
|
|
|
|
def getOpSym(level):
    """
    Returns the appropriate op_level symbol given a level value.

    The highest rank whose threshold in ``op_level`` is not above `level`
    decides the symbol; a level below "voice" gives a single space.
    """
    # Ordered from most to least privileged so the first hit wins.
    ranking = (
        ("owner", "~"),
        ("admin", "&"),
        ("op", "@"),
        ("halfop", "%"),
        ("voice", "+"),
    )
    for rank, symbol in ranking:
        if level >= op_level[rank]:
            return symbol
    return " "
|
|
|
|
|
|
class FulviaArgparse(argparse.ArgumentParser):
    """
    An ArgumentParser variant that reports problems by raising
    ``argparse.ArgumentError`` rather than going through the stock
    ``error()`` handler. The automatic help option is always disabled.
    """
    def __init__(self, *args, **kwargs):
        # Force add_help off regardless of what the caller passed.
        options = dict(kwargs, add_help=False)
        super().__init__(*args, **options)

    def error(self, message):
        """Raise `message` as an ArgumentError not tied to any argument."""
        failure = argparse.ArgumentError(None, message)
        raise failure
|
|
|
|
|
|
def _shift_years(moment, years):
    """Return `moment` moved forward by `years` calendar years."""
    try:
        return moment.replace(year=moment.year + years)
    except ValueError:
        # 29 February has no counterpart in a non-leap year; use the 28th.
        return moment.replace(year=moment.year + years, day=28)


def relative_time(time_1, time_2):
    """
    Returns a relative timestamp between `time_1` and `time_2`. Inputs
    must be datetime objects.

    The result describes the size of the gap, whichever argument is later,
    as a comma-separated list such as ``"2 years, 10 days"`` or
    ``"1 hour, 30 minutes"``:

    - whole calendar years and the remaining days are reported when the gap
      is at least one day;
    - otherwise hours and minutes are reported, with seconds only shown when
      the gap is under one hour;
    - identical inputs give an empty string.

    Raises AssertionError if either input is not a datetime.
    """
    # Kept as assertions so callers see the same exception type as before.
    assert isinstance(time_1, datetime), "time_1 must be datetime object"
    assert isinstance(time_2, datetime), "time_2 must be datetime object"

    # Work on the magnitude: a negative timedelta has normalised fields
    # (days=-1, seconds=82800 for "-1 hour") that would read as nonsense.
    if time_1 >= time_2:
        later, earlier = time_1, time_2
    else:
        later, earlier = time_2, time_1

    # Count full calendar years, then measure what is left over from the
    # last anniversary. This keeps leap days from producing negative days.
    years = later.year - earlier.year
    anniversary = _shift_years(earlier, years)
    if anniversary > later:
        years -= 1
        anniversary = _shift_years(earlier, years)
    remainder = later - anniversary

    units = {'year': years, 'day': remainder.days}
    if not years and not remainder.days:
        hours, leftover = divmod(remainder.seconds, 3600)
        minutes, seconds = divmod(leftover, 60)
        units['hour'] = hours
        units['minute'] = minutes
        # Seconds are only interesting for gaps shorter than an hour.
        if not hours:
            units['second'] = seconds

    return ', '.join(
        f"{value} {unit}{'s' if value > 1 else ''}"
        for unit, value in units.items()
        if value
    )
|