sopel/tools/__init__.py

359 lines
9.4 KiB
Python
Executable File

#! /usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Useful miscellaneous tools and shortcuts for Sopel modules
*Availability: 3+*
"""
import sys
import os
import re
import threading
import codecs
import traceback
from collections import defaultdict
from tools._events import events # NOQA
iteritems = dict.items
itervalues = dict.values
iterkeys = dict.keys
_channel_prefixes = ('#', '&', '+', '!')
def get_input(prompt):
"""Get decoded input from the terminal (equivalent to python 3's ``input``).
"""
return input(prompt)
def get_raising_file_and_line(tb=None):
"""Return the file and line number of the statement that raised the tb.
Returns: (filename, lineno) tuple
"""
if not tb:
tb = sys.exc_info()[2]
filename, lineno, _context, _line = traceback.extract_tb(tb)[-1]
return filename, lineno
def get_command_regexp(prefix, command):
"""Return a compiled regexp object that implements the command."""
# Escape all whitespace with a single backslash. This ensures that regexp
# in the prefix is treated as it was before the actual regexp was changed
# to use the verbose syntax.
prefix = re.sub(r"(\s)", r"\\\1", prefix)
# This regexp match equivalently and produce the same
# groups 1 and 2 as the old regexp: r'^%s(%s)(?: +(.*))?$'
# The only differences should be handling all whitespace
# like spaces and the addition of groups 3-6.
pattern = r"""
(?:{prefix})({command}) # Command as group 1.
(?:\s+ # Whitespace to end command.
( # Rest of the line as group 2.
(?:(\S+))? # Parameters 1-4 as groups 3-6.
(?:\s+(\S+))?
(?:\s+(\S+))?
(?:\s+(\S+))?
.* # Accept anything after the parameters.
# Leave it up to the module to parse
# the line.
))? # Group 2 must be None, if there are no
# parameters.
$ # EoL, so there are no partial matches.
""".format(prefix=prefix, command=command)
return re.compile(pattern, re.IGNORECASE | re.VERBOSE)
def deprecated(old):
def new(*args, **kwargs):
print('Function %s is deprecated.' % old.__name__, file=sys.stderr)
trace = traceback.extract_stack()
for line in traceback.format_list(trace[:-1]):
stderr(line[:-1])
return old(*args, **kwargs)
new.__doc__ = old.__doc__
new.__name__ = old.__name__
return new
# from
# http://parand.com/say/index.php/2007/07/13/simple-multi-dimensional-dictionaries-in-python/
# A simple class to make mutli dimensional dict easy to use
class Ddict(dict):
"""Class for multi-dimensional ``dict``.
A simple helper class to ease the creation of multi-dimensional ``dict``\s.
"""
def __init__(self, default=None):
self.default = default
def __getitem__(self, key):
if key not in self:
self[key] = self.default()
return dict.__getitem__(self, key)
class Identifier(str):
"""A `str` subclass which acts appropriately for IRC identifiers.
When used as normal `str` objects, case will be preserved.
However, when comparing two Identifier objects, or comparing a Identifier
object with a `str` object, the comparison will be case insensitive.
This case insensitivity includes the case convention conventions regarding
``[]``, ``{}``, ``|``, ``\\``, ``^`` and ``~`` described in RFC 2812.
"""
def __new__(cls, identifier):
# According to RFC2812, identifiers have to be in the ASCII range.
# However, I think it's best to let the IRCd determine that, and we'll
# just assume str. It won't hurt anything, and is more internally
# consistent. And who knows, maybe there's another use case for this
# weird case convention.
s = str.__new__(cls, identifier)
s._lowered = Identifier._lower(identifier)
return s
def lower(self):
"""Return the identifier converted to lower-case per RFC 2812."""
return self._lowered
@staticmethod
def _lower(identifier):
"""Returns `identifier` in lower case per RFC 2812."""
# The tilde replacement isn't needed for identifiers, but is for
# channels, which may be useful at some point in the future.
low = identifier.lower().replace('{', '[').replace('}', ']')
low = low.replace('|', '\\').replace('^', '~')
return low
def __repr__(self):
return "%s(%r)" % (
self.__class__.__name__,
self.__str__()
)
def __hash__(self):
return self._lowered.__hash__()
def __lt__(self, other):
if isinstance(other, Identifier):
return self._lowered < other._lowered
return self._lowered < Identifier._lower(other)
def __le__(self, other):
if isinstance(other, Identifier):
return self._lowered <= other._lowered
return self._lowered <= Identifier._lower(other)
def __gt__(self, other):
if isinstance(other, Identifier):
return self._lowered > other._lowered
return self._lowered > Identifier._lower(other)
def __ge__(self, other):
if isinstance(other, Identifier):
return self._lowered >= other._lowered
return self._lowered >= Identifier._lower(other)
def __eq__(self, other):
if isinstance(other, Identifier):
return self._lowered == other._lowered
return self._lowered == Identifier._lower(other)
def __ne__(self, other):
return not (self == other)
def is_nick(self):
"""Returns True if the Identifier is a nickname (as opposed to channel)
"""
return self and not self.startswith(_channel_prefixes)
class OutputRedirect(object):
"""Redirect te output to the terminal and a log file.
A simplified object used to write to both the terminal and a log file.
"""
def __init__(self, logpath, stderr=False, quiet=False):
"""Create an object which will to to a file and the terminal.
Create an object which will log to the file at ``logpath`` as well as
the terminal.
If ``stderr`` is given and true, it will write to stderr rather than
stdout.
If ``quiet`` is given and True, data will be written to the log file
only, but not the terminal.
"""
self.logpath = logpath
self.stderr = stderr
self.quiet = quiet
def write(self, string):
"""Write the given ``string`` to the logfile and terminal."""
if not self.quiet:
try:
if self.stderr:
sys.__stderr__.write(string)
else:
sys.__stdout__.write(string)
except:
pass
with codecs.open(self.logpath, 'ab', encoding="utf8",
errors='xmlcharrefreplace') as logfile:
try:
logfile.write(string)
except strDecodeError:
# we got an invalid string, safely encode it to utf-8
logfile.write(str(string, 'utf8', errors="replace"))
def flush(self):
if self.stderr:
sys.__stderr__.flush()
else:
sys.__stdout__.flush()
# These seems to trace back to when we thought we needed a try/except on prints,
# because it looked like that was why we were having problems. We'll drop it in
# 4.0^H^H^H5.0^H^H^H6.0^H^H^Hsome version when someone can be bothered.
@deprecated
def stdout(string):
print(string)
def stderr(string):
"""Print the given ``string`` to stderr.
This is equivalent to ``print >> sys.stderr, string``
"""
print(string, file=sys.stderr)
def check_pid(pid):
"""Check if a process is running with the given ``PID``.
*Availability: Only on POSIX systems*
Return ``True`` if there is a process running with the given ``PID``.
"""
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
def get_hostmask_regex(mask):
"""Return a compiled `re.RegexObject` for an IRC hostmask"""
mask = re.escape(mask)
mask = mask.replace(r'\*', '.*')
return re.compile(mask + '$', re.I)
class SopelMemory(dict):
"""A simple thread-safe dict implementation.
*Availability: 4.0; available as ``Sopel.SopelMemory`` in 3.1.0 - 3.2.0*
In order to prevent exceptions when iterating over the values and changing
them at the same time from different threads, we use a blocking lock on
``__setitem__`` and ``contains``.
"""
def __init__(self, *args):
dict.__init__(self, *args)
self.lock = threading.Lock()
def __setitem__(self, key, value):
self.lock.acquire()
result = dict.__setitem__(self, key, value)
self.lock.release()
return result
def __contains__(self, key):
"""Check if a key is in the dict.
It locks it for writes when doing so.
"""
self.lock.acquire()
result = dict.__contains__(self, key)
self.lock.release()
return result
def contains(self, key):
"""Backwards compatability with 3.x, use `in` operator instead."""
return self.__contains__(key)
class SopelMemoryWithDefault(defaultdict):
"""Same as SopelMemory, but subclasses from collections.defaultdict."""
def __init__(self, *args):
defaultdict.__init__(self, *args)
self.lock = threading.Lock()
def __setitem__(self, key, value):
self.lock.acquire()
result = defaultdict.__setitem__(self, key, value)
self.lock.release()
return result
def __contains__(self, key):
"""Check if a key is in the dict.
It locks it for writes when doing so.
"""
self.lock.acquire()
result = defaultdict.__contains__(self, key)
self.lock.release()
return result
def contains(self, key):
"""Backwards compatability with 3.x, use `in` operator instead."""
return self.__contains__(key)
def configureHostMask(mask):
"""
Returns a valid hostmask based on user input.
"""
if mask == '*!*@*':
return mask
if re.match('^[^.@!/]+$', mask) is not None:
return '%s!*@*' % mask
if re.match('^[^@!]+$', mask) is not None:
return '*!*@%s' % mask
m = re.match('^([^!@]+)@$', mask)
if m is not None:
return '*!%s@*' % m.group(1)
m = re.match('^([^!@]+)@([^@!]+)$', mask)
if m is not None:
return '*!%s@%s' % (m.group(1), m.group(2))
m = re.match('^([^!@]+)!(^[!@]+)@?$', mask)
if m is not None:
return '%s!%s@*' % (m.group(1), m.group(2))
return ''