fulvia/tools.py

254 lines
5.7 KiB
Python
Raw Normal View History

2018-03-16 03:13:43 -04:00
#!/usr/bin/env python3
"""
Some helper functions and other tools.
"""
import re
2020-01-29 14:15:43 -05:00
import argparse
import calendar
2018-03-16 03:13:43 -04:00
import threading
2018-05-27 21:45:17 -04:00
from collections import defaultdict
from datetime import datetime
2018-03-16 03:13:43 -04:00
# Maps every spelling of a channel privilege (long name, mode letter and
# nick-prefix symbol) to its numeric rank. Ranks are powers of two, from
# voice (1) up to owner (16); the three spellings of a rank share its value.
op_level = {
    alias: rank
    for rank, aliases in (
        (1, ("voice", "v", "+")),
        (2, ("halfop", "h", "%")),
        (4, ("op", "o", "@")),
        (8, ("admin", "a", "&")),
        (16, ("owner", "q", "~")),
    )
    for alias in aliases
}
class FulviaMemory(dict):
    """
    A dict whose single-key operations are serialised by a lock.

    ``__setitem__``, ``__getitem__`` and ``__contains__`` each run while
    holding ``self.lock``. Every other dict method (iteration, ``get``,
    ``update``, ``pop``, ...) is inherited unchanged and does not take the
    lock; callers that need those to be atomic can hold ``self.lock``
    themselves.
    """

    def __init__(self, *args, **kwargs):
        """
        Accept the same arguments as ``dict`` and create the guarding lock.
        """
        dict.__init__(self, *args, **kwargs)
        self.lock = threading.Lock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        # ``with`` releases the lock even when the key is unhashable and
        # dict raises TypeError.
        with self.lock:
            return dict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        Raises KeyError when the key is absent.
        """
        # A missing key raises KeyError; without ``with`` the lock would stay
        # held and the next access would block forever.
        with self.lock:
            return dict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return dict.__contains__(self, key)
2018-05-27 21:45:17 -04:00
class FulviaMemoryDefault(defaultdict):
    """
    A defaultdict whose single-key operations are serialised by a lock.

    ``__setitem__``, ``__getitem__`` and ``__contains__`` each run while
    holding ``self.lock``. Every other method (iteration, ``get``,
    ``update``, ...) is inherited unchanged and does not take the lock.

    The lock is re-entrant: looking up a missing key makes defaultdict
    build the default value and store it through item assignment, which (in
    CPython) re-enters ``__setitem__`` on the same thread. A plain
    ``threading.Lock`` would deadlock there.
    """

    def __init__(self, *args, **kwargs):
        """
        Accept the same arguments as ``defaultdict`` and create the lock.
        """
        defaultdict.__init__(self, *args, **kwargs)
        self.lock = threading.RLock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        # ``with`` releases the lock even when the key is unhashable and
        # the underlying dict raises TypeError.
        with self.lock:
            return defaultdict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        A missing key is populated from ``default_factory``; when no factory
        is set, KeyError propagates and the lock is still released.
        """
        with self.lock:
            return defaultdict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return defaultdict.__contains__(self, key)
class User:
    """
    A representation of a user in a channel.

    Only ``nick`` is taken from the constructor; every other field starts
    out empty and is expected to be filled in afterwards.
    """

    def __init__(self, nick):
        # The user's nickname.
        self.nick = nick
        # The user's local username/ident. ``user`` starts as the same empty
        # string as ``ident``; the two are independent attributes afterwards.
        self.ident = self.user = ""
        # The user's hostname.
        self.host = ""
        # The user's realname.
        self.realname = ""
        # Whether the user is marked as away (None until known).
        self.away = None
        # The user's op level in this channel.
        self.op_level = ''

    def hostmask(self):
        """Build the ``nick!user@host`` hostmask from the current fields."""
        mask = f"{self.nick}!{self.user}@{self.host}"
        return mask
2018-03-16 03:13:43 -04:00
class Channel:
    """
    A representation of a channel Fulvia is in.

    Only ``name`` is taken from the constructor; the remaining fields start
    out empty.
    """

    def __init__(self, name):
        # The name of the channel.
        self.name = name
        # The type of channel this is. Options are 'secret', 'private' or
        # 'public' per RFC 2812.
        self.type = ""
        # The topic of the channel.
        self.topic = ""
        # The users in the channel (a fresh dict per instance).
        self.users = dict()
        # The current modes on the channel (a fresh set per instance).
        self.modes = set()
