fulvia/tools.py

262 lines
5.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Some helper functions and other tools.
"""
import re
import argparse
import threading
from collections import defaultdict
from datetime import datetime
# Numeric rank of each channel privilege. Every privilege can be looked up
# three ways -- by its long name, by a single mode letter, or by its prefix
# symbol -- and all three aliases map to the same value. The ranks are
# distinct powers of two, doubling from one privilege to the next.
op_level = {
    "voice": 1, "v": 1, "+": 1,
    "halfop": 2, "h": 2, "%": 2,
    "op": 4, "o": 4, "@": 4,
    "admin": 8, "a": 8, "&": 8,
    "owner": 16, "q": 16, "~": 16,
}
class FulviaMemory(dict):
    """
    A simple thread-safe dict implementation.

    In order to prevent exceptions when iterating over the values and changing
    them at the same time from different threads, ``__setitem__``,
    ``__getitem__`` and ``__contains__`` each run while holding ``self.lock``.
    Every other ``dict`` method is inherited unchanged and does not take the
    lock.
    """

    def __init__(self, *args):
        dict.__init__(self, *args)
        self.lock = threading.Lock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        # ``with`` guarantees the lock is released even if the underlying
        # operation raises (e.g. TypeError for an unhashable key).
        with self.lock:
            return dict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        Raises KeyError if 'key' is not present; the lock is released before
        the exception propagates.
        """
        with self.lock:
            return dict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return dict.__contains__(self, key)
