842 lines
34 KiB
Python
842 lines
34 KiB
Python
import datetime
|
|
import io
|
|
import time
|
|
import urllib.parse as urlparse
|
|
import zipfile
|
|
from model import Search, Segment, User, Subspace, Post
|
|
from utils import *
|
|
|
|
|
|
def make_subspace(session):
    """Create a new subspace named by the request query.

    Returns a `(status, meta)` tuple: 60 when nobody is logged in, 61 when
    the user is not allowed to create subspaces, 10 to prompt for a name
    (again, if the given one was invalid), and 30 with the path of the new
    subspace's admin page once it has been created.
    """
    creator = session.user
    if not creator:
        return 60, "Login required"

    # Basic users may only create subspaces when the site permits it.
    if not (session.bubble.user_subspaces or creator.role != User.BASIC):
        return 61, "Not authorized"

    request = session.req
    if is_empty_query(request):
        return 10, "Enter name for new subspace: " + session.NAME_HINT

    subspace_name = clean_query(request)
    if is_valid_name(subspace_name):
        session.db.create_subspace(subspace_name, creator.id)
        return 30, f'/s/{subspace_name}/admin'

    return 10, "That is an invalid subspace name. " + session.NAME_HINT
|
|
|
|
|
|
def make_subspaces_page(session):
    """Build the page that lists subspaces.

    A logged-in user first sees the subspaces they moderate (plus a link for
    creating a new one when permitted); the index of subspaces fetched with
    `owner=0` follows for everyone. Returns the page source as a string.
    """
    def link_line(sub):
        # The title doubles as the link path and the link label.
        title = sub.title()
        return f'=> /{title} {title}\n'

    viewer = session.user
    parts = ['# Subspaces\n']
    if viewer:
        parts.append(session.dashboard_link())
    parts.append('=> / Back to front page\n')

    if viewer:
        parts.append(f'\n## Moderated by {viewer.name}\n')
        moderated = session.db.get_subspaces(mod=viewer.id)
        parts.extend(link_line(sub) for sub in moderated)
        if session.bubble.user_subspaces or viewer.role == User.ADMIN:
            parts.append('\n=> /new-subspace 🌒 New subspace\n')

    parts.append('\n## Index\n')
    parts.append('Inactive and unused subspaces will be purged.\n')
    listed = session.db.get_subspaces(owner=0)
    if len(listed) == 0:
        parts.append('No subspaces.\n')
    else:
        parts.extend(link_line(sub) for sub in listed)

    return ''.join(parts)
