139 lines
3.8 KiB
Python
139 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tools for updating and extracting data from the Redis database.
|
|
"""
|
|
import re
|
|
import json
|
|
import time
|
|
|
|
import bs4
|
|
import redis
|
|
import requests
|
|
|
|
import config
|
|
|
|
_r = redis.Redis(
|
|
config.REDIS_HOST,
|
|
config.REDIS_PORT,
|
|
)
|
|
|
|
def scrape_feed(feed_url):
|
|
"""Scrapes an RSS feed and extract all relevant data from it."""
|
|
headers = {"User-Agent": "Feed me RSS"}
|
|
res = requests.get(feed_url, headers=headers, timeout=10)
|
|
res.raise_for_status()
|
|
|
|
encoding = re.search(r'encoding="(.*)"', res.text)
|
|
if encoding:
|
|
encoding = encoding.group(1)
|
|
else:
|
|
encoding = 'utf-8'
|
|
soup = bs4.BeautifulSoup(res.content, 'xml', from_encoding=encoding)
|
|
|
|
if "factorio.com" in feed_url: # because screw making it generic
|
|
return scrape_factorio(soup)
|
|
elif "zombieknight" in feed_url:
|
|
return scrape_zombie_knight(soup)
|
|
|
|
meta = {}
|
|
meta['title'] = soup.title.text
|
|
meta['html_url'] = soup.find("link", href="").text
|
|
meta['html_url'] = meta['html_url'].replace('http://', 'https://')
|
|
meta['description'] = soup.description.text
|
|
|
|
entries = soup.find_all('item')
|
|
feed_entries = []
|
|
for entry in entries[:20]:
|
|
entry_dict = {}
|
|
entry_dict['title'] = entry.title.text
|
|
entry_dict['link'] = entry.link.text
|
|
entry_dict['link'] = entry_dict['link'].replace('http://', 'https://')
|
|
try:
|
|
date = entry.pubDate.text
|
|
try:
|
|
date = time.strptime(date, '%a, %d %b %Y %H:%M:%S %z')
|
|
except ValueError:
|
|
date = time.strptime(date, '%a, %d %b %Y %H:%M:%S %Z')
|
|
entry_dict['date'] = time.strftime('%Y-%m-%d', date)
|
|
except AttributeError:
|
|
entry_dict['date'] = ""
|
|
entry_dict['description'] = entry.description.text[:200]
|
|
# TODO: html sanitation
|
|
feed_entries.append(entry_dict)
|
|
feed = {'meta': meta, 'entries': feed_entries}
|
|
return feed
|
|
|
|
|
|
def scrape_zombie_knight(soup):
|
|
"""Handles the special case that is Zombie Knight Saga."""
|
|
meta = {}
|
|
meta['title'] = soup.title.text
|
|
meta['html_url'] = soup.find("link", rel='alternate').get('href')
|
|
meta['description'] = soup.subtitle.text
|
|
|
|
entries = soup.find_all('entry')
|
|
feed_entries = []
|
|
for entry in entries[:20]:
|
|
entry_dict = {}
|
|
entry_dict['title'] = entry.title.text
|
|
entry_dict['link'] = entry.find("link", rel='alternate').get('href')
|
|
date = entry.updated.text
|
|
date = "".join(re.search("(.*)\.\d{3}(.\d{2}):(\d{2})", date).groups())
|
|
date = time.strptime(date, '%Y-%m-%dT%H:%M:%S%z')
|
|
entry_dict['date'] = time.strftime('%Y-%m-%d', date)
|
|
entry_dict['description'] = entry.content.text[:200]
|
|
# TODO: html sanitation
|
|
feed_entries.append(entry_dict)
|
|
feed = {'meta': meta, 'entries': feed_entries}
|
|
return feed
|
|
|
|
|
|
def scrape_factorio(soup):
|
|
"""Handles the special case that is the Factorio development blog."""
|
|
meta = {}
|
|
meta['title'] = "Factorio"
|
|
meta['html_url'] = soup.find('link').get('href')
|
|
meta['description'] = soup.title.text
|
|
|
|
entries = soup.find_all('entry')
|
|
feed_entries = []
|
|
for entry in entries[:20]:
|
|
entry_dict = {}
|
|
entry_dict['title'] = entry.title.text
|
|
entry_dict['link'] = entry.find('link').get('href')
|
|
date = entry.updated.text
|
|
date = time.strptime(date[:-3]+date[-2:], '%Y-%m-%dT%H:%M:%S%z')
|
|
entry_dict['date'] = time.strftime('%Y-%m-%d', date)
|
|
entry_dict['description'] = entry.content.text[:200]
|
|
# TODO: html sanitation
|
|
feed_entries.append(entry_dict)
|
|
feed = {'meta': meta, 'entries': feed_entries}
|
|
return feed
|
|
|
|
|
|
def update_feed(feed_url):
|
|
"""Updates the given feed_id."""
|
|
try:
|
|
feed = scrape_feed(feed_url)
|
|
except Exception as e:
|
|
feed = {
|
|
'meta': {
|
|
'title': feed_url,
|
|
'html_url': feed_url,
|
|
'description': ""
|
|
},
|
|
'error': str(e),
|
|
}
|
|
_r.set(feed_url, json.dumps(feed))
|
|
|
|
|
|
def update_all_feeds():
|
|
"""Updates all feeds being watched."""
|
|
for feed_url in config.FEEDS:
|
|
update_feed(feed_url)
|
|
|
|
|
|
def get_feed(feed_url):
|
|
"""Returns all stored information about the feed."""
|
|
return json.loads(_r.get(feed_url))
|