WarBot/warbot.py

268 lines
7.5 KiB
Python
Executable File

#!/usr/bin/env python3
"""
An IRC bot which tracks new alerts and invasions for Warframe and reports
them to channel. Only intended to be used in one channel at a time.
"""
import os
import json
import time
import functools
import threading
import configparser
import requests
from twisted.internet import protocol, reactor
from twisted.words.protocols import irc
URI = "http://content.warframe.com/dynamic/worldState.php"
DATA_LANG = "https://raw.githubusercontent.com/WFCD/warframe-worldstate-data/master/data/languages.json"
DATA_MISS = "https://raw.githubusercontent.com/WFCD/warframe-worldstate-data/master/data/missionTypes.json"
DATA_SOL = "https://raw.githubusercontent.com/WFCD/warframe-worldstate-data/master/data/solNodes.json"
class ClockThread(threading.Thread):
"""
A thread class which acts like a clock for signaling the bot to check
the world state every minute.
"""
def __init__(self, bot):
threading.Thread.__init__(self)
self.bot = bot
def run(self):
while not stop.is_set():
self.bot.checkNewAlerts()
stop.wait(60)
class WarBot(irc.IRCClient):
def __init__(self, config):
self.config = config
self.nickname = self.config["nickname"]
self.username = self.config["ident"]
self.channel = self.config["channel"]
with open(os.path.join("static", "languages.json"), "r") as file:
self.languages = json.loads(file.read())
with open(os.path.join("static", "missionTypes.json"), "r") as file:
self.missionTypes = json.loads(file.read())
with open(os.path.join("static", "solNodes.json"), "r") as file:
self.solNodes = json.loads(file.read())
self.clockThread = ClockThread(self)
self.clockThread.start()
self.alert_ids = []
self.subs = {}
def stillConnected(self):
"""Returns true if the bot is still connected to the server."""
if self._heartbeat:
return True
else:
return False
def checkNewAlerts(self, channel=None):
"""
Checks the world state for new alerts or invasions.
"""
if not channel:
channel = self.channel
while not self.stillConnected(): # startup reasons
time.sleep(1)
try:
res = requests.get(URI)
res.raise_for_status()
except requests.exceptions.ConnectionError:
return self.msg(channel, f"Connection Error")
except requests.exceptions.HTTPError:
return self.msg(channel, f"HTTP Error: {res.status_code}")
except Exception as e:
return self.msg(channel, f"Error: {e}")
data = res.json()
alert_ids = [alert["_id"]["$oid"] for alert in data["Alerts"]]
new_alerts = [a for a in alert_ids if a not in self.alert_ids]
self.alert_ids += new_alerts
for alert in data["Alerts"]:
if alert["_id"]["$oid"] in new_alerts:
self.process_alert(alert, channel)
expired_alerts = [a for a in self.alert_ids if a not in alert_ids]
for alert_id in expired_alerts:
self.alert_ids.remove(alert_id)
def process_alert(self, alert, channel):
"""
Processes the provided alert and sends a message to channel.
"""
info = alert["MissionInfo"]
node = self.solNodes[info["location"]]
mission_type = self.missionTypes[info["missionType"]]["value"]
minLvl = info["minEnemyLevel"]
maxLvl = info["maxEnemyLevel"]
credits = info["missionReward"].get("credits")
items_raw = info["missionReward"].get("items", [])
items = []
for item in items_raw:
try:
items.append(self.languages[item.lower()]["value"])
except KeyError:
items.append(item)
cItems = info["missionReward"].get("countedItems", [])
for item in cItems:
itemStr = f"({item['ItemCount']}) "
try:
itemStr += self.languages[item["ItemType"].lower()]["value"]
except KeyError:
itemStr += item["ItemType"]
items.append(itemStr)
expire = int(alert["Expiry"]["$date"]["$numberLong"])
expire = int(expire/1000)
expire_diff = expire - int(time.time())
expire_t = time.localtime(expire)
message = "\x0300[\x0305ALERT\x0300] " \
+ f"\x0310Location\x03: \x0312{node['value']}\x03 | " \
+ f"\x0310Mission Type\x03: \x0312{mission_type}\x03 " \
+ f"(\x0307{minLvl}\x03-\x0307{maxLvl}\x03) | " \
+ f"\x0310Expiry\x03: \x0308{expire_diff // 3600}h" \
+ f"{expire_diff % 3600 // 60}m\x03 " \
+ f"(\x0308{time.strftime('%H:%M', expire_t)}\x03) | " \
+ f"\x0310Credits\x03: \x0312{credits}\x03"
if items:
message += f" | \x0310Items: \x0312{', '.join(items)}\x03"
self.msg(channel, message)
for user, subs in self.subs.items():
for sub in subs:
if sub.lower() in message.lower():
self.notice(user.partition('!')[0], message)
def privmsg(self, user, channel, message):
"""
Called when the bot receives a PRIVMSG, which can come from channels
or users alike.
"""
if message == ".alerts":
self.alert_ids = []
self.checkNewAlerts(channel)
return
if message.startswith(".subscribe "):
sub = message.replace(".subscribe ", "")
if not self.subs.get(user):
self.subs[user] = []
self.subs[user].append(sub)
msg = "\x0310Current subscriptions\x03: [\x0308"
msg += "\x03, \x0308".join(self.subs[user])
msg += "\x03]"
self.notice(user.partition('!')[0], msg)
return
if message.startswith(".unsubscribe "):
sub = message.replace(".unsubscribe ", "")
if not self.subs.get(user):
return
if not sub in self.subs[user]:
return
self.subs[user].remove(sub)
msg = "\x0310Current subscriptions\x03: [\x0308"
msg += "\x03, \x0308".join(self.subs[user])
msg += "\x03]"
self.notice(user.partition('!')[0], msg)
return
if message.startswith(".update_data"):
try:
res = requests.get(DATA_LANG, verify=True)
res.raise_for_status()
self.languages = res.json()
res = requests.get(DATA_MISS, verify=True)
res.raise_for_status()
self.missionTypes = res.json()
res = requests.get(DATA_SOL, verify=True)
res.raise_for_status()
self.solNodes = res.json()
except requests.RequestException:
self.msg(channel, "Error: Could not update data.")
with open(os.path.join("static", "languages.json"), "w") as file:
file.write(json.dumps(self.languages))
with open(os.path.join("static", "missionTypes.json"),"w") as file:
file.write(json.dumps(self.missionTypes))
with open(os.path.join("static", "solNodes.json"), "w") as file:
file.write(json.dumps(self.solNodes))
self.msg(channel, "Data stores updated.")
return
def joined(self, channel):
"""Called when the bot joins a new channel."""
print("Joined", channel)
def signedOn(self):
"""Called when the bot successfully connects to the server."""
print("Signed on as", self.nickname)
self.mode(self.nickname, True, "B") # set +B on self
self.join(self.channel)
def nickChanged(self, nick):
"""Called when my nick has been changed."""
print("Nick changed to", nick)
class WarBotFactory(protocol.ReconnectingClientFactory):
# black magic going on here
protocol = property(lambda s: functools.partial(WarBot, s.config))
def __init__(self, config):
self.config = config
stop = threading.Event()
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Downloads new torrents from a torrent tracker's IRC " \
+ "announce channel.")
parser.add_argument(
"-c",
"--config",
default="config.cfg",
help="Specify an alternate config file to use.")
args = parser.parse_args()
config = configparser.ConfigParser()
config.read(args.config)
config = config[config.sections()[0]] #only use the first section
server = config["server"]
port = config.getint("port")
print("Connecting to:", server)
reactor.connectTCP(server, port, WarBotFactory(config))
reactor.addSystemEventTrigger('before','shutdown', stop.set)
reactor.run()