196 lines
5.1 KiB
Python
Executable File
196 lines
5.1 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
An IRC bot which tracks new alerts and invasions for Warframe and reports
|
|
them to channel. Only intended to be used in one channel at a time.
|
|
"""
|
|
import os
|
|
import json
|
|
import time
|
|
import functools
|
|
import threading
|
|
import configparser
|
|
|
|
import requests
|
|
from twisted.internet import protocol, reactor
|
|
from twisted.words.protocols import irc
|
|
|
|
URI = "http://content.warframe.com/dynamic/worldState.php"
|
|
|
|
|
|
class ClockThread(threading.Thread):
|
|
"""
|
|
A thread class which acts like a clock for signaling the bot to check
|
|
the world state every minute.
|
|
"""
|
|
def __init__(self, bot):
|
|
threading.Thread.__init__(self)
|
|
self.bot = bot
|
|
|
|
|
|
def run(self):
|
|
while not stop.is_set():
|
|
self.bot.checkNewAlerts()
|
|
stop.wait(60)
|
|
|
|
|
|
class WarBot(irc.IRCClient):
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.nickname = self.config["nickname"]
|
|
self.username = self.config["ident"]
|
|
self.channel = self.config["channel"]
|
|
|
|
with open(os.path.join("static", "languages.json"), "r") as file:
|
|
self.languages = json.loads(file.read())
|
|
|
|
with open(os.path.join("static", "missionTypes.json"), "r") as file:
|
|
self.missionTypes = json.loads(file.read())
|
|
|
|
with open(os.path.join("static", "solNodes.json"), "r") as file:
|
|
self.solNodes = json.loads(file.read())
|
|
|
|
self.clockThread = ClockThread(self)
|
|
self.clockThread.start()
|
|
|
|
self.alert_ids = []
|
|
|
|
|
|
def stillConnected(self):
|
|
"""Returns true if the bot is still connected to the server."""
|
|
if self._heartbeat:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
|
|
def clock(self):
|
|
"""
|
|
A continous loop which calls the processing functions once per
|
|
minute. Should only be ran in a separate thread.
|
|
"""
|
|
while not stop.is_set():
|
|
self.checkNewAlerts()
|
|
stop.wait(60)
|
|
|
|
|
|
def checkNewAlerts(self):
|
|
"""
|
|
Checks the world state for new alerts or invasions.
|
|
"""
|
|
while not self.stillConnected(): # startup reasons
|
|
time.sleep(1)
|
|
|
|
try:
|
|
res = requests.get(URI)
|
|
res.raise_for_status()
|
|
except requests.exceptions.ConnectionError:
|
|
return
|
|
data = res.json()
|
|
|
|
alert_ids = [alert["_id"]["$oid"] for alert in data["Alerts"]]
|
|
|
|
new_alerts = [a for a in alert_ids if a not in self.alert_ids]
|
|
self.alert_ids += new_alerts
|
|
for alert in data["Alerts"]:
|
|
if alert["_id"]["$oid"] in new_alerts:
|
|
self.process_alert(alert)
|
|
|
|
expired_alerts = [a for a in self.alert_ids if a not in alert_ids]
|
|
for alert_id in expired_alerts:
|
|
self.alert_ids.remove(alert_id)
|
|
|
|
|
|
def process_alert(self, alert):
|
|
"""
|
|
Processes the provided alert and sends a message to channel.
|
|
"""
|
|
info = alert["MissionInfo"]
|
|
node = self.solNodes[info["location"]]
|
|
mission_type = self.missionTypes[info["missionType"]]["value"]
|
|
credits = info["missionReward"].get("credits")
|
|
items = info["missionReward"].get("items", [])
|
|
if items:
|
|
items = [self.languages[item.lower()]["value"] for item in items]
|
|
cItems = info["missionReward"].get("countedItems")
|
|
if cItems:
|
|
for item in cItems:
|
|
itemStr = f"({item['ItemCount']}) "
|
|
itemStr += self.languages[item["ItemType"].lower()]["value"]
|
|
items.append(itemStr)
|
|
|
|
expire = int(alert["Expiry"]["$date"]["$numberLong"])
|
|
expire = int(expire/1000)
|
|
expire_diff = expire - int(time.time())
|
|
expire_t = time.localtime(expire)
|
|
|
|
message = "\x0300[\x0305ALERT\x0300] " \
|
|
+ f"\x0310Location\x03: \x0312{node['value']}\x03 | " \
|
|
+ f"\x0310Mission Type\x03: \x0312{mission_type}\x03 | " \
|
|
+ f"\x0310Expiry\x03: \x0308{expire_diff // 3600}h" \
|
|
+ f"{expire_diff % 3600 // 60}m\x03 " \
|
|
+ f"(\x0308{time.strftime('%H:%M', expire_t)}\x03)" \
|
|
+ f" | \x0310Credits\x03: \x0312{credits}\x03"
|
|
if items:
|
|
message += f" | \x0310Items: \x0312{', '.join(items)}\x03"
|
|
|
|
self.msg(self.channel, message)
|
|
|
|
|
|
def privmsg(self, user, channel, message):
|
|
"""
|
|
Called when the bot receives a PRIVMSG, which can come from channels
|
|
or users alike.
|
|
"""
|
|
pass
|
|
|
|
|
|
def joined(self, channel):
|
|
"""Called when the bot joins a new channel."""
|
|
print("Joined", channel)
|
|
|
|
|
|
def signedOn(self):
|
|
"""Called when the bot successfully connects to the server."""
|
|
print("Signed on as", self.nickname)
|
|
self.mode(self.nickname, True, "B") # set +B on self
|
|
self.join(self.channel)
|
|
|
|
|
|
def nickChanged(self, nick):
|
|
"""Called when my nick has been changed."""
|
|
print("Nick changed to", nick)
|
|
|
|
|
|
class WarBotFactory(protocol.ReconnectingClientFactory):
|
|
# black magic going on here
|
|
protocol = property(lambda s: functools.partial(WarBot, s.config))
|
|
|
|
def __init__(self, config):
|
|
self.config = config
|
|
|
|
|
|
stop = threading.Event()
|
|
if __name__ == "__main__":
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description="Downloads new torrents from a torrent tracker's IRC " \
|
|
+ "announce channel.")
|
|
parser.add_argument(
|
|
"-c",
|
|
"--config",
|
|
default="config.cfg",
|
|
help="Specify an alternate config file to use.")
|
|
args = parser.parse_args()
|
|
|
|
config = configparser.ConfigParser()
|
|
config.read(args.config)
|
|
config = config[config.sections()[0]] #only use the first section
|
|
|
|
server = config["server"]
|
|
port = config.getint("port")
|
|
print("Connecting to:", server)
|
|
reactor.connectTCP(server, port, WarBotFactory(config))
|
|
reactor.addSystemEventTrigger('before','shutdown', stop.set)
|
|
reactor.run()
|