#!/usr/bin/env python3
"""
Some helper functions and other tools.
"""
import re
import threading
from collections import defaultdict

# Maps every accepted spelling of a channel privilege (long name, mode letter
# and nick prefix symbol) to its numeric level. Levels are powers of two.
op_level = {
    "voice": 1,
    "v": 1,
    "+": 1,
    "halfop": 2,
    "h": 2,
    "%": 2,
    "op": 4,
    "o": 4,
    "@": 4,
    "admin": 8,
    "a": 8,
    "&": 8,
    "owner": 16,
    "q": 16,
    "~": 16
}


class FulviaMemory(dict):
    """
    A simple thread-safe dict implementation.

    In order to prevent exceptions when iterating over the values and
    changing them at the same time from different threads, we use a blocking
    lock on ``__setitem__``, ``__getitem__`` and ``__contains__``.

    The lock is always released when the guarded operation finishes, even if
    it raises (e.g. ``KeyError`` on a missing key).
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.lock = threading.Lock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        with self.lock:
            dict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        Raises ``KeyError`` if 'key' is missing.
        """
        # ``with`` guarantees the lock is released when the lookup raises
        # KeyError; a bare acquire()/release() pair would leave it held.
        with self.lock:
            return dict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return dict.__contains__(self, key)


class FulviaMemoryDefault(defaultdict):
    """
    A simple thread-safe defaultdict implementation.

    In order to prevent exceptions when iterating over the values and
    changing them at the same time from different threads, we use a blocking
    lock on ``__setitem__``, ``__getitem__`` and ``__contains__``.

    The lock is re-entrant: looking up a missing key makes ``defaultdict``
    store the freshly created default through ``__setitem__`` while
    ``__getitem__`` still holds the lock on the same thread. A plain
    ``threading.Lock`` would deadlock there.
    """
    def __init__(self, *args, **kwargs):
        defaultdict.__init__(self, *args, **kwargs)
        self.lock = threading.RLock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        with self.lock:
            defaultdict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        A missing key is created from ``default_factory`` (if one is set),
        otherwise ``KeyError`` is raised.
        """
        # May re-enter __setitem__ via defaultdict.__missing__; safe because
        # self.lock is an RLock.
        with self.lock:
            return defaultdict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return defaultdict.__contains__(self, key)


class User:
    """A representation of a user in a channel."""
    def __init__(self, nick):
        self.nick = nick
        """The user's nickname."""

        self.ident = ""
        self.user = self.ident
        """The user's local username/ident."""

        self.host = ""
        """The user's hostname."""

        self.realname = ""
        """The user's realname."""

        self.away = None
        """Whether the user is marked as away."""

        self.op_level = ''
        """The user's op level in this channel."""

    def hostmask(self):
        """Returns the user's full hostmask."""
        return f"{self.nick}!{self.user}@{self.host}"


class Channel:
    """A representation of a channel Fulvia is in."""
    def __init__(self, name):
        self.name = name
        """The name of the channel."""

        self.type = ""
        """
        The type of channel this is. Options are 'secret', 'private' or
        'public' per RFC 2812.
        """

        self.topic = ""
        """The topic of the channel."""

        self.users = {}
        """The users in the channel."""

        self.modes = set()
        """The current modes on the channel."""


def configureHostMask(mask):
    """
    Returns a valid hostmask based on user input.

    Accepted shorthand forms and what they expand to:

    - ``*!*@*``      -> returned unchanged
    - ``nick``       -> ``nick!*@*``
    - ``host.name``  -> ``*!*@host.name``
    - ``user@``      -> ``*!user@*``
    - ``user@host``  -> ``*!user@host``
    - ``nick!user`` or ``nick!user@`` -> ``nick!user@*``

    Any other input yields an empty string.
    """
    if mask == '*!*@*':
        return mask

    # A bare word without '.', '@', '!' or '/' is taken to be a nickname.
    if re.match(r'^[^.@!/]+$', mask) is not None:
        return '%s!*@*' % mask

    # Anything else without '@' or '!' is taken to be a hostname.
    if re.match(r'^[^@!]+$', mask) is not None:
        return '*!*@%s' % mask

    m = re.match(r'^([^!@]+)@$', mask)
    if m is not None:
        return '*!%s@*' % m.group(1)

    m = re.match(r'^([^!@]+)@([^@!]+)$', mask)
    if m is not None:
        return '*!%s@%s' % (m.group(1), m.group(2))

    # The second group must be the negated class [^!@]; the previous
    # '(^[!@]+)' put the anchor inside the group and so could never match.
    m = re.match(r'^([^!@]+)!([^!@]+)@?$', mask)
    if m is not None:
        return '%s!%s@*' % (m.group(1), m.group(2))

    return ''


def getOpSym(level):
    """
    Returns the appropriate op_level symbol given a level value.

    The highest privilege whose threshold 'level' reaches wins; a level
    below "voice" yields a single space.
    """
    if level >= op_level["owner"]:
        return "~"
    elif level >= op_level["admin"]:
        return "&"
    elif level >= op_level["op"]:
        return "@"
    elif level >= op_level["halfop"]:
        return "%"
    elif level >= op_level["voice"]:
        return "+"
    else:
        return " "