SimpleSyndicate/database.py

137 lines
3.8 KiB
Python
Raw Normal View History

2018-10-21 01:03:14 -04:00
#!/usr/bin/env python3
"""
Tools for updating and extracting data from the Redis database.
"""
import re
import json
import time
import bs4
import redis
import requests
import config
2019-10-21 10:31:02 -04:00
#_r = redis.Redis(config.REDIS_HOST, config.REDIS_PORT)
_r = redis.Redis(unix_socket_path=config.REDIS_SOCKET)
2018-10-21 01:03:14 -04:00
def scrape_feed(feed_url):
"""Scrapes an RSS feed and extract all relevant data from it."""
2019-04-07 14:15:07 -04:00
headers = {"User-Agent": "Feed me RSS"}
res = requests.get(feed_url, headers=headers, timeout=10)
res.raise_for_status()
2018-10-21 01:03:14 -04:00
encoding = re.search(r'encoding="(.*)"', res.text)
if encoding:
encoding = encoding.group(1)
else:
encoding = 'utf-8'
soup = bs4.BeautifulSoup(res.content, 'xml', from_encoding=encoding)
if "factorio.com" in feed_url: # because screw making it generic
return scrape_factorio(soup)
2018-10-21 02:29:24 -04:00
elif "zombieknight" in feed_url:
return scrape_zombie_knight(soup)
2018-10-21 01:03:14 -04:00
meta = {}
meta['title'] = soup.title.text
meta['html_url'] = soup.find("link", href="").text
2019-07-16 21:55:05 -04:00
meta['html_url'] = meta['html_url'].replace('http://', 'https://')
2018-10-21 01:03:14 -04:00
meta['description'] = soup.description.text
entries = soup.find_all('item')
feed_entries = []
for entry in entries[:20]:
entry_dict = {}
entry_dict['title'] = entry.title.text
entry_dict['link'] = entry.link.text
2019-07-16 21:55:05 -04:00
entry_dict['link'] = entry_dict['link'].replace('http://', 'https://')
2018-10-21 01:03:14 -04:00
try:
date = entry.pubDate.text
2018-10-21 02:29:24 -04:00
try:
date = time.strptime(date, '%a, %d %b %Y %H:%M:%S %z')
except ValueError:
date = time.strptime(date, '%a, %d %b %Y %H:%M:%S %Z')
entry_dict['date'] = time.strftime('%Y-%m-%d', date)
2018-10-21 01:03:14 -04:00
except AttributeError:
2018-10-21 02:29:24 -04:00
entry_dict['date'] = ""
2018-10-21 01:03:14 -04:00
entry_dict['description'] = entry.description.text[:200]
# TODO: html sanitation
feed_entries.append(entry_dict)
feed = {'meta': meta, 'entries': feed_entries}
return feed
2018-10-21 02:29:24 -04:00
def scrape_zombie_knight(soup):
"""Handles the special case that is Zombie Knight Saga."""
meta = {}
meta['title'] = soup.title.text
meta['html_url'] = soup.find("link", rel='alternate').get('href')
meta['description'] = soup.subtitle.text
entries = soup.find_all('entry')
feed_entries = []
for entry in entries[:20]:
entry_dict = {}
entry_dict['title'] = entry.title.text
2018-10-22 09:47:45 -04:00
entry_dict['link'] = entry.find("link", rel='alternate').get('href')
2018-10-21 02:29:24 -04:00
date = entry.updated.text
date = "".join(re.search("(.*)\.\d{3}(.\d{2}):(\d{2})", date).groups())
date = time.strptime(date, '%Y-%m-%dT%H:%M:%S%z')
entry_dict['date'] = time.strftime('%Y-%m-%d', date)
entry_dict['description'] = entry.content.text[:200]
# TODO: html sanitation
feed_entries.append(entry_dict)
feed = {'meta': meta, 'entries': feed_entries}
return feed
2018-10-21 01:03:14 -04:00
def scrape_factorio(soup):
"""Handles the special case that is the Factorio development blog."""
meta = {}
meta['title'] = "Factorio"
meta['html_url'] = soup.find('link').get('href')
meta['description'] = soup.title.text
entries = soup.find_all('entry')
feed_entries = []
for entry in entries[:20]:
entry_dict = {}
entry_dict['title'] = entry.title.text
entry_dict['link'] = entry.find('link').get('href')
date = entry.updated.text
date = time.strptime(date[:-3]+date[-2:], '%Y-%m-%dT%H:%M:%S%z')
entry_dict['date'] = time.strftime('%Y-%m-%d', date)
entry_dict['description'] = entry.content.text[:200]
# TODO: html sanitation
feed_entries.append(entry_dict)
feed = {'meta': meta, 'entries': feed_entries}
return feed
def update_feed(feed_url):
"""Updates the given feed_id."""
2019-04-07 14:15:07 -04:00
try:
feed = scrape_feed(feed_url)
except Exception as e:
feed = {
'meta': {
'title': feed_url,
'html_url': feed_url,
'description': ""
},
'error': str(e),
}
2018-10-21 01:03:14 -04:00
_r.set(feed_url, json.dumps(feed))
def update_all_feeds():
"""Updates all feeds being watched."""
for feed_url in config.FEEDS:
update_feed(feed_url)
def get_feed(feed_url):
"""Returns all stored information about the feed."""
return json.loads(_r.get(feed_url))