199 lines
6.2 KiB
Python
199 lines
6.2 KiB
Python
|
# coding=utf-8
|
||
|
"""
|
||
|
safety.py - Alerts about malicious URLs
|
||
|
Copyright © 2014, Elad Alfassa, <elad@fedoraproject.org>
|
||
|
Licensed under the Eiffel Forum License 2.
|
||
|
|
||
|
This module uses virustotal.com
|
||
|
"""
|
||
|
from __future__ import unicode_literals, absolute_import, print_function, division
|
||
|
|
||
|
import requests
|
||
|
from config.types import StaticSection, ValidatedAttribute, ListAttribute
|
||
|
from formatting import color, bold
|
||
|
from logger import get_logger
|
||
|
from module import OP
|
||
|
import tools
|
||
|
import sys
|
||
|
import json
|
||
|
import time
|
||
|
import os.path
|
||
|
import re
|
||
|
import module
|
||
|
|
||
|
if sys.version_info.major > 2:
|
||
|
unicode = str
|
||
|
from urllib.request import urlretrieve
|
||
|
from urllib.parse import urlparse
|
||
|
else:
|
||
|
from urllib import urlretrieve
|
||
|
from urlparse import urlparse
|
||
|
|
||
|
LOGGER = get_logger(__name__)
|
||
|
|
||
|
vt_base_api_url = 'https://www.virustotal.com/vtapi/v2/url/'
|
||
|
malware_domains = set()
|
||
|
known_good = []
|
||
|
|
||
|
|
||
|
class SafetySection(StaticSection):
|
||
|
enabled_by_default = ValidatedAttribute('enabled_by_default', bool, default=True)
|
||
|
"""Enable URL safety in all channels where it isn't explicitly disabled."""
|
||
|
known_good = ListAttribute('known_good')
|
||
|
"""List of "known good" domains to ignore."""
|
||
|
vt_api_key = ValidatedAttribute('vt_api_key')
|
||
|
"""Optional VirusTotal API key."""
|
||
|
|
||
|
|
||
|
def configure(config):
|
||
|
config.define_section('safety', SafetySection)
|
||
|
config.safety.configure_setting(
|
||
|
'enabled_by_default',
|
||
|
"Enable URL safety in channels that don't specifically disable it?",
|
||
|
)
|
||
|
config.safety.configure_setting(
|
||
|
'known_good',
|
||
|
'Enter any domains to whitelist',
|
||
|
)
|
||
|
config.safety.configure_setting(
|
||
|
'vt_api_key',
|
||
|
"Optionally, enter a VirusTotal API key to improve malicious URL "
|
||
|
"protection.\nOtherwise, only the Malwarebytes DB will be used."
|
||
|
)
|
||
|
|
||
|
|
||
|
def setup(bot):
|
||
|
bot.config.define_section('safety', SafetySection)
|
||
|
|
||
|
bot.memory['safety_cache'] = tools.SopelMemory()
|
||
|
for item in bot.config.safety.known_good:
|
||
|
known_good.append(re.compile(item, re.I))
|
||
|
|
||
|
loc = os.path.join(bot.config.homedir, 'malwaredomains.txt')
|
||
|
if os.path.isfile(loc):
|
||
|
if os.path.getmtime(loc) < time.time() - 24 * 60 * 60 * 7:
|
||
|
# File exists but older than one week, update
|
||
|
_download_malwaredomains_db(loc)
|
||
|
else:
|
||
|
_download_malwaredomains_db(loc)
|
||
|
with open(loc, 'r') as f:
|
||
|
for line in f:
|
||
|
clean_line = unicode(line).strip().lower()
|
||
|
if clean_line != '':
|
||
|
malware_domains.add(clean_line)
|
||
|
|
||
|
|
||
|
def _download_malwaredomains_db(path):
|
||
|
print('Downloading malwaredomains db...')
|
||
|
urlretrieve('http://mirror1.malwaredomains.com/files/justdomains', path)
|
||
|
|
||
|
|
||
|
@module.rule('(?u).*(https?://\S+).*')
|
||
|
@module.priority('high')
|
||
|
def url_handler(bot, trigger):
|
||
|
""" Check for malicious URLs """
|
||
|
check = True # Enable URL checking
|
||
|
strict = False # Strict mode: kick on malicious URL
|
||
|
positives = 0 # Number of engines saying it's malicious
|
||
|
total = 0 # Number of total engines
|
||
|
use_vt = True # Use VirusTotal
|
||
|
check = bot.config.safety.enabled_by_default
|
||
|
if check is None:
|
||
|
# If not set, assume default
|
||
|
check = True
|
||
|
# DB overrides config:
|
||
|
setting = bot.db.get_channel_value(trigger.sender, 'safety')
|
||
|
if setting is not None:
|
||
|
if setting == 'off':
|
||
|
return # Not checking
|
||
|
elif setting in ['on', 'strict', 'local', 'local strict']:
|
||
|
check = True
|
||
|
if setting == 'strict' or setting == 'local strict':
|
||
|
strict = True
|
||
|
if setting == 'local' or setting == 'local strict':
|
||
|
use_vt = False
|
||
|
|
||
|
if not check:
|
||
|
return # Not overriden by DB, configured default off
|
||
|
|
||
|
netloc = urlparse(trigger.group(1)).netloc
|
||
|
if any(regex.search(netloc) for regex in known_good):
|
||
|
return # Whitelisted
|
||
|
|
||
|
apikey = bot.config.safety.vt_api_key
|
||
|
try:
|
||
|
if apikey is not None and use_vt:
|
||
|
payload = {'resource': unicode(trigger),
|
||
|
'apikey': apikey,
|
||
|
'scan': '1'}
|
||
|
|
||
|
if trigger not in bot.memory['safety_cache']:
|
||
|
result = requests.post(vt_base_api_url + 'report', payload)
|
||
|
if sys.version_info.major > 2:
|
||
|
result = result.decode('utf-8')
|
||
|
result = json.loads(result)
|
||
|
age = time.time()
|
||
|
data = {'positives': result['positives'],
|
||
|
'total': result['total'],
|
||
|
'age': age}
|
||
|
bot.memory['safety_cache'][trigger] = data
|
||
|
if len(bot.memory['safety_cache']) > 1024:
|
||
|
_clean_cache(bot)
|
||
|
else:
|
||
|
print('using cache')
|
||
|
result = bot.memory['safety_cache'][trigger]
|
||
|
positives = result['positives']
|
||
|
total = result['total']
|
||
|
except Exception:
|
||
|
LOGGER.debug('Error from checking URL with VT.', exc_info=True)
|
||
|
pass # Ignoring exceptions with VT so MalwareDomains will always work
|
||
|
|
||
|
if unicode(netloc).lower() in malware_domains:
|
||
|
# malwaredomains is more trustworthy than some VT engines
|
||
|
# therefor it gets a weight of 10 engines when calculating confidence
|
||
|
positives += 10
|
||
|
total += 10
|
||
|
|
||
|
if positives > 1:
|
||
|
# Possibly malicious URL detected!
|
||
|
confidence = '{}%'.format(round((positives / total) * 100))
|
||
|
msg = 'link posted by %s is possibly malicious ' % bold(trigger.nick)
|
||
|
msg += '(confidence %s - %s/%s)' % (confidence, positives, total)
|
||
|
bot.say('[' + bold(color('WARNING', 'red')) + '] ' + msg)
|
||
|
if strict:
|
||
|
bot.write(['KICK', trigger.sender, trigger.nick,
|
||
|
'Posted a malicious link'])
|
||
|
|
||
|
|
||
|
@module.commands('safety')
|
||
|
def toggle_safety(bot, trigger):
|
||
|
""" Set safety setting for channel """
|
||
|
if not trigger.admin and bot.privileges[trigger.sender][trigger.nick] < OP:
|
||
|
bot.reply('Only channel operators can change safety settings')
|
||
|
return
|
||
|
allowed_states = ['strict', 'on', 'off', 'local', 'local strict']
|
||
|
if not trigger.group(2) or trigger.group(2).lower() not in allowed_states:
|
||
|
options = ' / '.join(allowed_states)
|
||
|
bot.reply('Available options: %s' % options)
|
||
|
return
|
||
|
|
||
|
channel = trigger.sender.lower()
|
||
|
bot.db.set_channel_value(channel, 'safety', trigger.group(2).lower())
|
||
|
bot.reply('Safety is now set to "%s" on this channel' % trigger.group(2))
|
||
|
|
||
|
|
||
|
# Clean the cache every day, also when > 1024 entries
|
||
|
@module.interval(24 * 60 * 60)
|
||
|
def _clean_cache(bot):
|
||
|
""" Cleanup old entries in URL cache """
|
||
|
# TODO probably should be using locks here, to make sure stuff doesn't
|
||
|
# explode
|
||
|
oldest_key_age = 0
|
||
|
oldest_key = ''
|
||
|
for key, data in tools.iteritems(bot.memory['safety_cache']):
|
||
|
if data['age'] > oldest_key_age:
|
||
|
oldest_key_age = data['age']
|
||
|
oldest_key = key
|
||
|
if oldest_key in bot.memory['safety_cache']:
|
||
|
del bot.memory['safety_cache'][oldest_key]
|