|
|
|
|
|
|
def subspace_admin_actions(session, action):
    """Render the administration page of a subspace or perform an admin action.

    The subspace being administered is `session.context`. Simple actions are
    selected with `action`; repository settings and token-protected actions
    are recognized from `session.req.path`.

    Args:
        session: the current request session. Anything other than a
            moderator (`session.is_user_mod`) is rejected.
        action: 'info', 'url' or 'omit-all' for the corresponding setting;
            any other value falls through to path-based actions and finally
            to the administration page itself.

    Returns:
        A `(status, meta)` tuple for input prompts (10), redirects (30) and
        errors (51, 61), or the page source as a string.
    """
    if not session.is_user_mod:
        return 61, "Moderator rights required"

    user = session.user
    req = session.req
    db = session.db
    subspace = session.context
    admin_link = f'/{subspace.title()}/admin'

    page = f'# {subspace.title()}: Administration\n'

    # Git repository settings are only available to administrators.
    if user.role == User.ADMIN:
        m = re.search(r'/repo/(new|delete|clone-url|view-url|idlabel)?(/([0-9a-zA-Z]{10}))?$', req.path)
        if m:
            repo_link = f'{admin_link}/repo/'
            repo_action = m[1]
            # Settings that store the query text in a single repository
            # field: path component => (prompt, field name).
            repo_fields = {
                'clone-url': ('HTTPS URL where to clone repository:', 'clone_url'),
                'view-url': ('Gemini URL for viewing commits:', 'view_url'),
                'idlabel': ('Label for marking issue IDs in commit messages: (For example, "IssueID")', 'idlabel'),
            }

            if repo_action == 'new':
                db.create_repository(subspace)
                return 30, repo_link

            if repo_action == 'delete' and m[3]:
                if not db.verify_token(user, m[3]):
                    return 61, 'Not authorized'
                if is_empty_query(req):
                    return 10, 'Really delete repository and commit history? (DELETE to confirm)'
                if req.query == 'DELETE':
                    db.destroy_repository(db.get_repository(subspace=subspace))
                    return 30, repo_link
                # Any other answer cancels: fall through to the settings page.

            elif repo_action in repo_fields:
                prompt, field = repo_fields[repo_action]
                if req.query is None:
                    return 10, prompt
                db.update_repository(db.get_repository(subspace=subspace),
                                     **{field: clean_query(req)})
                return 30, repo_link

            page += f'=> {admin_link} Go back\n'
            page += '\n## Git Repository\n'

            repo = db.get_repository(subspace=subspace)
            if not repo:
                # This link was previously emitted without interpolating
                # `admin_link`, producing a broken "{admin_link}" URL.
                page += f'=> {admin_link}/repo/new 🛢️ New repository\n'
                return page

            page += f'\n=> {admin_link}/repo/clone-url Clone HTTPS URL: {repo.clone_url or "(not set)"}\n'
            page += f'=> {admin_link}/repo/idlabel Issue ID label: {repo.idlabel or ""}\n'
            page += f'\n=> {admin_link}/repo/view-url Commit view Gemini URL: {repo.view_url or "(not set)"}\n'

            # Status information.
            page += '\n### Status\n'
            if repo.ts_fetch is None:
                if not repo.clone_url:
                    page += 'Not configured.\n'
                else:
                    page += 'Repository will be fetched soon.\n'
            else:
                n = db.count_commits(repo)
                fetched = make_timestamp(repo.ts_fetch, '%Y-%m-%d at %H:%M:%S %Z')
                page += f'{n} commits in history. Repository was last fetched on {fetched}.\n'

            page += f'\n=> {admin_link}/repo/delete/{db.get_token(user)} ❌ Delete repository\n'

            return page

    if action == 'info':
        if req.query is None:
            return 10, f"Description for {subspace.title()}:"
        db.update_subspace(subspace, info=clean_title(clean_query(req)))
        return 30, admin_link

    if action == 'url':
        if req.query is None:
            return 10, f"Featured link for {subspace.title()}: (URL and label)"
        try:
            link = form_link(parse_link_segment_query(req))
        except Exception:
            # An unparseable link clears the featured link (best effort).
            link = ''
        db.update_subspace(subspace, url=link)
        return 30, admin_link

    if action == 'omit-all':
        if subspace.flags & Subspace.HIDE_OMIT_SETTING_FLAG:
            return 61, 'Not authorized'
        db.update_subspace(subspace, flags=subspace.flags ^ Subspace.OMIT_FROM_ALL_FLAG)
        return 30, admin_link

    # Actions that require link verification.
    m = re.search(r'/(delete|tracker|rename|add-mod|remove-mod)/([0-9a-zA-Z]{10})$', req.path)
    if m:
        verified_action, token = m[1], m[2]
        if not db.verify_token(user, token):
            return 61, 'Not authorized'

        if verified_action == 'add-mod':
            if is_empty_query(req):
                return 10, 'Enter user to add as moderator:'
            adding = db.get_user(name=clean_query(req))
            if not adding:
                return 51, 'Not found'
            db.modify_mods(subspace, actor=user, add=adding)
            return 30, admin_link

        if verified_action == 'remove-mod':
            if is_empty_query(req):
                return 10, 'Enter user to remove as moderator:'
            removing = db.get_user(name=clean_query(req))
            if not removing:
                return 51, 'Not found'
            db.modify_mods(subspace, actor=user, remove=removing)
            return 30, admin_link

        if verified_action == 'rename':
            prompt = f'Enter new name for {subspace.name}? (Warning: Links to subspace will break!)'
            if is_empty_query(req):
                return 10, prompt
            new_name = clean_query(req)
            if not is_valid_name(new_name):
                return 10, prompt
            try:
                db.update_subspace(subspace, name=new_name)
            except Exception:
                # The update was rejected; ask for another name.
                return 10, prompt
            return 30, f'/s/{new_name}/admin'

        if verified_action == 'delete' and user.role == User.ADMIN:
            if is_empty_query(req):
                return 10, f'Really delete {subspace.title()}? (Enter DELETE to confirm.)'
            if req.query == 'DELETE':
                return 30, f'/admin/delete-subspace/{token}?{subspace.name}'
            return 30, admin_link

        elif verified_action == 'tracker' and (user.role == User.ADMIN or
                                               db.is_empty_subspace(subspace)):
            new_flags = subspace.flags ^ Subspace.ISSUE_TRACKER
            if new_flags & Subspace.ISSUE_TRACKER:
                # Issues shouldn't be listed in All Posts.
                new_flags = new_flags | Subspace.OMIT_FROM_ALL_FLAG
            db.update_subspace(subspace, flags=new_flags)
            return 30, admin_link

    # No action was performed: show the administration page.
    page += subspace.subspace_link()

    if not subspace.flags & (Subspace.ISSUE_TRACKER | Subspace.HIDE_OMIT_SETTING_FLAG):
        # Normalize the masked flag before indexing CHECKS, as is done for
        # the issue tracker checkbox below.
        page += f'\n=> {admin_link}/omit-all {session.CHECKS[nonzero(subspace.flags & Subspace.OMIT_FROM_ALL_FLAG)]} Omit from All Posts\n'

    page += '\n## About\n'

    page += '\n### Description\n'
    page += (subspace.info if subspace.info else '(no description)') + '\n'
    page += f'=> {admin_link}/info ✏️ Edit\n'

    page += '\n### Featured Link\n'
    page += (f'=> {subspace.url}' if subspace.url else '(no featured link)') + '\n'
    page += f'=> {admin_link}/url ✏️ Edit\n'

    page += '\n## Moderators\n\n'
    mods = db.get_mods(subspace)
    for mod in mods:
        page += f'=> /u/{mod.name} {mod.avatar} {mod.name}\n'
    page += f'=> {admin_link}/add-mod/{session.get_token()} Add moderator\n'
    if len(mods) > 1:
        # The removal link is only offered while more than one moderator exists.
        page += f'=> {admin_link}/remove-mod/{session.get_token()} Remove moderator\n'

    if user.role == User.ADMIN or db.is_empty_subspace(subspace):
        page += '\n## Issue Tracking\n'
        page += f'\n=> {admin_link}/tracker/{db.get_token(user)} {session.CHECKS[nonzero(subspace.flags & Subspace.ISSUE_TRACKER)]} Subspace is an issue tracker\n'
        page += 'Posts in an issue tracker are designated issue IDs and have an Open/Closed status. Issues may refer to Git repository commits via hash, and commit messages can refer to issues by ID. Non-issue posts are not allowed in an issue tracker subspace.\n'

        if user.role == User.ADMIN:
            page += f'\n=> {admin_link}/repo/ ⚙️ Git repository settings\n'

    page += '\n## Actions\n'
    page += '\n=> /export/' + subspace.title() + '.gpub 📤 Export data archive\n'
    page += f'Download a ZIP archive containing all posts and comments in {subspace.title()}. The archive has Gempub metadata so it can also be viewed in a Gempub reader.\n'

    page += f'\n=> {admin_link}/rename/{session.get_token()} Rename subspace\n'
    page += 'Links pointing to the subspace will break when the name is changed.\n'

    if user.role == User.ADMIN:
        page += f'\n=> {admin_link}/delete/{session.get_token()} ⚠️ Delete subspace {subspace.title()}\n'
        page += 'All posts and comments in the subspace will be deleted. Exporting a backup beforehand is recommended.\n'

    return page
