fulvia/tools/__init__.py

229 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Some helper functions and other tools.
"""
import re
import threading
from collections import defaultdict
# Numeric privilege level for each channel rank. Every rank can be looked up
# by three spellings that share one value: its name, a single letter
# (presumably the IRC mode letter -- confirm against callers) and the prefix
# symbol that getOpSym() hands back.
op_level = {
    "voice": 1, "v": 1, "+": 1,
    "halfop": 2, "h": 2, "%": 2,
    "op": 4, "o": 4, "@": 4,
    "admin": 8, "a": 8, "&": 8,
    "owner": 16, "q": 16, "~": 16,
}
class FulviaMemory(dict):
    """
    A dict whose item access is serialised by a lock.

    In order to prevent exceptions when iterating over the values and changing
    them at the same time from different threads, we use a blocking lock on
    ``__setitem__``, ``__getitem__`` and ``__contains__``.

    Only those three operations are guarded; every other ``dict`` method
    (``get``, ``pop``, ``update``, iteration, ...) is inherited unchanged and
    does not take the lock. The lock is exposed as ``self.lock``. It is a
    plain, non-reentrant ``threading.Lock``, so code that already holds it
    must not index the mapping.
    """

    def __init__(self, *args):
        """Initialise the dict from ``*args`` and create the lock."""
        dict.__init__(self, *args)
        self.lock = threading.Lock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        # ``with`` releases the lock even if the store raises (for example a
        # TypeError for an unhashable key); a bare acquire()/release() pair
        # would leave the lock held and deadlock the next access.
        with self.lock:
            return dict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        Raises KeyError if 'key' is missing; the lock is released first.
        """
        with self.lock:
            return dict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return dict.__contains__(self, key)
class FulviaMemoryDefault(defaultdict):
    """
    A defaultdict whose item access is serialised by a lock.

    In order to prevent exceptions when iterating over the values and changing
    them at the same time from different threads, we use a blocking lock on
    ``__setitem__``, ``__getitem__`` and ``__contains__``.

    Only those three operations are guarded; every other method is inherited
    unchanged and does not take the lock. The lock is exposed as
    ``self.lock`` and is reentrant (``threading.RLock``): looking up a missing
    key makes ``defaultdict`` store the freshly built default through
    ``__setitem__`` while ``__getitem__`` is still holding the lock, which
    would deadlock with a plain ``threading.Lock``.
    """

    def __init__(self, *args):
        """Initialise the defaultdict from ``*args`` and create the lock."""
        defaultdict.__init__(self, *args)
        self.lock = threading.RLock()

    def __setitem__(self, key, value):
        """
        Set a key-value pair. Eg. 'dict[key]=value'.
        """
        # ``with`` releases the lock even if the store raises.
        with self.lock:
            return defaultdict.__setitem__(self, key, value)

    def __getitem__(self, key):
        """
        Get the value of 'key'. Eg. 'dict[key]'.

        A missing key is filled in from ``default_factory``; when no factory
        is set, KeyError is raised and the lock is released first.
        """
        # May re-enter __setitem__ on this same thread for a missing key,
        # hence the reentrant lock.
        with self.lock:
            return defaultdict.__getitem__(self, key)

    def __contains__(self, key):
        """
        Check if a key is in the dict. Eg. 'key in dict'.
        """
        with self.lock:
            return defaultdict.__contains__(self, key)
class User(object):
    """A representation of a user Fulvia is aware of."""

    def __init__(self, nick):
        self.nick = nick
        """The nickname this user was created with."""

        self.ident = self.user = ""
        """The user's local username/ident; ``user`` starts with the same value."""

        self.host = ""
        """The hostname of the user."""

        self.channels = {}
        """Channels the user is in (Channel objects keyed by channel name)."""

        self.away = None
        """Away status of the user; ``None`` until it is set."""

    @property
    def hostmask(self):
        """The user's full hostmask, built as nick!user@host."""
        return '{}!{}@{}'.format(self.nick, self.user, self.host)