class FulviaMemoryDefault(defaultdict):
    """
    A simple thread-safe defaultdict implementation.

    In order to prevent exceptions when iterating over the values and changing
    them at the same time from different threads, ``__setitem__``,
    ``__getitem__`` and ``__contains__`` each run while holding ``self.lock``.
    Every other method is inherited unchanged and does not take the lock.

    ``self.lock`` is a re-entrant lock: looking up a missing key makes
    ``defaultdict`` store the freshly built default through this class's own
    ``__setitem__`` (CPython behaviour) while ``__getitem__`` still holds the
    lock, which would deadlock with a plain ``threading.Lock``.
    """

    def __init__(self, *args):
        defaultdict.__init__(self, *args)
        self.lock = threading.RLock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        # ``with`` guarantees the lock is released even if the underlying
        # operation raises (e.g. TypeError for an unhashable key).
        with self.lock:
            return defaultdict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        A missing key is filled in from ``default_factory`` when one is set;
        otherwise KeyError is raised.
        """
        # May re-enter __setitem__ on the same thread for a missing key,
        # hence the RLock created in __init__.
        with self.lock:
            return defaultdict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return defaultdict.__contains__(self, key)
class User:
    """A representation of a user in a channel."""

    def __init__(self, nick):
        # Nickname of the user.
        self.nick = nick
        # Local username/ident. ``user`` starts out as the same (empty)
        # value as ``ident``; the two are separate attributes afterwards.
        self.ident = self.user = ""
        # Hostname of the user.
        self.host = ""
        # Real name of the user.
        self.realname = ""
        # Whether the user is marked as away (None until it is set).
        self.away = None
        # Op level of the user in this channel.
        self.op_level = ''

    def hostmask(self):
        """Build the full hostmask of the user, ``nick!user@host``."""
        return "{0.nick}!{0.user}@{0.host}".format(self)
class Channel:
    """A representation of a channel Fulvia is in."""

    def __init__(self, name):
        # Name of the channel.
        self.name = name
        # Kind of channel: 'secret', 'private' or 'public' per RFC 2812.
        # Topic of the channel. Both start out empty.
        self.type, self.topic = "", ""
        # Users in the channel.
        self.users = dict()
        # Modes currently set on the channel.
        self.modes = set()
def configureHostMask(mask):
    """
    Returns a valid hostmask based on user input.

    Recognised input shapes and what they expand to:

    - ``*!*@*``        -> returned unchanged
    - ``nick``         -> ``nick!*@*`` (no ``.``, ``@``, ``!`` or ``/``)
    - ``host.name``    -> ``*!*@host.name`` (no ``@`` or ``!``)
    - ``user@``        -> ``*!user@*``
    - ``user@host``    -> ``*!user@host``
    - ``nick!user`` or ``nick!user@`` -> ``nick!user@*``

    Anything else yields an empty string.
    """
    if mask == '*!*@*':
        return mask

    # Bare nick: none of the characters that would mark a host or a mask.
    if re.match(r'^[^.@!/]+$', mask) is not None:
        return '%s!*@*' % mask

    # No '@' or '!' at all (but it did not look like a nick): a host.
    if re.match(r'^[^@!]+$', mask) is not None:
        return '*!*@%s' % mask

    m = re.match(r'^([^!@]+)@$', mask)
    if m is not None:
        return '*!%s@*' % m.group(1)

    m = re.match(r'^([^!@]+)@([^@!]+)$', mask)
    if m is not None:
        return '*!%s@%s' % (m.group(1), m.group(2))

    # The second group is a negated class ``[^!@]``; with the caret outside
    # the brackets it acts as a start-of-string anchor and can never match
    # in the middle of the mask.
    m = re.match(r'^([^!@]+)!([^!@]+)@?$', mask)
    if m is not None:
        return '%s!%s@*' % (m.group(1), m.group(2))

    return ''
def getOpSym(level):
    """
    Map a numeric level to its op_level prefix symbol.

    The privileges are tried from highest to lowest and the symbol of the
    first one whose ``op_level`` value does not exceed ``level`` is returned.
    A level below "voice" gives a single space.
    """
    ranked_symbols = (
        ("owner", "~"),
        ("admin", "&"),
        ("op", "@"),
        ("halfop", "%"),
        ("voice", "+"),
    )
    for privilege, symbol in ranked_symbols:
        if level >= op_level[privilege]:
            return symbol
    return " "
class FulviaArgparse(argparse.ArgumentParser):
    """
    An ArgumentParser variant that reports problems by raising exceptions
    instead of printing them to stderr.
    """

    def __init__(self, *args, **kwargs):
        # The automatic -h/--help option is always switched off, whatever
        # the caller passed for ``add_help``.
        super().__init__(*args, **{**kwargs, 'add_help': False})

    def error(self, message):
        """Raise ``argparse.ArgumentError`` carrying ``message``."""
        failure = argparse.ArgumentError(None, message)
        raise failure
def _plural(count, unit):
    """Format ``count`` with ``unit``, appending "s" unless count is 1."""
    return f"{count} {unit}" if count == 1 else f"{count} {unit}s"


def relative_time(time_1, time_2):
    """
    Returns a relative timestamp between `time_1` and `time_2`. Inputs
    must be datetime objects.

    The result describes the absolute difference, so argument order does not
    matter. Differences of a day or more are reported as years and days only
    (a year is counted as 365 days); shorter ones as hours and minutes, or as
    minutes and seconds when less than an hour. Zero-valued units are left
    out, except that identical times give "0 seconds".

    Raises TypeError if either argument is not a datetime.
    """
    for label, value in (("time_1", time_1), ("time_2", time_2)):
        if not isinstance(value, datetime):
            raise TypeError(f"{label} must be datetime object")

    diff = abs(time_1 - time_2)
    msg = []

    years, days = divmod(diff.days, 365)
    if years:
        msg.append(_plural(years, "year"))
    if days:
        msg.append(_plural(days, "day"))

    if not msg:
        # timedelta only stores days, seconds and microseconds; hours and
        # minutes have to be derived from the seconds field.
        hours, remainder = divmod(diff.seconds, 3600)
        minutes, seconds = divmod(remainder, 60)
        if hours:
            msg.append(_plural(hours, "hour"))
        if minutes:
            msg.append(_plural(minutes, "minute"))
        # Seconds are only shown below the one-hour mark; "0 seconds" is
        # kept when there is nothing else to report.
        if not hours and (seconds or not msg):
            msg.append(_plural(seconds, "second"))

    return ", ".join(msg)