#!/usr/bin/env python3 """ Some helper functions and other tools. """ import re import argparse import calendar import threading from collections import defaultdict from datetime import datetime op_level = { "voice": 1, "v": 1, "+": 1, "halfop": 2, "h": 2, "%": 2, "op": 4, "o": 4, "@": 4, "admin": 8, "a": 8, "&": 8, "owner": 16, "q": 16, "~": 16 } class FulviaMemory(dict): """ A simple thread-safe dict implementation. In order to prevent exceptions when iterating over the values and changing them at the same time from different threads, we use a blocking lock on ``__setitem__`` and ``__contains__``. """ def __init__(self, *args): dict.__init__(self, *args) self.lock = threading.Lock() def __setitem__(self, key, value): """ Set a key-value pair. Eg. 'dict[key]=value'. """ self.lock.acquire() result = dict.__setitem__(self, key, value) self.lock.release() return result def __getitem__(self, key): """ Get the value of 'key'. Eg. 'dict[key]'. """ self.lock.acquire() result = dict.__getitem__(self, key) self.lock.release() return result def __contains__(self, key): """ Check if a key is in the dict. Eg. 'key in dict'. """ self.lock.acquire() result = dict.__contains__(self, key) self.lock.release() return result class FulviaMemoryDefault(defaultdict): """ A simple thread-safe dict implementation. In order to prevent exceptions when iterating over the values and changing them at the same time from different threads, we use a blocking lock on ``__setitem__`` and ``__contains__``. """ def __init__(self, *args): defaultdict.__init__(self, *args) self.lock = threading.Lock() def __setitem__(self, key, value): """ Set a key-value pair. Eg. 'dict[key]=value'. """ self.lock.acquire() result = defaultdict.__setitem__(self, key, value) self.lock.release() return result def __getitem__(self, key): """ Get the value of 'key'. Eg. 'dict[key]'. """ # TODO: figure out why this doesn't work #self.lock.acquire() result = defaultdict.__getitem__(self, key) #self.lock.release() return result def __contains__(self, key): """ Check if a key is in the dict. Eg. 'key in dict'. """ self.lock.acquire() result = defaultdict.__contains__(self, key) self.lock.release() return result class User: """A representation of a user in a channel.""" def __init__(self, nick): self.nick = nick """The user's nickname.""" self.ident = "" self.user = self.ident """The user's local username/ident.""" self.host = "" """The user's hostname.""" self.realname = "" """The user's realname.""" self.away = None """Whether the user is marked as away.""" self.op_level = '' """The user's op level in this channel.""" def hostmask(self): """Returns the user's full hostmask.""" return f"{self.nick}!{self.user}@{self.host}" class Channel: """A representation of a channel Fulvia is in.""" def __init__(self, name): self.name = name """The name of the channel.""" self.type = "" """ The type of channel this is. Options are 'secret', 'private' or 'public' per RFC 2812. """ self.topic = "" """The topic of the channel.""" self.users = {} """The users in the channel.""" self.modes = set() """The current modes on the channel.""" def configureHostMask(mask): """ Returns a valid hostmask based on user input. """ if mask == '*!*@*': return mask if re.match('^[^.@!/]+$', mask) is not None: return '%s!*@*' % mask if re.match('^[^@!]+$', mask) is not None: return '*!*@%s' % mask m = re.match('^([^!@]+)@$', mask) if m is not None: return '*!%s@*' % m.group(1) m = re.match('^([^!@]+)@([^@!]+)$', mask) if m is not None: return '*!%s@%s' % (m.group(1), m.group(2)) m = re.match('^([^!@]+)!(^[!@]+)@?$', mask) if m is not None: return '%s!%s@*' % (m.group(1), m.group(2)) return '' def getOpSym(level): """Returns the appropriate op_level symbol given a level value.""" if level >= op_level["owner"]: return "~" elif level >= op_level["admin"]: return "&" elif level >= op_level["op"]: return "@" elif level >= op_level["halfop"]: return "%" elif level >= op_level["voice"]: return "+" else: return " " class FulviaArgparse(argparse.ArgumentParser): """ A custom ArgParser class that raises errors as exceptions rather than printing them to stderr. """ def __init__(self, *args, **kwargs): kwargs['add_help'] = False super().__init__(*args, **kwargs) def error(self, message): raise argparse.ArgumentError(None, message) def relative_time(time_1, time_2): """ Returns a relative timestamp between `time_1` and `time_2`. Inputs must be datetime objects. """ assert type(time_1) == datetime, "time_1 must be datetime object" assert type(time_2) == datetime, "time_2 must be datetime object" diff = time_1 - time_2 msg = "" units = {} if diff.days >= 365: units['year'] = diff.days // 365 if diff.days >= 0: units['day'] = diff.days - units.get('year', 0)*365 if units.get('year'): units['day'] -= calendar.leapdays(time_2.year, time_1.year) if not any(units.values()): if diff.seconds // 3600: units['hour'] = diff.seconds // 3600 if diff.seconds // 60: units['minute'] = (diff.seconds - units.get('hour', 0)*3600) // 60 if not units.get('hour'): units['second'] = diff.seconds - units.get('minute', 0)*60 for unit, value in units.items(): if value: msg += str(value) + ' ' + unit if value > 1: msg += 's' msg += ', ' msg = msg[:-2] return msg