|
|
|
|
|
|
def split_terms(text):
    """Split a search query into terms, honoring double-quoted phrases.

    Terms are stripped of surrounding whitespace, and terms shorter than two
    characters are dropped. A malformed query (unbalanced double quote or a
    trailing backslash) is split on plain whitespace instead of raising.

    Args:
        text: the raw query string.

    Returns:
        A list of search terms.
    """
    import shlex

    try:
        # Escape apostrophes so that words like "don't" are not read as the
        # beginning of a single-quoted string.
        words = shlex.split(text.replace("'", "\\'"))
    except ValueError:
        # shlex could not parse the query; ignore the quoting altogether.
        words = text.replace('"', ' ').split()
    return [term for term in map(str.strip, words) if len(term) >= 2]
|
|
|
|
|
|
def make_search_page(session):
    """Run a search and render the page of results.

    The request path selects an optional scope (`/u/NAME/search` or
    `/s/NAME/search`) and an optional zero-based page number
    (`.../search/N`); the request query holds the search terms.

    Args:
        session: the current request session.

    Returns:
        `(59, ...)` for a malformed path, `(51, ...)` when the scope does not
        exist, `(10, prompt)` when no query was given, otherwise the page
        source as a string.
    """
    req = session.req
    db = session.db

    LIMIT = 30
    # Labels for non-text segments, indexed by segment type.
    SEGTYPES = ['content', 'URL', 'image', 'attachment', 'poll option']

    m = re.match(r'(/([us])/([\w%-]+))?/search(/(\d+))?', req.path)
    if not m:
        return 59, 'Bad request'
    if m[2] or m[3]:
        scope = db.get_subspace(name=urlparse.unquote(m[3]))
        # Existence must be checked before looking at the owner; a missing
        # user subspace used to fail with an AttributeError here.
        if not scope:
            return 51, 'Not found'
        if m[2] == 'u' and not scope.owner:
            return 51, 'Not found'
    else:
        scope = None
    page_index = max(0, int(m[5])) if m[5] else 0

    if req.query is None:
        return 10, f'Search {"in " + scope.title() if scope else session.bubble.site_name}:'

    search_url = ('/' if not scope else f'/{scope.title()}/') + 'search'
    terms = split_terms(clean_query(req))

    if scope:
        page = f'# Search in {scope.title()}\n'
    else:
        page = '# Search\n'
    page += f'=> {search_url} 🔍 New search\n'
    if scope:
        page += f'=> /{scope.title()} Back to {scope.title()}\n'
    else:
        page += '=> / 🌒 Back to front page\n'

    if terms:
        page += '\n## ' + ' '.join(terms) + '\n'

    # Perform the search.
    search = Search(db)
    count = search.run(terms, scope, limit=LIMIT, page_index=page_index)

    # TODO: Just counting the matches without returning anything might be
    # a useful addition in `model.Search`.

    if page_index > 0:
        page += f'\n=> {search_url}/{page_index - 1}?{req.query} Previous page\n'

    if count == 0:
        page += 'Found nothing matching the search terms.\n' if page_index == 0 else \
                'No more results.\n'

    for result in search.results:
        page += '\n'
        # result[1] is the matched object, result[2] the matched segment type.
        obj = result[1]

        if isinstance(obj, User):
            page += f'=> /u/{obj.name} {obj.avatar} u/{obj.name}\n'
            if obj.info:
                page += f'{obj.info[:300].strip()}\n'

        elif isinstance(obj, Subspace):
            page += f'=> /s/{obj.name} s/{obj.name}\n'
            if obj.info:
                page += f'{obj.info[:300].strip()}\n'

        elif isinstance(obj, Post):
            ctx = ("u/" if obj.sub_owner else "s/") + obj.sub_name
            kind = "Comment" if obj.parent else f"Issue #{obj.issueid}" if obj.issueid else "Post"
            title = f' "{shorten_text(obj.title, 30)}"' if obj.title else ''
            # The subspace is only mentioned in unscoped searches, and not
            # for posts in user subspaces.
            scope_desc = f"in {ctx} " if not scope and not obj.sub_owner else ""
            page += f'=> /{ctx}/{obj.issueid if obj.issueid else obj.id} {kind}{title} {scope_desc}by {obj.poster_avatar} {obj.poster_name} on {obj.ymd_date(tz=session.tz)} {" · " if obj.tags else ""}{obj.tags}\n'
            if result[2] != Segment.TEXT:
                page += f'(matching {SEGTYPES[result[2]]}) '
            page += obj.summary.replace('\n', ' ').replace('=>', ' ').strip() + '\n'

    if count >= LIMIT:
        # A full page of results suggests that there may be more.
        page += f'\n=> {search_url}/{page_index + 1}?{req.query} Next page\nPage {page_index + 1}\n'

    return page