class Channel(object):
    """A representation of a channel Fulvia is in."""

    def __init__(self, name):
        self.name = name
        """The name of the channel."""

        self.type = ""
        """
        The type of channel this is. Options are 'secret', 'private' or
        'public' per RFC 2812.
        """

        self.topic = ""
        """The topic of the channel."""

        self.users = {}
        """
        The users in the channel: a dict mapping nick to User, so each nick
        appears only once.
        """

        self.privileges = {}
        """The op levels of the users in the channel, keyed by nick."""

        self.modes = set()
        """The current modes on the channel."""

    def remove_user(self, nick):
        """
        Removes a user from the channel.

        Drops 'nick' from both ``users`` and ``privileges`` and, if the user
        was known, removes this channel from that user's ``channels``.
        Unknown nicks are ignored.
        """
        user = self.users.pop(nick, None)
        self.privileges.pop(nick, None)
        # Identity test: None is the "not present" marker returned by pop().
        if user is not None:
            user.channels.pop(self.name, None)

    def add_user(self, user):
        """
        Adds a user to the channel.

        'user' must be a User instance. The user is stored under its nick,
        its privilege level in this channel is (re)set to 0, and this channel
        is recorded in the user's ``channels``.
        """
        assert isinstance(user, User)
        self.users[user.nick] = user
        self.privileges[user.nick] = 0
        user.channels[self.name] = self

    def rename_user(self, old, new):
        """
        Renames the user.

        Re-keys the 'old' entry of ``users`` and ``privileges`` to 'new'.
        Only these two mappings are touched; the stored user object itself is
        not modified here. Nothing happens for a nick that is not present.
        """
        if old in self.users:
            self.users[new] = self.users.pop(old)
        if old in self.privileges:
            self.privileges[new] = self.privileges.pop(old)
def configureHostMask(mask):
    """
    Returns a valid hostmask based on user input.

    Recognised input forms and the mask they expand to:

    - ``*!*@*``       -> returned unchanged
    - ``nick``        -> ``nick!*@*`` (no '.', '@', '!' or '/' in the input)
    - ``host.name``   -> ``*!*@host.name`` (no '@' or '!' in the input)
    - ``user@``       -> ``*!user@*``
    - ``user@host``   -> ``*!user@host``
    - ``nick!user``   -> ``nick!user@*`` (a trailing '@' is also accepted)

    Anything else, including the empty string and a complete
    ``nick!user@host`` mask, yields ``''``.
    """
    if mask == '*!*@*':
        return mask
    if re.match(r'^[^.@!/]+$', mask) is not None:
        return '%s!*@*' % mask
    if re.match(r'^[^@!]+$', mask) is not None:
        return '*!*@%s' % mask

    m = re.match(r'^([^!@]+)@$', mask)
    if m is not None:
        return '*!%s@*' % m.group(1)

    m = re.match(r'^([^!@]+)@([^@!]+)$', mask)
    if m is not None:
        return '*!%s@%s' % (m.group(1), m.group(2))

    # The second group used to be written "(^[!@]+)": the '^' sat outside the
    # character class and acted as a start-of-string anchor mid-pattern, so
    # this branch could never match. It must be a negated class like the
    # others.
    m = re.match(r'^([^!@]+)!([^!@]+)@?$', mask)
    if m is not None:
        return '%s!%s@*' % (m.group(1), m.group(2))
    return ''
def getOpSym(level):
    """
    Returns the appropriate op_level symbol given a level value.

    Ranks are tried from highest to lowest and the symbol of the first rank
    whose op_level value does not exceed 'level' is returned. A single space
    is returned when 'level' is below the "voice" level.
    """
    # Highest rank first, so the first threshold that is met wins.
    ranked_symbols = (
        ("owner", "~"),
        ("admin", "&"),
        ("op", "@"),
        ("halfop", "%"),
        ("voice", "+"),
    )
    for rank, symbol in ranked_symbols:
        if level >= op_level[rank]:
            return symbol
    return " "