#!/usr/bin/env python3 """ An IRC bot which tracks new alerts and invasions for Warframe and reports them to channel. Only intended to be used in one channel at a time. """ import os import json import time import functools import threading import configparser import requests from twisted.internet import protocol, reactor from twisted.words.protocols import irc URI = "http://content.warframe.com/dynamic/worldState.php" class ClockThread(threading.Thread): """ A thread class which acts like a clock for signaling the bot to check the world state every minute. """ def __init__(self, bot): threading.Thread.__init__(self) self.bot = bot def run(self): while not stop.is_set(): self.bot.checkNewAlerts() stop.wait(60) class WarBot(irc.IRCClient): def __init__(self, config): self.config = config self.nickname = self.config["nickname"] self.username = self.config["ident"] self.channel = self.config["channel"] with open(os.path.join("static", "languages.json"), "r") as file: self.languages = json.loads(file.read()) with open(os.path.join("static", "missionTypes.json"), "r") as file: self.missionTypes = json.loads(file.read()) with open(os.path.join("static", "solNodes.json"), "r") as file: self.solNodes = json.loads(file.read()) self.clockThread = ClockThread(self) self.clockThread.start() self.alert_ids = [] def stillConnected(self): """Returns true if the bot is still connected to the server.""" if self._heartbeat: return True else: return False def clock(self): """ A continous loop which calls the processing functions once per minute. Should only be ran in a separate thread. """ while not stop.is_set(): self.checkNewAlerts() stop.wait(60) def checkNewAlerts(self): """ Checks the world state for new alerts or invasions. """ while not self.stillConnected(): # startup reasons time.sleep(1) try: res = requests.get(URI) res.raise_for_status() except requests.exceptions.ConnectionError: return data = res.json() alert_ids = [alert["_id"]["$oid"] for alert in data["Alerts"]] new_alerts = [a for a in alert_ids if a not in self.alert_ids] self.alert_ids += new_alerts for alert in data["Alerts"]: if alert["_id"]["$oid"] in new_alerts: self.process_alert(alert) expired_alerts = [a for a in self.alert_ids if a not in alert_ids] for alert_id in expired_alerts: self.alert_ids.remove(alert_id) def process_alert(self, alert): """ Processes the provided alert and sends a message to channel. """ info = alert["MissionInfo"] node = self.solNodes[info["location"]] mission_type = self.missionTypes[info["missionType"]]["value"] credits = info["missionReward"].get("credits") items = info["missionReward"].get("items", []) if items: items = [self.languages[item.lower()]["value"] for item in items] cItems = info["missionReward"].get("countedItems") if cItems: for item in cItems: itemStr = f"({item['ItemCount']}) " itemStr += self.languages[item["ItemType"].lower()]["value"] items.append(itemStr) expire = int(alert["Expiry"]["$date"]["$numberLong"]) expire_diff = int(expire/1000 - time.time()) expire_t = time.localtime(expire) message = "\x0300[\x0305ALERT\x0300] " \ + f"\x0310Location\x03: \x0312{node['value']}\x03 | " \ + f"\x0310Mission Type\x03: \x0312{mission_type}\x03 | " \ + f"\x0310Expiry\x03: \x0308{expire_diff // 3600}h" \ + f"{expire_diff % 3600 // 60}m\x03 " \ + f"(\x0308{time.strftime('%H:%M', expire_t)}\x03)" \ + f" | \x0310Credits\x03: \x0312{credits}\x03" if items: message += f" | \x0310Items: \x0312{', '.join(items)}\x03" self.msg(self.channel, message) def privmsg(self, user, channel, message): """ Called when the bot receives a PRIVMSG, which can come from channels or users alike. """ pass def joined(self, channel): """Called when the bot joins a new channel.""" print("Joined", channel) def signedOn(self): """Called when the bot successfully connects to the server.""" print("Signed on as", self.nickname) self.mode(self.nickname, True, "B") # set +B on self self.join(self.channel) def nickChanged(self, nick): """Called when my nick has been changed.""" print("Nick changed to", nick) class WarBotFactory(protocol.ReconnectingClientFactory): # black magic going on here protocol = property(lambda s: functools.partial(WarBot, s.config)) def __init__(self, config): self.config = config stop = threading.Event() if __name__ == "__main__": import argparse parser = argparse.ArgumentParser( description="Downloads new torrents from a torrent tracker's IRC " \ + "announce channel.") parser.add_argument( "-c", "--config", default="config.cfg", help="Specify an alternate config file to use.") args = parser.parse_args() config = configparser.ConfigParser() config.read(args.config) config = config[config.sections()[0]] #only use the first section server = config["server"] port = config.getint("port") print("Connecting to:", server) reactor.connectTCP(server, port, WarBotFactory(config)) reactor.addSystemEventTrigger('before','shutdown', stop.set) reactor.run()