2018-03-16 03:13:43 -04:00
def configureHostMask(mask):
    """
    Returns a valid hostmask based on user input.

    Recognised shorthand forms and their expansions:

    * ``*!*@*``      -> returned unchanged
    * ``nick``       -> ``nick!*@*``  (no ``.``, ``@``, ``!`` or ``/``)
    * ``host.name``  -> ``*!*@host.name``  (no ``@`` or ``!``)
    * ``user@``      -> ``*!user@*``
    * ``user@host``  -> ``*!user@host``
    * ``nick!user`` or ``nick!user@`` -> ``nick!user@*``

    Anything else, including a full ``nick!user@host`` mask, yields an empty
    string.
    """
    if mask == '*!*@*':
        return mask

    # Bare token without host-like characters: treat it as a nickname.
    if re.match(r'^[^.@!/]+$', mask) is not None:
        return f'{mask}!*@*'

    # Bare token that may contain dots or slashes: treat it as a host.
    if re.match(r'^[^@!]+$', mask) is not None:
        return f'*!*@{mask}'

    m = re.match(r'^([^!@]+)@$', mask)
    if m is not None:
        return f'*!{m.group(1)}@*'

    m = re.match(r'^([^!@]+)@([^@!]+)$', mask)
    if m is not None:
        return f'*!{m.group(1)}@{m.group(2)}'

    # The second group used to be written ``(^[!@]+)``: an anchor inside the
    # group made this branch unmatchable, so ``nick!user`` fell through to ''.
    m = re.match(r'^([^!@]+)!([^!@]+)@?$', mask)
    if m is not None:
        return f'{m.group(1)}!{m.group(2)}@*'

    return ''
2018-05-27 21:45:17 -04:00
def getOpSym(level):
    """
    Returns the appropriate op_level symbol given a level value.

    The highest rank whose ``op_level`` threshold does not exceed `level`
    decides the symbol; a level below "voice" yields a single space.
    """
    # Ordered from highest privilege to lowest so the first hit wins.
    ranked_symbols = (
        ("owner", "~"),
        ("admin", "&"),
        ("op", "@"),
        ("halfop", "%"),
        ("voice", "+"),
    )
    for role, symbol in ranked_symbols:
        if level >= op_level[role]:
            return symbol
    return " "
2020-01-29 14:15:43 -05:00
class FulviaArgparse(argparse.ArgumentParser):
    """
    An ArgumentParser variant that reports parse errors by raising
    ``argparse.ArgumentError`` instead of going through the stock ``error``
    handler, and that never registers the automatic help option.
    """

    def __init__(self, *args, **kwargs):
        # Override whatever the caller passed: add_help is always disabled.
        kwargs.update(add_help=False)
        super().__init__(*args, **kwargs)

    def error(self, message):
        """Raise `message` as an ArgumentError not tied to any argument."""
        failure = argparse.ArgumentError(None, message)
        raise failure
def _add_years(moment, years):
    """Return `moment` moved forward by `years` calendar years."""
    try:
        return moment.replace(year=moment.year + years)
    except ValueError:
        # Only Feb 29 can fail here (target year is not a leap year);
        # clamp to Feb 28 so the anniversary stays in the same month.
        return moment.replace(year=moment.year + years, day=28)


def relative_time(time_1, time_2):
    """
    Returns a relative timestamp between `time_1` and `time_2`. Inputs
    must be datetime objects.

    The result is a comma-separated string such as ``"2 years, 10 days"``,
    ``"3 hours, 5 minutes"`` or ``"1 minute, 30 seconds"``:

    * whole calendar years and the remaining days are reported when the
      gap is at least one day; smaller units are then omitted;
    * otherwise hours and minutes are reported, with seconds only when the
      gap is under an hour;
    * zero-valued units are skipped, so identical inputs give ``""``.

    The order of the arguments does not matter; the gap is always measured
    from the earlier to the later datetime. Microseconds are ignored.
    """
    assert isinstance(time_1, datetime), "time_1 must be datetime object"
    assert isinstance(time_2, datetime), "time_2 must be datetime object"

    # Order the operands: a negative timedelta normalises to negative days
    # plus positive seconds, which would otherwise produce nonsense output.
    if time_1 < time_2:
        earlier, later = time_1, time_2
    else:
        earlier, later = time_2, time_1

    # Count whole calendar years by stepping `earlier` to its latest
    # anniversary that does not pass `later`. Dividing the day count by 365
    # and patching with leap-day totals miscounts around Feb 29 and could
    # even yield a negative number of days.
    years = max(later.year - earlier.year, 0)
    anniversary = _add_years(earlier, years)
    while years and anniversary > later:
        years -= 1
        anniversary = _add_years(earlier, years)

    remainder = later - anniversary

    units = {}
    if years:
        units['year'] = years
    if remainder.days:
        units['day'] = remainder.days

    if not units:
        # Less than a day apart: fall back to clock units.
        hours, rest = divmod(remainder.seconds, 3600)
        minutes, seconds = divmod(rest, 60)
        if hours:
            units['hour'] = hours
        if minutes:
            units['minute'] = minutes
        # Seconds are only interesting when the gap is under an hour.
        if not hours and seconds:
            units['second'] = seconds

    return ', '.join(
        f"{value} {unit}{'s' if value > 1 else ''}"
        for unit, value in units.items()
    )