#!/usr/bin/env python3 """ An IRC bot which tracks new alerts and invasions for Warframe and reports them to channel. Only intended to be used in one channel at a time. """ import os import json import time import functools import threading import configparser import requests from twisted.internet import protocol, reactor from twisted.words.protocols import irc URI = "http://content.warframe.com/dynamic/worldState.php" DATA_LANG = "https://raw.githubusercontent.com/WFCD/warframe-worldstate-data/master/data/languages.json" DATA_MISS = "https://raw.githubusercontent.com/WFCD/warframe-worldstate-data/master/data/missionTypes.json" DATA_SOL = "https://raw.githubusercontent.com/WFCD/warframe-worldstate-data/master/data/solNodes.json" CETUS_EPOCH = 1550435808 class ClockThread(threading.Thread): """ A thread class which acts like a clock for signaling the bot to check the world state every minute. """ def __init__(self, bot): threading.Thread.__init__(self) self.bot = bot def run(self): while not stop.is_set(): self.bot.checkNewAlerts() stop.wait(60) class WarBot(irc.IRCClient): def __init__(self, config): self.config = config self.nickname = self.config["nickname"] self.username = self.config["ident"] self.channel = self.config["channel"] with open(os.path.join("static", "languages.json"), "r") as file: self.languages = json.loads(file.read()) with open(os.path.join("static", "missionTypes.json"), "r") as file: self.missionTypes = json.loads(file.read()) with open(os.path.join("static", "solNodes.json"), "r") as file: self.solNodes = json.loads(file.read()) self.clockThread = ClockThread(self) self.clockThread.start() self.alert_ids = [] self.subs = {} def stillConnected(self): """Returns true if the bot is still connected to the server.""" if self._heartbeat: return True else: return False def checkNewAlerts(self, channel=None): """ Checks the world state for new alerts or invasions. """ if not channel: channel = self.channel while not self.stillConnected(): # startup reasons time.sleep(1) try: res = requests.get(URI) res.raise_for_status() except requests.exceptions.ConnectionError: return self.msg(channel, f"Connection Error") except requests.exceptions.HTTPError: return self.msg(channel, f"HTTP Error: {res.status_code}") except Exception as e: return self.msg(channel, f"Error: {e}") data = res.json() alert_ids = [alert["_id"]["$oid"] for alert in data["Alerts"]] new_alerts = [a for a in alert_ids if a not in self.alert_ids] self.alert_ids += new_alerts for alert in data["Alerts"]: if alert["_id"]["$oid"] in new_alerts: self.process_alert(alert, channel) expired_alerts = [a for a in self.alert_ids if a not in alert_ids] for alert_id in expired_alerts: self.alert_ids.remove(alert_id) def process_alert(self, alert, channel): """ Processes the provided alert and sends a message to channel. """ info = alert["MissionInfo"] node = self.solNodes[info["location"]] mission_type = self.missionTypes[info["missionType"]]["value"] minLvl = info["minEnemyLevel"] maxLvl = info["maxEnemyLevel"] credits = info["missionReward"].get("credits") items_raw = info["missionReward"].get("items", []) items = [] for item in items_raw: try: items.append(self.languages[item.lower()]["value"]) except KeyError: items.append(item) cItems = info["missionReward"].get("countedItems", []) for item in cItems: itemStr = f"({item['ItemCount']}) " try: itemStr += self.languages[item["ItemType"].lower()]["value"] except KeyError: itemStr += item["ItemType"] items.append(itemStr) expire = int(alert["Expiry"]["$date"]["$numberLong"]) expire = int(expire/1000) expire_diff = expire - int(time.time()) expire_t = time.localtime(expire) message = "\x0300[\x0305ALERT\x0300] " \ + f"\x0310Location\x03: \x0312{node['value']}\x03 | " \ + f"\x0310Mission Type\x03: \x0312{mission_type}\x03 " \ + f"(\x0307{minLvl}\x03-\x0307{maxLvl}\x03) | " \ + f"\x0310Expiry\x03: \x0308{expire_diff // 3600}h" \ + f"{expire_diff % 3600 // 60}m\x03 " \ + f"(\x0308{time.strftime('%H:%M', expire_t)}\x03) | " \ + f"\x0310Credits\x03: \x0312{credits}\x03" if items: message += f" | \x0310Items: \x0312{', '.join(items)}\x03" self.msg(channel, message) for user, subs in self.subs.items(): for sub in subs: if sub.lower() in message.lower(): self.notice(user.partition('!')[0], message) def privmsg(self, user, channel, message): """ Called when the bot receives a PRIVMSG, which can come from channels or users alike. """ if message == ".alerts": self.alert_ids = [] self.checkNewAlerts(channel) return if message.startswith(".subscribe "): sub = message.replace(".subscribe ", "") if not self.subs.get(user): self.subs[user] = [] self.subs[user].append(sub) msg = "\x0310Current subscriptions\x03: [\x0308" msg += "\x03, \x0308".join(self.subs[user]) msg += "\x03]" self.notice(user.partition('!')[0], msg) return if message.startswith(".unsubscribe "): sub = message.replace(".unsubscribe ", "") if not self.subs.get(user): return if not sub in self.subs[user]: return self.subs[user].remove(sub) msg = "\x0310Current subscriptions\x03: [\x0308" msg += "\x03, \x0308".join(self.subs[user]) msg += "\x03]" self.notice(user.partition('!')[0], msg) return if message.startswith(".update_data"): try: res = requests.get(DATA_LANG, verify=True) res.raise_for_status() self.languages = res.json() res = requests.get(DATA_MISS, verify=True) res.raise_for_status() self.missionTypes = res.json() res = requests.get(DATA_SOL, verify=True) res.raise_for_status() self.solNodes = res.json() except requests.RequestException: self.msg(channel, "Error: Could not update data.") with open(os.path.join("static", "languages.json"), "w") as file: file.write(json.dumps(self.languages)) with open(os.path.join("static", "missionTypes.json"),"w") as file: file.write(json.dumps(self.missionTypes)) with open(os.path.join("static", "solNodes.json"), "w") as file: file.write(json.dumps(self.solNodes)) self.msg(channel, "Data stores updated.") return if message.startswith(".cetus"): res = requests.get(URI, verify=True) res.raise_for_status() data = res.json() delta = (data['Time'] - CETUS_EPOCH) % (150*60) if delta > (100*60): state = "Night" next_state = "Day" eta = (150*60) - delta else: state = "Day" next_state = "Night" eta = (100*60) - delta if eta < 60: eta = str(eta) + 's' else: eta = str(eta//60) + 'm' msg = f"\x0310Cetus time\x0300: \x0312{state}\x0300 | " msg += f"\x0310Time until \x0312{next_state}\x0300: " msg += f"\x0308{eta}" self.msg(channel, msg) return def joined(self, channel): """Called when the bot joins a new channel.""" print("Joined", channel) def signedOn(self): """Called when the bot successfully connects to the server.""" print("Signed on as", self.nickname) self.mode(self.nickname, True, "B") # set +B on self self.join(self.channel) def nickChanged(self, nick): """Called when my nick has been changed.""" print("Nick changed to", nick) class WarBotFactory(protocol.ReconnectingClientFactory): # black magic going on here protocol = property(lambda s: functools.partial(WarBot, s.config)) def __init__(self, config): self.config = config stop = threading.Event() if __name__ == "__main__": import argparse parser = argparse.ArgumentParser( description="Downloads new torrents from a torrent tracker's IRC " \ + "announce channel.") parser.add_argument( "-c", "--config", default="config.cfg", help="Specify an alternate config file to use.") args = parser.parse_args() config = configparser.ConfigParser() config.read(args.config) config = config[config.sections()[0]] #only use the first section server = config["server"] port = config.getint("port") print("Connecting to:", server) reactor.connectTCP(server, port, WarBotFactory(config)) reactor.addSystemEventTrigger('before','shutdown', stop.set) reactor.run()