374faca7be
Local times are mainly important in the email do-not-disturb range, and in notification history. Regular feed timestamps are displayed as relative, so even if the dates are now affected by the user's time zone, that won't make a big difference. Tinylog feed timestamps are always in UTC ("+0000"), as are Gemini feed dates. IssueID #4
251 lines
8.9 KiB
Python
251 lines
8.9 KiB
Python
import datetime
|
|
import json
|
|
import os
|
|
import re
|
|
import threading
|
|
import time
|
|
import subprocess
|
|
from model import Database, Notification, User
|
|
from utils import plural_s
|
|
|
|
pjoin = os.path.join
|
|
HOUR_RANGE = re.compile(r'(\d+)-(\d+)')
|
|
|
|
|
|
def is_hour_in_range(hour, hour_range):
|
|
try:
|
|
m = HOUR_RANGE.match(hour_range)
|
|
begin, end = int(m[1]), int(m[2])
|
|
if begin < end:
|
|
return hour >= begin and hour <= end
|
|
else:
|
|
# Range crosses midnight.
|
|
return hour >= begin or hour <= end
|
|
except:
|
|
return False
|
|
|
|
|
|
class Emailer (threading.Thread):
|
|
def __init__(self, capsule, hostname, port, cfg):
|
|
super().__init__()
|
|
|
|
self.capsule = capsule
|
|
self.cfg = cfg
|
|
self.hostname = hostname
|
|
self.server_link = f'gemini://{hostname}'
|
|
if port != 1965:
|
|
self.server_link += f':{port}'
|
|
|
|
# TODO: These are also in 50_bubble.py, should be DRY.
|
|
self.site_icon = cfg.get('icon', '💬')
|
|
self.site_name = cfg.get('name', 'Bubble')
|
|
self.site_info = cfg.get('info', "Bulletin Boards for Gemini")
|
|
|
|
self.email_interval = int(cfg.get("email.interval", 60 * 5))
|
|
self.email_cmd = cfg.get("email.cmd", "")
|
|
self.email_from = cfg.get('email.from', 'nobody@example.com')
|
|
self.email_footer = f'\n---\n{self.site_name}: {self.site_info}\n'
|
|
|
|
def send_notifications(self, db):
|
|
cur = db.conn.cursor()
|
|
|
|
# Find users with unsent notifications.
|
|
cur.execute("""
|
|
SELECT
|
|
u.id, name, email, email_range, notif
|
|
FROM users u
|
|
JOIN notifs n ON u.id=n.dst AND n.is_sent=FALSE
|
|
WHERE
|
|
email!='' AND
|
|
notif!=0 AND
|
|
TIMESTAMPDIFF(MINUTE, ts_email, CURRENT_TIMESTAMP())>email_inter
|
|
""")
|
|
pending_notifs = []
|
|
cur_hour = datetime.datetime.now(datetime.timezone.utc).hour
|
|
for (id, name, email, email_range, enabled_types) in cur:
|
|
# Check that the current hour is not excluded.
|
|
if is_hour_in_range(cur_hour, email_range):
|
|
continue
|
|
pending_notifs.append(User(id, name, None, None, None, None, None,
|
|
None, enabled_types, email, None,
|
|
email_range, None, None, None, None,
|
|
None, None, None))
|
|
|
|
messages = {}
|
|
footer = \
|
|
f'\nView notifications in your Dashboard:\n=> {self.server_link}/dashboard\n' + \
|
|
f'\nChange notification settings:\n=> {self.server_link}/settings/notif\n' + \
|
|
self.email_footer
|
|
|
|
for user in pending_notifs:
|
|
notifs = db.get_notifications(user, only_unsent=True)
|
|
|
|
count = 0
|
|
body = ''
|
|
|
|
def personal_first(n):
|
|
prio = Notification.PRIORITY[n.type] if n.type in Notification.PRIORITY else 10
|
|
return (-prio, n.ts)
|
|
|
|
for notif in sorted(notifs, key=personal_first):
|
|
count += 1
|
|
_, label = notif.entry(show_age=False)
|
|
body += label + '\n\n'
|
|
|
|
if count:
|
|
subject = f'{user.name}: {count} new notification{plural_s(count)}'
|
|
messages[user.email] = (subject, body + footer)
|
|
|
|
# Mark everything as sent.
|
|
if len(pending_notifs):
|
|
user_ids = list(map(lambda u: u.id, pending_notifs))
|
|
placeholders = ','.join(map(str, user_ids))
|
|
cur.execute(f"""
|
|
UPDATE notifs
|
|
SET is_sent=TRUE
|
|
WHERE dst IN ({placeholders})
|
|
""")
|
|
cur.execute(
|
|
f"UPDATE users SET ts_email=CURRENT_TIMESTAMP() WHERE id IN ({placeholders})")
|
|
db.commit()
|
|
cur = None
|
|
db.close()
|
|
|
|
for email in messages:
|
|
subject, body = messages[email]
|
|
try:
|
|
msg = f'From: {self.site_name} <{self.email_from}>\n' + \
|
|
f'To: {email}\n' + \
|
|
f'Subject: {subject}\n\n' + \
|
|
body
|
|
|
|
args = [self.email_cmd, '-i', email]
|
|
if self.email_cmd == 'stdout':
|
|
print(args, msg)
|
|
else:
|
|
subprocess.check_output(args, input=msg, encoding='utf-8')
|
|
except Exception as x:
|
|
print('Emailer error:', x)
|
|
|
|
def run(self):
|
|
if not self.email_cmd:
|
|
# Emailter is disabled.
|
|
return
|
|
print(" Emailer is running")
|
|
while not self.capsule.shutdown_event().wait(self.email_interval):
|
|
db = Database(self.cfg)
|
|
try:
|
|
self.send_notifications(db)
|
|
except:
|
|
import traceback
|
|
traceback.print_last()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
class RepoFetcher (threading.Thread):
|
|
INTERVAL = 60 * 20
|
|
|
|
class Git:
|
|
"""Helper for running git in a specific directory."""
|
|
|
|
def __init__(self, cmd, path):
|
|
self.cmd = cmd
|
|
self.path = path
|
|
|
|
def run(self, args, as_bytes=False, without_path=False):
|
|
result = subprocess.check_output(
|
|
([self.cmd] if without_path else
|
|
[self.cmd, '-C', self.path]) + args
|
|
)
|
|
if as_bytes: return result
|
|
return result.decode('utf-8').rstrip()
|
|
|
|
def log(self, count=None, skip=0):
|
|
try:
|
|
count_arg = [f'-n{count}'] if count else []
|
|
out = self.run([
|
|
'log',
|
|
'--all'] +
|
|
count_arg +
|
|
[f'--skip={skip}',
|
|
"--pretty=format:{^@^fullHash^@^:^@^%H^@^,^@^message^@^:^@^%s^@^,^@^body^@^:^@^%b^@^,^@^commitDate^@^:^@^%ai^@^},^@&@^"
|
|
])
|
|
out = out.replace('^@&@^\n', '').replace(',^@&@^', '') \
|
|
.replace('\\', '\\\\').replace('\n', '\\n') \
|
|
.replace('\t', ' ').replace('"', '\\"').replace('^@^', '"') \
|
|
.replace('\\n#', '\\n') \
|
|
.replace('"body":"#', '"body":"')
|
|
out = '[' + out + ']'
|
|
#print(out)
|
|
return json.loads(out)
|
|
except Exception as x:
|
|
print('Error:', x)
|
|
return []
|
|
|
|
def __init__(self, capsule, cfg):
|
|
super().__init__()
|
|
|
|
self.capsule = capsule
|
|
self.cfg = cfg
|
|
|
|
self.cache_dir = cfg.get("repo.cachedir", "")
|
|
self.git_cmd = cfg.get("repo.git", "/usr/bin/git")
|
|
|
|
def fetch_pending(self, db, repo):
|
|
if repo.ts_fetch != None and \
|
|
time.time() - repo.ts_fetch < RepoFetcher.INTERVAL:
|
|
return
|
|
if not repo.clone_url:
|
|
return
|
|
|
|
# It's time to fetch now.
|
|
cache_path = pjoin(self.cache_dir, str(repo.id))
|
|
os.makedirs(cache_path, exist_ok=True)
|
|
git = RepoFetcher.Git(self.git_cmd, cache_path)
|
|
|
|
if not os.path.exists(pjoin(cache_path, 'config')):
|
|
git.run(['clone', '--bare', repo.clone_url, cache_path], without_path=True)
|
|
git.run(['config', 'remote.origin.fetch', 'refs/heads/*:refs/heads/*']) # enable fetch
|
|
num_commits = None
|
|
else:
|
|
git.run(['fetch', '--prune'])
|
|
num_commits = 100 # Since the last `INTERVAL` mins, so probably enough.
|
|
|
|
# Update the fetch timestamp.
|
|
cur = db.conn.cursor()
|
|
cur.execute("UPDATE repos SET ts_fetch=CURRENT_TIMESTAMP() WHERE id=?", (repo.id,))
|
|
db.commit()
|
|
|
|
issue_pattern = re.compile(r'\b' + repo.idlabel + r'\s*#(\d+)\b')
|
|
|
|
# Read the history to find out about commits.
|
|
for commit in git.log(num_commits):
|
|
hash = commit['fullHash']
|
|
date = commit['commitDate']
|
|
message = commit['message']
|
|
body = commit['body']
|
|
issuerefs = map(int, issue_pattern.findall(message + '\n' + body))
|
|
|
|
cur.execute("INSERT IGNORE INTO commits (repo, hash, msg, ts) VALUES (?, ?, ?, ?)",
|
|
(repo.id, hash, message, date))
|
|
for issue in issuerefs:
|
|
cur.execute("INSERT IGNORE INTO issuerefs (repo, commit, issue) VALUES (?, ?, ?)",
|
|
(repo.id, hash, issue))
|
|
db.commit()
|
|
|
|
def run(self):
|
|
if not self.cache_dir:
|
|
# Fetcher disabled.
|
|
return
|
|
print(" RepoFetcher is running")
|
|
while not self.capsule.shutdown_event().wait(5.0):
|
|
db = Database(self.cfg)
|
|
try:
|
|
for repo in db.get_repositories():
|
|
try:
|
|
self.fetch_pending(db, repo)
|
|
except subprocess.CalledProcessError:
|
|
print('Error when fetching repository:', repo.clone_url)
|
|
finally:
|
|
db.close()
|