|
|
|
|
|
|
def listed_items(items):
    """Join `items` into an English list: 'a', 'a and b', 'a, b and c'.

    An empty sequence gives an empty string, and a single item is returned
    as is.
    """
    if len(items) == 0:
        return ''
    *leading, final = items
    if not leading:
        return final
    return ', '.join(leading) + ' and ' + final
|
|
|
|
|
|
def make_timestamp(ts, fmt="%Y-%m-%d at %H:%M"):
    """Format the Unix timestamp `ts` with the strftime pattern `fmt`.

    The timestamp is interpreted in the time zone given by the module-level
    `UTC` name.
    """
    moment = datetime.datetime.fromtimestamp(ts, UTC)
    return moment.strftime(fmt)
|
|
|
|
|
|
class GempubArchive:
    """Collects posts, attachments and user profiles into a Gempub ZIP archive.

    An archive has one of three scopes, selected by the constructor
    arguments: the posts of a single user, the posts of a single subspace,
    or all posts created during one calendar month.

    Typical use: construct, call `render_post_entries()` and
    `render_file_entries()`, then `compress()` to obtain the ZIP data.
    """

    class Entry:
        """One archived item: a rendered post page or an attached file."""

        def __init__(self, post, label, page, file=None):
            """Capture what is needed of `post` for indexing the entry.

            Args:
                post: the post of the entry; for a file, the post that
                    contains the file.
                label: text shown for the entry on index pages.
                page: page source of the post, or the data of the file.
                file: the file record when the entry is an attachment.
            """
            self.ts = post.ts_created
            self.dt = datetime.datetime.fromtimestamp(self.ts, UTC)
            self.post_id = post.id
            self.issueid = post.issueid
            self.title = post.title
            self.subspace_id = post.subspace
            self.user_id = post.user
            self.label = label
            self.page = page
            self.file = file
            self.tags = post.tags
            self.num_cmts = post.num_cmts
            self.num_likes = post.num_likes
            # IDs of the posts whose pages link to this (file) entry.
            self.referenced_from_posts = []

        def ymd(self):
            """Return the creation date formatted as YYYY-MM-DD."""
            return self.dt.strftime('%Y-%m-%d')

        def path(self):
            """Return the entry's path relative to its directory in the archive.

            Files are named after the file ID and the last component of the
            segment URL. Posts go into a `YYYY-MM/` directory and are named
            after the post ID and a simplified form of the title.
            """
            if self.file:
                url = self.file.segment_url
                basename = url[url.rfind('/') + 1:]
                return f'file{self.file.id}_{basename}'
            # Keep only word characters and dashes; a missing title simply
            # contributes nothing to the name.
            slug = re.sub(r'[^\w\d-]', '', (self.title or '').replace(' ', '-')).lower().strip()
            if slug:
                slug = '_' + slug
            return f'{self.dt.year:04d}-{self.dt.month:02d}/{self.post_id}{slug}.gmi'

    def __init__(self, session, user=None, subspace=None, month_range=None):
        """Set up an empty archive.

        Note that `session` is modified: it is flagged as an archive session
        and its user is cleared so rendered pages look as if not logged in.

        Args:
            session: the current request session.
            user: the user whose posts are exported; required exactly when
                `subspace` is a user's subspace (nonzero owner) and no
                `month_range` is given.
            subspace: the subspace to export; may be None with `month_range`.
            month_range: `(year, month)` for exporting a calendar month.

        Raises:
            ValueError: if neither `subspace` nor `month_range` is given, or
                if `user` does not agree with the kind of archive.
        """
        self.session = session
        self.db = session.db
        self.ts_range = None
        if month_range:
            year, month = month_range
            # Half-open range [start of month, start of next month).
            end_month = month + 1 if month < 12 else 1
            end_year = year if month < 12 else year + 1
            self.ts_range = (
                datetime.datetime(year, month, 1, 0, 0, 0, tzinfo=UTC).timestamp(),
                datetime.datetime(end_year, end_month, 1, 0, 0, 0, tzinfo=UTC).timestamp()
            )
        if self.ts_range is None and subspace is None:
            raise ValueError('either subspace or month_range is required')
        self.user = user
        self.subspace = subspace
        self.is_user = self.ts_range is None and subspace.owner != 0
        if bool(self.is_user) != bool(self.user):
            raise ValueError('user must be given for user archives only')

        # Modify session so rendered pages appear to be not logged in.
        session.is_archive = True
        session.user = None

        self.site_link = session.server_root()
        site_name = session.bubble.site_name
        if month_range:
            archive_title = datetime.datetime(year, month, 1).strftime("%B %Y")
            archive_description = f'All posts and comments made on {site_name}. '
        else:
            archive_title = f'{"s/" if not self.is_user else ""}{subspace.name} on {site_name}'
            if self.is_user:
                archive_description = f'All posts and comments made by {user.name} on {site_name}. '
            else:
                archive_description = f'All posts and comments made in the subspace {subspace.title()} on {site_name}. '
        self.metadata = {
            'gpubVersion': '1.0.0',
            'title': archive_title,
            'description': archive_description,
            'author': f'Bubble v{session.bubble.version}',
            'publishDate': time.strftime('%Y-%m-%d'),
            'index': 'index.gmi'
        }

        self.local_entries = []     # posts in the archive's subspace
        self.foreign_entries = []   # posts in other subspaces
        self.subspace_entries = {}  # subspace name => list of entries
        self.comment_entries = []   # posts where user has commented
        self.file_entries = []      # files
        self.entry_index = {}       # indexed by post ID
        self.file_index = {}        # indexed by file ID
        self.referenced_users = {}  # info about posters
        self.total_count = [0, 0]
        self.subspace_count = {}    # [posts, comments]

        # Caches of database lookups, indexed by ID.
        self.subspaces = {}
        self.users = {}

        if self.is_user:
            self.users[self.user.id] = user
            self.add_user_page(self.user)

    def user_page(self, user):
        """Return the profile page source for `user`."""
        src = f'# {user.avatar} {user.name}\n'
        if user.info:
            src += user.info + '\n'
        if user.url:
            src += f'=> {user.url}\n'
        src += f'\n\n=> {self.site_link}/u/{user.name} {user.name} on {self.session.bubble.site_name}\n'
        src += 'The account was created on ' + \
            make_timestamp(user.ts_created, '%Y-%m-%d') + '.\n'
        return src

    def get_subspace(self, id):
        """Return the subspace with the given ID, caching the database lookup."""
        if id not in self.subspaces:
            self.subspaces[id] = self.db.get_subspace(id=id)
        return self.subspaces[id]

    def get_user(self, id):
        """Return the user with the given ID, caching the database lookup."""
        if id not in self.users:
            self.users[id] = self.db.get_user(id=id)
        return self.users[id]

    def add_user_page(self, user):
        """Render the profile page of `user` unless it has been added already."""
        if user.name not in self.referenced_users:
            self.referenced_users[user.name] = (user, self.user_page(user))

    def add_post_entry(self, post, is_comment=False):
        """Render `post` and add it to the archive.

        Args:
            post: the post to add.
            is_comment: True when the post is included because the archive's
                user has commented on it, rather than written it.
        """
        from feeds import make_post_page

        self.add_user_page(self.get_user(post.user))

        # Modify session according to the post's subspace.
        context = self.get_subspace(post.subspace)
        self.session.context = context
        self.session.is_context_tracker = (context.flags & Subspace.ISSUE_TRACKER) != 0

        is_local = (post.subspace == self.subspace.id) if self.subspace else False
        label_sub = ''
        if not self.ts_range:
            # Mention the subspace in the label when it is not evident from
            # the section in which the entry gets listed.
            if not is_local and (not self.is_user or is_comment):
                where = context.title()
                if where:
                    label_sub = ' · ' + where

        page = make_post_page(self.session, post)
        if self.ts_range:
            label = shorten_text(clean_title(strip_links(post.summary)), 150)
        else:
            label = (post.title if post.title else shorten_text(clean_title(strip_links(post.summary)), 100)) + label_sub
        entry = GempubArchive.Entry(post, label, page)

        # Check for referenced users.
        for username in re.findall(r'=> /u/([\w-]+)\s', page):
            ref = self.db.get_user(name=username)
            if ref:
                self.add_user_page(ref)

        if is_comment:
            self.comment_entries.append(entry)
        elif is_local:
            self.local_entries.append(entry)
        else:
            self.foreign_entries.append(entry)

        self.subspace_entries.setdefault(self.session.context.name, []).append(entry)

        if post.id not in self.entry_index:
            if not is_comment:
                self.add_count(post.subspace,
                               (1, self.db.count_posts(parent_id=post.id, draft=False)))

            self.entry_index[post.id] = entry

    def add_count(self, subspace_id, count):
        """Add `count`, a (posts, comments) pair, to the total and subspace counters."""
        self.total_count[0] += count[0]
        self.total_count[1] += count[1]
        counter = self.subspace_count.setdefault(subspace_id, [0, 0])
        counter[0] += count[0]
        counter[1] += count[1]

    def render_post_entries(self):
        """Add entries for all the posts that belong in the archive."""
        db = self.db

        # Entries for the user/subspace posts.
        if self.is_user:
            posts = db.get_posts(user=self.user, comment=False, draft=False)
        elif self.ts_range:
            posts = db.get_posts(ts_range=self.ts_range, comment=False, draft=False,
                                 sort_descending=False)
        else:
            posts = db.get_posts(subspace=self.subspace, comment=False, draft=False)

        for post in posts:
            self.add_post_entry(post)

        if self.is_user:
            # Make entries for posts where user has commented in.
            # TODO: Add a proper database query for this.
            # A dict is used as an ordered set so the posts are listed in
            # the order in which the comments were returned.
            commented_in = dict.fromkeys(
                cmt.parent for cmt in db.get_posts(user=self.user, comment=True,
                                                   draft=False, sort_descending=False))
            for post_id in commented_in:
                post = db.get_post(id=post_id)
                if post and post.user != self.user.id:
                    self.add_post_entry(post, is_comment=True)

    def render_file_entries(self):
        """Add entries for the files attached to the archived posts."""
        db = self.db
        if self.user:
            files = db.get_user_files(self.user)
        elif self.subspace:
            files = db.get_subspace_files(self.subspace)
        else:
            files = db.get_time_files(self.ts_range)

        for file in files:
            post = db.get_post(id=file.segment_post)
            if not post:
                # Entry metadata comes from the post, so a file whose post
                # cannot be found is left out of the archive.
                continue
            filesize = len(file.data)
            entry = GempubArchive.Entry(post,
                                        file.segment_label + f' [{filesize / 1024:.1f} KB, {file.mimetype}]',
                                        file.data,
                                        file)
            self.file_entries.append(entry)
            self.file_index[file.id] = entry

    def rewrite_internal_urls(self, entry: Entry):
        """Return the page of `entry` with site links made to work offline.

        Link lines pointing to archived users, posts and files are changed
        into relative links inside the archive. Other links that start with
        `/` are made absolute by prefixing the server root. As a side
        effect, the post is recorded in `referenced_from_posts` of every
        archived file it links to.
        """
        src_post_id = entry.post_id

        user_pattern = re.compile(r'^=>\s*/u/([\w%-]+)\s')
        if self.subspace:
            # The name is escaped so that it is always matched literally.
            post_pattern = re.compile(r'^=>\s*/([us])/' + re.escape(self.subspace.name) + r'/(\d+)\s')
        else:
            post_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(\d+)\s')
        file_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(image|file)/(\d+)[^ ]*\s')
        root_pattern = re.compile(r'^=>\s*/([^ ]*)\s')
        rewritten = []

        for line in entry.page.split('\n'):
            m = user_pattern.search(line)
            if m:
                rewritten.append(f'=> ../../users/{urlparse.unquote(m[1])}.gmi ' + line[m.end():])
                continue

            m = post_pattern.search(line)
            if m:
                post_id = int(m[2])
                if post_id in self.entry_index:
                    rewritten.append(f'=> ../../posts/{self.entry_index[post_id].path()} ' + line[m.end():])
                    continue

            m = file_pattern.search(line)
            if m:
                file_id = int(m[3])
                if file_id in self.file_index:
                    file_entry = self.file_index[file_id]
                    rewritten.append(f'=> ../../files/{file_entry.path()} ' + line[m.end():])
                    file_entry.referenced_from_posts.append(src_post_id)
                    continue

            # Anything else on this site, including posts and files that are
            # not part of the archive, is linked on the live server.
            m = root_pattern.search(line)
            if m:
                rewritten.append(f'=> {self.session.server_root()}/{m[1]} ' + line[m.end():])
                continue

            rewritten.append(line)

        return '\n'.join(rewritten)

    def compress(self):
        """Write the collected entries into a ZIP archive.

        Returns:
            The contents of the ZIP archive as bytes.
        """
        def counter_text(count):
            parts = []
            if count[0]:
                parts.append(f'{count[0]} post{plural_s(count[0])}')
            if count[1]:
                parts.append(f'{count[1]} comment{plural_s(count[1])}')
            return ' and '.join(parts)

        # Create the ZIP archive. The context manager finalizes the archive
        # also when writing one of the members fails.
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED,
                             compresslevel=9) as archive:

            def write_bytes(path, data):
                with archive.open(path, 'w') as member:
                    member.write(data)

            def write_text(path, text):
                write_bytes(path, text.encode('utf-8'))

            def write_post(directory, entry):
                write_text(directory + entry.path(), self.rewrite_internal_urls(entry))

            write_text('metadata.txt',
                       ''.join(f'{key}: {value}\n' for key, value in self.metadata.items()))

            if self.is_user:
                heading = self.user.name
            elif self.subspace:
                heading = self.subspace.name
            else:
                heading = self.metadata['title']
            write_text('title.gmi',
                       f'\n\n# {heading}\n## Gempub Archive\n\n\n'
                       f"{self.metadata['description']}\n\n"
                       f"Exported on {self.metadata['publishDate']}.\n")

            # Information about the user/subspace.
            if self.is_user:
                index_page = f'# {self.user.avatar} {self.user.name}\n\nTable of Contents:\n'
                index_page += '\n=> title.gmi Title page\n'
                # The profile itself is written together with the other users.
                profile_path = 'users/' + self.user.name + '.gmi'
                index_page += f'=> {profile_path} {self.user.avatar} {self.user.name}\n'
            elif self.subspace:
                index_page = f'# s/{self.subspace.name}\n\nTable of Contents:\n'
                index_page += '\n=> title.gmi Title page\n'
                profile_path = self.subspace.name + '.gmi'
                index_page += f'=> {profile_path} {self.subspace.name}\n'
                src = f'# {self.subspace.title()}\n'
                if self.subspace.info:
                    src += self.subspace.info + '\n'
                if self.subspace.url:
                    src += f'=> {self.subspace.url}\n'
                src += '\nThe subspace was created on ' + \
                    make_timestamp(self.subspace.ts_created, '%Y-%m-%d') + '.\n'
                write_text(profile_path, src)
            else:
                index_page = '# ' + self.metadata['title'] + '\n\nTable of Contents:\n\n'

            if self.local_entries:
                index_page += f'\n=> posts/index.gmi Posts in {self.subspace.title()}\n'
                local_index_page = f'# Posts in {self.subspace.title()}\n\n'
                for entry in self.local_entries:
                    local_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
                    write_post('posts/', entry)
                write_text('posts/index.gmi', local_index_page)

            if self.ts_range:
                # Monthly archive: one index page per subspace.
                sub_links = []
                for sub_name in sorted(self.subspace_entries, key=str.lower):
                    sub_entries = self.subspace_entries[sub_name]
                    first_entry = sub_entries[0]
                    sub = self.get_subspace(first_entry.subspace_id)
                    sub_path = f'{sub.title()[0]}_{sub.name}.gmi'
                    sub_links.append(f'=> {sub_path} {sub.title()}\n')

                    title_icon = ''
                    if sub.owner:
                        title_icon = f'{self.get_user(first_entry.user_id).avatar} '
                    sub_page = f'# {title_icon}{sub.title()}\n'
                    sub_page += f'{counter_text(self.subspace_count[sub.id])} in this subspace.\n'

                    for entry in sub_entries:
                        entry_user = self.get_user(entry.user_id)
                        author = f'{entry_user.avatar} {entry_user.name}'
                        meta = []
                        top = None
                        if entry.issueid:
                            top = f'[#{entry.issueid}] {entry.title}'
                            meta.append(author)
                            if entry.tags:
                                top += f' · {entry.tags}'
                        elif not sub.owner:
                            meta.append(author)
                        meta.append(entry.dt.strftime('%Y-%m-%d %H:%M'))
                        if entry.num_cmts > 0:
                            meta.append(f'{entry.num_cmts} comment{plural_s(entry.num_cmts)}')
                        if entry.num_likes > 0:
                            meta.append(f'{entry.num_likes} like{plural_s(entry.num_likes)}')
                        if entry.tags and not entry.issueid:
                            meta.append(entry.tags)
                        link = f'=> posts/{entry.path()}'
                        meta_text = ' · '.join(meta)
                        if top:
                            sub_page += f'\n{link} {top}\n{entry.label}\n{meta_text}\n'
                        else:
                            sub_page += f'\n{entry.label}\n{link} {meta_text}\n'
                        # Write to the archive.
                        write_post('posts/', entry)
                    write_text(sub_path, sub_page)

                prev_type = None
                for link in sorted(sub_links, key=str.lower):
                    # Leave a blank line between user and regular subspaces.
                    if prev_type and prev_type != link[3]:
                        index_page += '\n'
                    index_page += link
                    prev_type = link[3]  # u or s
                index_page += '\n'

            elif self.foreign_entries:
                index_page += '=> other/index.gmi Posts in Other Subspaces\n'
                foreign_index_page = '# Posts in Other Subspaces\n'
                last_sub = None
                for entry in sorted(self.foreign_entries,
                                    key=lambda e: self.get_subspace(e.subspace_id).name.lower()):
                    entry_sub = self.get_subspace(entry.subspace_id)
                    if entry_sub != last_sub:
                        foreign_index_page += f'\n## {entry_sub.name}\n'
                        last_sub = entry_sub
                    foreign_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
                    write_post('other/', entry)
                write_text('other/index.gmi', foreign_index_page)

            if self.comment_entries:
                index_page += '=> comments/index.gmi Commented Posts\n'
                comment_index_page = '# Commented Posts\n'
                for entry in self.comment_entries:
                    comment_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
                    write_post('comments/', entry)
                write_text('comments/index.gmi', comment_index_page)

            if self.file_entries:
                index_page += '=> files/index.gmi File attachments\n'
                file_index_page = '# File Attachments\n'
                for entry in self.file_entries:
                    file_index_page += f'\n=> {entry.path()} {entry.ymd()} {entry.label}\n'

                    # List of posts that link to this file. (Collected while
                    # the post pages were being rewritten above.)
                    for ref in entry.referenced_from_posts:
                        ref_entry = self.entry_index[ref]
                        file_index_page += f'=> ../posts/{ref_entry.path()} Referenced in: "{ref_entry.label}"\n'

                    write_bytes('files/' + entry.path(), entry.page)
                write_text('files/index.gmi', file_index_page)

            index_page += '=> users/index.gmi Users\n'
            users_index_page = '# Users\n\nPosts and comments in this archive reference these users:\n\n'
            # Sort users case insensitively.
            for ref, (user, profile_text) in \
                    sorted(self.referenced_users.items(), key=lambda u: u[0].lower()):
                users_index_page += f'=> {ref}.gmi {user.avatar} {ref}\n'
                write_text('users/' + ref + '.gmi', profile_text)
            write_text('users/index.gmi', users_index_page)

            index_page += '\n=> about/bubble.gmi 💬 About Bubble\n'
            write_text('about/bubble.gmi', self.session.ABOUT)

            write_text('index.gmi', index_page)

        return buffer.getvalue()
