You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
173 lines
6.0 KiB
173 lines
6.0 KiB
#!/usr/bin/env python |
|
""" |
|
rssnotify.py - Phenny RSSNotify Module |
|
Copyright 2016, sfan5 |
|
Licensed under GNU General Public License v2.0 |
|
""" |
|
import time |
|
import re |
|
import web |
|
import os |
|
import threading |
|
|
|
import feedparser # sudo pip install feedparser |
|
|
|
def to_unix_time(tstr): |
|
if tstr.endswith("Z"): |
|
tstr = tstr[:-1] + "+00:00" |
|
r = re.compile(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})([-+])(\d{2}):(\d{2})") |
|
g = r.match(tstr).groups(1) |
|
# This function would be like 100% shorter if the time library didn't suck so hard. |
|
# time.strptime completly ignores timezones because who needs timezones anyway? |
|
# time.timezone is only for non-DST; nobody reading 'time.timezone' in code expects that |
|
# you have time.gmtime() but you can't use it because time.mktime() fucks with it when it's DST |
|
# </rant> |
|
ts = time.mktime(time.strptime(g[0], "%Y-%m-%dT%H:%M:%S")) |
|
ts -= (time.altzone if time.daylight else time.timezone) |
|
if g[1] == "+": |
|
ts -= int(g[2]) * 60 * 60 |
|
ts -= int(g[3]) * 60 |
|
else: # g[1] == "-" |
|
ts += int(g[2]) * 60 * 60 |
|
ts += int(g[3]) * 60 |
|
return ts |
|
|
|
def resolve_channels(phenny, l): |
|
ret = set() |
|
for entry in l: |
|
sign = 1 |
|
if entry[0] == "-": |
|
entry = entry[1:] |
|
sign = -1 |
|
c = phenny.bot.channels if entry == "*" else [entry] |
|
if sign == 1: |
|
ret |= set(c) |
|
else: # sign == -1 |
|
ret -= set(c) |
|
return ret |
|
|
|
class RssNotify(): |
|
MAX_MESSAGES = 6 |
|
def __init__(self, config): |
|
self.config = config |
|
self.last_updated = {} |
|
self.last_check = 0 |
|
self.firstrun = True |
|
for i in range(len(self.config["feeds"])): |
|
self.last_updated[i] = 0 |
|
def needs_check(self): |
|
return time.time() > self.last_check + self.config["check_interval"] |
|
def check(self, phenny): |
|
start = self.last_check = time.time() |
|
print("[RssNotify]: Checking RSS feeds...") |
|
for fid, feedspec in enumerate(self.config["feeds"]): |
|
feed = feedparser.parse(feedspec[0], agent="Mozilla/5.0 (compatible; MinetestBot)") |
|
if self.firstrun: |
|
self.last_updated[fid] = max((to_unix_time(e.updated) for e in feed.entries), default=0) |
|
continue |
|
new = [] |
|
for entry in feed.entries: |
|
if self.last_updated[fid] >= to_unix_time(entry.updated): |
|
continue |
|
new.append(entry) |
|
if len(new) == 0: |
|
continue |
|
print("[RssNotify]: Found %d update(s) for '%s'" % (len(new), feedspec[0])) |
|
self.last_updated[fid] = max(to_unix_time(e.updated) for e in feed.entries) |
|
new.reverse() |
|
if self.config["logfile"] is not None: |
|
with open(self.config["logfile"], "a", encoding="utf-8") as f: |
|
for entry in new: |
|
message = self._format_msg(entry, log_format=True) |
|
f.write(message + "\n") |
|
for entry in new[:RssNotify.MAX_MESSAGES]: |
|
message = self._format_msg(entry) |
|
self._announce(phenny, message, feedspec[1]) |
|
if len(new) > RssNotify.MAX_MESSAGES: |
|
message = self._get_cutoff_message(len(new) - RssNotify.MAX_MESSAGES) |
|
self._announce(phenny, message, feedspec[1]) |
|
self.firstrun = False |
|
print("[RssNotify]: Checked %d RSS feeds in %0.3f seconds" % (len(self.config["feeds"]), time.time()-start)) |
|
def _shorten(self, link): |
|
# We can utilitze git.io to shorten *.github.com links |
|
l, code = web.post("https://git.io/create", {'url': link}) |
|
if code != 200: |
|
return None |
|
l = str(l, 'utf-8') |
|
if ' ' in l: # spaces means there was an error :( |
|
return None |
|
return "https://git.io/" + l |
|
def _format_msg(self, feed_entry, log_format=False): |
|
if log_format: |
|
f_cshort = "[color=#c00]%s[/color]" |
|
f_clong = "[color=#c00]%s[/color] ([color=#c00]%s[/color])" |
|
f_all = "[color=#3465a4][git][/color] %s -> [color=#73d216]%s[/color]: [b]%s[/b] [color=#a04265]%s[/color] %s ([color=#888a85]%s[/color])" |
|
else: |
|
f_cshort = "\x0304%s\x0f" |
|
f_clong = "\x0304%s\x0f (\x0304%s\x0f)" |
|
f_all = "\x0302[git]\x0f %s -> \x0303%s\x0f: \x02%s\x0f \x0313%s\x0f %s (\x0315%s\x0f)" |
|
committer_realname = feed_entry.authors[0].name |
|
if committer_realname == "": |
|
try: |
|
committer_realname = feed_entry.authors[0].email |
|
except AttributeError: |
|
committer_realname = "" |
|
try: |
|
committer = feed_entry.authors[0].href.replace('https://github.com/',"") |
|
except AttributeError: |
|
committer = committer_realname # This will only use the realname if the nickname couldn't be obtained |
|
m = re.search(r'/([a-zA-Z0-9_-]+/[a-zA-Z0-9_-]+)/commit/([a-f0-9]{7})', feed_entry.links[0].href) |
|
repo_name = m.group(1) if m else "?" |
|
commit_hash = m.group(2) if m else "???????" |
|
commit_time = feed_entry.updated |
|
commit_text = feed_entry.title |
|
if self.config["show_link"]: |
|
commit_link = feed_entry.link |
|
if self.config["shorten_link"]: |
|
commit_link = self._shorten(commit_link) or commit_link |
|
else: |
|
commit_link = "" |
|
if committer_realname == "" or committer_realname.lower() == committer.lower(): |
|
committer_final = f_cshort % committer |
|
else: |
|
committer_final = f_clong % (committer, committer_realname) |
|
return f_all % (committer_final, repo_name, commit_text, commit_hash, commit_link, commit_time) |
|
def _get_cutoff_message(self, left): |
|
return "\x0302[git]\x0f (%d newer commits not shown)" % left |
|
def _announce(self, phenny, message, chans): |
|
chans = resolve_channels(phenny, chans) |
|
for ch in chans: |
|
phenny.write(['PRIVMSG', ch], message) |
|
|
|
################# |
|
|
|
c = ['*', '-#minetest-hub'] |
|
rssn = RssNotify({ |
|
"check_interval": 120, |
|
"show_link": True, |
|
"shorten_link": True, |
|
"logfile": os.getcwd() + "/rssnotify.log", |
|
"feeds": [ |
|
('https://github.com/minetest/minetest/commits/master.atom', c), |
|
('https://github.com/minetest/minetest_game/commits/master.atom', c), |
|
('https://github.com/minetest/minetestmapper/commits/master.atom', c), |
|
('https://github.com/minetest/serverlist/commits/master.atom', c), |
|
('https://github.com/sfan5/phenny/commits/master.atom', ['##minetestbot']), |
|
('https://github.com/sfan5/minetestbot-modules/commits/master.atom', ['##minetestbot']), |
|
], |
|
}) |
|
|
|
def rsscheck(phenny, input): |
|
if not rssn.needs_check(): |
|
return |
|
t = threading.Thread(target=rssn.check, args=(phenny, )) |
|
t.start() |
|
|
|
rsscheck.priority = 'low' |
|
rsscheck.rule = r'.*' |
|
rsscheck.event = '*' |
|
rsscheck.thread = False |
|
rsscheck.nohook = True |
|
|
|
if __name__ == '__main__': |
|
print(__doc__.strip())
|
|
|