|
|
|
|
|
|
def export_gempub_archive(session):
    """Export a user's posts, a subspace, or a calendar month as a Gempub archive.

    The request path selects what is exported:
    `/export/NAME.gpub` for the subspace of user NAME,
    `/export/s/NAME.gpub` for subspace NAME, and
    `/export/month/YYYY-MM.gpub` for all posts of a month.

    Args:
        session: the current request session; a logged-in user is required.

    Returns:
        `(20, 'application/gpub+zip', data)` on success, otherwise a
        `(status, meta)` tuple: 60 when not logged in, 59 for a malformed
        path or month, 51 when the subspace does not exist, and 61 when the
        user has no right to export it.
    """
    req = session.req
    db = session.db
    user = session.user

    if not user:
        return 60, 'Login required'

    # Determine subspace to export.
    m = re.search(r'/export/(s/|month/)?([\w%-]+)\.gpub$', req.path)
    if not m or not m[2]:
        return 59, 'Bad request'
    is_user = m[1] is None

    if m[1] == 'month/':
        try:
            year, month = (int(part) for part in m[2].split('-'))
            datetime.date(year, month, 1)  # rejects out-of-range values
        except ValueError:
            return 59, 'Bad request'
        month_range = (year, month)
        subspace = None
    else:
        month_range = None
        subspace = db.get_subspace(name=urlparse.unquote(m[2]))
        if not subspace:
            return 51, 'Not found'
        if not is_user and subspace.owner:
            # A user's subspace cannot be exported as a regular subspace.
            return 51, 'Not found'

    # Check access rights. At the moment, exporting is only possible via user
    # settings and subspace admin pages, so the user must have moderation
    # rights in the exported subspace. Monthly archives only require being
    # logged in, which has been checked above.
    if month_range:
        pass
    elif is_user:
        if subspace.owner != user.id:
            return 61, 'Not authorized'
    elif all(mod.id != user.id for mod in db.get_mods(subspace)):
        return 61, 'Not authorized'

    archive = GempubArchive(session, user if is_user else None, subspace, month_range)
    archive.render_post_entries()
    archive.render_file_entries()
    data = archive.compress()

    return 20, 'application/gpub+zip', data
|