842 lines
34 KiB
842 lines
34 KiB
import datetime
import io
import time
import urllib.parse as urlparse
import zipfile
from model import Search, Segment, User, Subspace, Post
from utils import *
def make_subspace(session):
if not session.user:
return 60, "Login required"
if not session.bubble.user_subspaces and session.user.role == User.BASIC:
return 61, "Not authorized"
db = session.db
req = session.req
if is_empty_query(req):
return 10, "Enter name for new subspace: " + session.NAME_HINT
name = clean_query(req)
if not is_valid_name(name):
return 10, "That is an invalid subspace name. " + session.NAME_HINT
db.create_subspace(name, session.user.id)
return 30, f'/s/{name}/admin'
def make_subspaces_page(session):
page = '# Subspaces\n'
if session.user:
page += session.dashboard_link()
page += '=> / Back to front page\n'
if session.user:
page += f'\n## Moderated by {session.user.name}\n'
subs = session.db.get_subspaces(mod=session.user.id)
if len(subs):
for sub in subs:
page += f'=> /{sub.title()} {sub.title()}\n'
if session.bubble.user_subspaces or session.user.role == User.ADMIN:
page += f'\n=> /new-subspace 🌒 New subspace\n'
page += '\n## Index\n'
page += 'Inactive and unused subspaces will be purged.\n'
subs = session.db.get_subspaces(owner=0)
if len(subs):
for sub in subs:
page += f'=> /{sub.title()} {sub.title()}\n'
page += 'No subspaces.\n'
return page
def subspace_admin_actions(session, action):
if not session.is_user_mod:
return 61, "Moderator rights required"
user = session.user
req = session.req
db = session.db
subspace = session.context
admin_link = f'/{subspace.title()}/admin'
page = f'# {subspace.title()}: Administration\n'
if user.role == User.ADMIN:
m = re.search(r'/repo/(new|delete|clone-url|view-url|idlabel)?(/([0-9a-zA-Z]{10}))?$', req.path)
if m:
if m[1] == 'new':
return 30, f'{admin_link}/repo/'
elif m[1] == 'delete' and m[3]:
if not db.verify_token(user, m[3]):
return 61, 'Not authorized'
if is_empty_query(req):
return 10, 'Really delete repository and commit history? (DELETE to confirm)'
if req.query == 'DELETE':
return 30, f'{admin_link}/repo/'
elif m[1] == 'clone-url':
if req.query is None:
return 10, 'HTTPS URL where to clone repository:'
return 30, f'{admin_link}/repo/'
elif m[1] == 'view-url':
if req.query is None:
return 10, 'Gemini URL for viewing commits:'
return 30, f'{admin_link}/repo/'
elif m[1] == 'idlabel':
if req.query is None:
return 10, 'Label for marking issue IDs in commit messages: (For example, "IssueID")'
return 30, f'{admin_link}/repo/'
page += f'=> {admin_link} Go back\n'
page += '\n## Git Repository\n'
repo = db.get_repository(subspace=subspace)
if not repo:
page += '=> {admin_link}/repo/new 🛢️ New repository\n'
return page
page += f'\n=> {admin_link}/repo/clone-url Clone HTTPS URL: {repo.clone_url if repo.clone_url else "(not set)"}\n'
page += f'=> {admin_link}/repo/idlabel Issue ID label: {repo.idlabel if repo.idlabel else ""}\n'
page += f'\n=> {admin_link}/repo/view-url Commit view Gemini URL: {repo.view_url if repo.view_url else "(not set)"}\n'
# Status information.
page += '\n### Status\n'
if repo.ts_fetch is None:
if not repo.clone_url:
page += 'Not configured.\n'
page += 'Repository will be fetched soon.\n'
n = db.count_commits(repo)
page += f'{n} commits in history. Repository was last fetched on {datetime.datetime.fromtimestamp(repo.ts_fetch, UTC).strftime("%Y-%m-%d at %H:%M:%S %Z")}.\n'
page += f'\n=> {admin_link}/repo/delete/{db.get_token(user)} ❌ Delete repository\n'
return page
if action == 'info':
if req.query == None:
return 10, f"Description for {session.context.title()}:"
db.update_subspace(session.context, info=clean_title(clean_query(req)))
return 30, admin_link
if action == 'url':
if req.query == None:
return 10, f"Featured link for {session.context.title()}: (URL and label)"
link = form_link(parse_link_segment_query(req))
link = ''
db.update_subspace(session.context, url=link)
return 30, admin_link
if action == 'omit-all':
if session.context.flags & Subspace.HIDE_OMIT_SETTING_FLAG:
return 61, 'Not authorized'
db.update_subspace(session.context, flags=session.context.flags ^ Subspace.OMIT_FROM_ALL_FLAG)
return 30, admin_link
# Actions that require link verification.
m = re.search(r'/(delete|tracker|rename|add-mod|remove-mod)/([0-9a-zA-Z]{10})$', req.path)
if m:
token = m[2]
if not db.verify_token(session.user, token):
return 61, 'Not authorized'
if m[1] == 'add-mod':
if is_empty_query(req):
return 10, 'Enter user to add as moderator:'
adding = db.get_user(name=clean_query(req))
if not adding:
return 51, 'Not found'
db.modify_mods(session.context, actor=session.user, add=adding)
return 30, admin_link
if m[1] == 'remove-mod':
if is_empty_query(req):
return 10, 'Enter user to remove as moderator:'
removing = db.get_user(name=clean_query(req))
if not removing:
return 51, 'Not found'
db.modify_mods(session.context, actor=session.user, remove=removing)
return 30, admin_link
if m[1] == 'rename':
prompt = f'Enter new name for {session.context.name}? (Warning: Links to subspace will break!)'
if is_empty_query(req):
return 10, prompt
new_name = clean_query(req)
if not is_valid_name(new_name):
return 10, prompt
db.update_subspace(session.context, name=new_name)
return 10, prompt
return 30, f'/s/{new_name}/admin'
if m[1] == 'delete' and session.user.role == User.ADMIN:
if is_empty_query(req):
return 10, f'Really delete {session.context.title()}? (Enter DELETE to confirm.)'
if req.query == 'DELETE':
return 30, f'/admin/delete-subspace/{token}?{session.context.name}'
return 30, admin_link
elif m[1] == 'tracker' and (session.user.role == User.ADMIN or
new_flags = session.context.flags ^ Subspace.ISSUE_TRACKER
if new_flags & Subspace.ISSUE_TRACKER:
# Issues shouldn't be listed in All Posts.
new_flags = new_flags | Subspace.OMIT_FROM_ALL_FLAG
db.update_subspace(session.context, flags=new_flags)
return 30, admin_link
page += session.context.subspace_link()
if not session.context.flags & (Subspace.ISSUE_TRACKER | Subspace.HIDE_OMIT_SETTING_FLAG):
page += f'\n=> {admin_link}/omit-all {session.CHECKS[session.context.flags & Subspace.OMIT_FROM_ALL_FLAG]} Omit from All Posts\n'
page += '\n## About\n'
page += '\n### Description\n'
page += (session.context.info if session.context.info else '(no description)') + '\n'
page += f'=> {admin_link}/info ✏️ Edit\n'
page += '\n### Featured Link\n'
page += (f'=> {session.context.url}' if session.context.url else '(no featured link)') + '\n'
page += f'=> {admin_link}/url ✏️ Edit\n'
page += '\n## Moderators\n\n'
mods = db.get_mods(session.context)
for mod in mods:
page += f'=> /u/{mod.name} {mod.avatar} {mod.name}\n'
page += f'=> {admin_link}/add-mod/{session.get_token()} Add moderator\n'
if len(mods) > 1:
page += f'=> {admin_link}/remove-mod/{session.get_token()} Remove moderator\n'
if session.user.role == User.ADMIN or session.db.is_empty_subspace(session.context):
page += '\n## Issue Tracking\n'
page += f'\n=> {admin_link}/tracker/{db.get_token(session.user)} {session.CHECKS[nonzero(session.context.flags & Subspace.ISSUE_TRACKER)]} Subspace is an issue tracker\n'
page += 'Posts in an issue tracker are designated issue IDs and have an Open/Closed status. Issues may refer to Git repository commits via hash, and commit messages can refer to issues by ID. Non-issue posts are not allowed in an issue tracker subspace.\n'
if session.user.role == User.ADMIN:
page += f'\n=> {admin_link}/repo/ ⚙️ Git repository settings\n'
page += '\n## Actions\n'
page += '\n=> /export/' + session.context.title() + '.gpub 📤 Export data archive\n'
page += f'Download a ZIP archive containing all posts and comments in {session.context.title()}. The archive has Gempub metadata so it can also be viewed in a Gempub reader.\n'
page += f'\n=> {admin_link}/rename/{session.get_token()} Rename subspace\n'
page += 'Links pointing to the subspace will break when the name is changed.\n'
if session.user.role == User.ADMIN:
page += f'\n=> {admin_link}/delete/{session.get_token()} ⚠️ Delete subspace {session.context.title()}\n'
page += 'All posts and comments in the subspace will be deleted. Exporting a backup beforehand is recommended.\n'
return page
def split_terms(text):
import shlex
return list(filter(lambda t: len(t) >= 2, map(str.strip,
shlex.split(text.replace("'", "\\'")))))
def make_search_page(session):
req = session.req
db = session.db
user = session.user
LIMIT = 30
m = re.match(r'(/([us])/([\w%-]+))?/search(/(\d+))?', req.path)
if not m:
return 59, 'Bad request'
if m[2] or m[3]:
ident = urlparse.unquote(m[3])
scope = db.get_subspace(name=ident)
if m[2] == 'u' and not scope.owner:
return 51, 'Not found'
if not scope:
return 51, 'Not found'
scope = None
page_index = max(0, int(m[5])) if m[5] else 0
if req.query is None:
return 10, f'Search {"in " + scope.title() if scope else session.bubble.site_name}:'
search_url = ('/' if not scope else f'/{scope.title()}/') + 'search'
terms = split_terms(clean_query(req))
if scope:
page = f'# Search in {scope.title()}\n'
page = '# Search\n'
page += f'=> {search_url} 🔍 New search\n'
if scope:
page += f'=> /{scope.title()} Back to {scope.title()}\n'
page += '=> / 🌒 Back to front page\n'
if terms:
page += '\n## ' + ' '.join(terms) + '\n'
# Perform the search.
search = Search(db)
count = search.run(terms, scope, limit=LIMIT, page_index=page_index)
# TODO: Just counting the matches without returning anything might be
# a useful addition in `model.Search`.
#page += f'Found {count} match{plural_s(count, "es")}.\n'
if page_index > 0:
page += f'\n=> {search_url}/{page_index - 1}?{req.query} Previous page\n'
if count == 0:
page += 'Found nothing matching the search terms.\n' if page_index == 0 else \
'No more results.\n'
for result in search.results:
page += '\n'
#ts = result[0]
obj = result[1]
if isinstance(obj, User):
page += f'=> /u/{obj.name} {obj.avatar} u/{obj.name}\n'
if obj.info:
page += f'{obj.info[:300].strip()}\n'
elif isinstance(obj, Subspace):
page += f'=> /s/{obj.name} s/{obj.name}\n'
if obj.info:
page += f'{obj.info[:300].strip()}\n'
elif isinstance(obj, Post):
ctx = ("u/" if obj.sub_owner else "s/") + obj.sub_name
kind = "Comment" if obj.parent else f"Issue #{obj.issueid}" if obj.issueid else "Post"
title = f' "{shorten_text(obj.title, 30)}"' if obj.title else ''
scope_desc = f"in {ctx} " if not scope and not obj.sub_owner else ""
page += f'=> /{ctx}/{obj.issueid if obj.issueid else obj.id} {kind}{title} {scope_desc}by {obj.poster_avatar} {obj.poster_name} on {obj.ymd_date(tz=session.tz)} {" · " if obj.tags else ""}{obj.tags}\n'
SEGTYPES = ['content', 'URL', 'image', 'attachment', 'poll option']
if result[2] != Segment.TEXT:
page += f'(matching {SEGTYPES[result[2]]}) '
page += obj.summary.replace('\n', ' ').replace('=>', ' ').strip() + '\n'
if count >= LIMIT:
page += f'\n=> {search_url}/{page_index + 1}?{req.query} Next page\nPage {page_index + 1}\n'
return page
def listed_items(items):
if len(items) == 0:
return ''
if len(items) == 1:
return items[0]
return ', '.join(items[0:-1]) + ' and ' + items[-1]
def make_timestamp(ts, fmt="%Y-%m-%d at %H:%M"):
return datetime.datetime.fromtimestamp(ts, UTC).strftime(fmt)
class GempubArchive:
class Entry:
def __init__(self, post, label, page, file=None):
self.ts = post.ts_created
self.dt = datetime.datetime.fromtimestamp(self.ts, UTC)
self.post_id = post.id
self.issueid = post.issueid
self.title = post.title
self.subspace_id = post.subspace
self.user_id = post.user
self.label = label
self.page = page
self.file = file
self.tags = post.tags
self.num_cmts = post.num_cmts
self.num_likes = post.num_likes
self.referenced_from_posts = []
def ymd(self):
return self.dt.strftime('%Y-%m-%d')
def path(self):
if self.file:
pos = self.file.segment_url.rfind('/') + 1
return f'file{self.file.id}_{self.file.segment_url[pos:]}'
fn = re.sub(r'[^\w\d-]', '', self.title.replace(' ', '-')).lower().strip() # clean it up
if len(fn) > 0:
fn = '_' + fn
#if len(fn) == 0:
# fn = f'{self.dt.day}_post{self.post_id}.gmi'
return f'{self.dt.year:04d}-{self.dt.month:02d}/{self.post_id}{fn}.gmi'
def __init__(self, session, user=None, subspace=None, month_range=None):
self.session = session
self.db = session.db
self.ts_range = None
if month_range:
year, month = month_range
end_month = month + 1 if month < 12 else 1
end_year = year if month < 12 else year + 1
self.ts_range = (
datetime.datetime(year, month, 1, 0, 0, 0, tzinfo=UTC).timestamp(),
datetime.datetime(end_year, end_month, 1, 0, 0, 0, tzinfo=UTC).timestamp()
self.user = user
self.subspace = subspace
self.is_user = self.ts_range is None and subspace.owner != 0
assert self.is_user and self.user or not self.is_user and not self.user
assert self.ts_range or self.subspace is not None
# Modify settion so rendered pages appear to be not logged in.
session.is_archive = True
session.user = None
self.site_link = session.server_root()
if month_range:
archive_title = f'{datetime.datetime(year, month, 1).strftime("%B %Y")}'
archive_description = f'All posts and comments made on {session.bubble.site_name}. '
archive_title = f'{"s/" if not self.is_user else ""}{subspace.name} on {session.bubble.site_name}'
archive_description = \
(f'All posts and comments made in the subspace {subspace.title()} on {session.bubble.site_name}. ' if not self.is_user else f'All posts and comments made by {user.name} on {session.bubble.site_name}. ')
self.metadata = {
'gpubVersion': '1.0.0',
'title': archive_title,
'description': archive_description,
'author': f'Bubble v{session.bubble.version}',
'publishDate': time.strftime('%Y-%m-%d'),
'index': 'index.gmi'
self.local_entries = [] # posts in the archive's subspace
self.foreign_entries = [] # posts in other subspaces
self.subspace_entries = {} # subspace name => list of entries
self.comment_entries = [] # posts where user has commented
self.file_entries = [] # files
self.entry_index = {} # indexed by post ID
self.file_index = {} # indexed by file ID
self.referenced_users = {} # info about posters
self.total_count = [0, 0]
self.subspace_count = {} # [posts, comments]
self.subspaces = {}
self.users = {}
if self.is_user:
self.users[self.user.id] = user
def user_page(self, user):
src = f'# {user.avatar} {user.name}\n'
if user.info:
src += user.info + '\n'
if user.url:
src += f'=> {user.url}\n'
src += f'\n\n=> {self.site_link}/u/{user.name} {user.name} on {self.session.bubble.site_name}\n'
src += 'The account was created on ' + \
make_timestamp(user.ts_created, '%Y-%m-%d') + '.\n'
return src
def get_subspace(self, id):
if id not in self.subspaces:
self.subspaces[id] = self.db.get_subspace(id=id)
return self.subspaces[id]
def get_user(self, id):
if id not in self.users:
self.users[id] = self.db.get_user(id=id)
return self.users[id]
def add_user_page(self, user):
if not user.name in self.referenced_users:
self.referenced_users[user.name] = (user, self.user_page(user))
def add_post_entry(self, post, is_comment=False):
from feeds import make_post_page
# Modify session according to the post's subspace.
self.session.context = self.get_subspace(post.subspace)
self.session.is_context_tracker = (self.session.context.flags & Subspace.ISSUE_TRACKER) != 0
is_local = (post.subspace == self.subspace.id) if self.subspace else False
if not self.ts_range:
where = self.session.context.title() if not is_local and (
not self.is_user or is_comment) else None
label_sub = ' · ' + where if where else ''
page = make_post_page(self.session, post)
if self.ts_range:
label = shorten_text(clean_title(strip_links(post.summary)), 150)
label = (post.title if post.title else shorten_text(clean_title(strip_links(post.summary)), 100)) + label_sub
entry = GempubArchive.Entry(post, label, page)
# Check for referenced users.
for username in re.findall(r'=> /u/([\w-]+)\s', page):
ref = self.db.get_user(name=username)
if ref:
if is_comment:
elif is_local:
skey = self.session.context.name
if skey in self.subspace_entries:
self.subspace_entries[skey] = [entry]
if not post.id in self.entry_index:
if not is_comment:
(1, self.db.count_posts(parent_id=post.id, draft=False)))
self.entry_index[post.id] = entry
def add_count(self, subspace_id, count):
self.total_count[0] += count[0]
self.total_count[1] += count[1]
if not subspace_id in self.subspace_count:
self.subspace_count[subspace_id] = [count[0], count[1]]
self.subspace_count[subspace_id][0] += count[0]
self.subspace_count[subspace_id][1] += count[1]
def render_post_entries(self):
db = self.db
# Entries for the user/subspace posts.
if self.is_user:
posts = db.get_posts(user=self.user, comment=False, draft=False)
elif self.ts_range:
posts = db.get_posts(ts_range=self.ts_range, comment=False, draft=False,
posts = db.get_posts(subspace=self.subspace, comment=False, draft=False)
for post in posts:
if self.is_user:
# Make entries for posts where user has commented in.
# TODO: Add a proper database query for this.
commented_in = set()
for cmt in db.get_posts(user=self.user, comment=True, draft=False,
for post in [db.get_post(id=post_id) for post_id in commented_in]:
if post and post.user != self.user.id:
self.add_post_entry(post, is_comment=True)
def render_file_entries(self):
db = self.db
for file in db.get_user_files(self.user) if self.user \
else db.get_subspace_files(self.subspace) if self.subspace \
else db.get_time_files(self.ts_range):
post = db.get_post(id=file.segment_post)
filesize = len(file.data)
entry = GempubArchive.Entry(post,
file.segment_label + f' [{filesize / 1024:.1f} KB, {file.mimetype}]',
self.file_index[file.id] = entry
def rewrite_internal_urls(self, entry: Entry):
src = entry.page
src_post_id = entry.post_id
user_pattern = re.compile(r'^=>\s*/u/([\w%-]+)\s')
if self.subspace:
post_pattern = re.compile(r'^=>\s*/([us])/' + self.subspace.name + r'/(\d+)\s')
post_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(\d+)\s')
file_pattern = re.compile(r'^=>\s*/([us])/[\w%-]+/(image|file)/(\d+)[^ ]*\s')
root_pattern = re.compile(r'^=>\s*/([^ ]*)\s')
rewritten = []
for line in src.split('\n'):
m = user_pattern.search(line)
if m:
line = f'=> ../../users/{urlparse.unquote(m[1])}.gmi ' + line[m.end():]
m = post_pattern.search(line)
if m:
post_id = int(m[2])
if post_id in self.entry_index:
line = f'=> ../../posts/{self.entry_index[post_id].path()} ' + line[m.end():]
m = file_pattern.search(line)
if m:
file_id = int(m[3])
if file_id in self.file_index:
entry = self.file_index[file_id]
line = f'=> ../../files/{entry.path()} ' + line[m.end():]
m = root_pattern.search(line)
if m:
line = f'=> {self.session.server_root()}/{m[1]} ' + line[m.end():]
return '\n'.join(rewritten)
def compress(self):
# Create the ZIP archive.
buffer = io.BytesIO()
zip = zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=9)
def counter_text(count):
parts = []
if count[0]:
parts.append(f'{count[0]} post{plural_s(count[0])}')
if count[1]:
parts.append(f'{count[1]} comment{plural_s(count[1])}')
return ' and '.join(parts)
with zip.open('metadata.txt', 'w') as f:
for entry in self.metadata:
f.write(f"{entry}: {self.metadata[entry]}\n".encode('utf-8'))
with zip.open('title.gmi', 'w') as f:
# {self.user.name if self.is_user else self.subspace.name if self.subspace else self.metadata['title']}
## Gempub Archive
Exported on {self.metadata['publishDate']}.
# Information about the user/subspace.
if self.is_user:
index_page = f'# {self.user.avatar} {self.user.name}\n\nTable of Contents:\n'
index_page += '\n=> title.gmi Title page\n'
profile_path = 'users/' + self.user.name + '.gmi'
index_page += f'=> {profile_path} {self.user.avatar} {self.user.name}\n'
elif self.subspace:
index_page = f'# s/{self.subspace.name}\n\nTable of Contents:\n'
index_page += '\n=> title.gmi Title page\n'
profile_path = self.subspace.name + '.gmi'
index_page += f'=> {profile_path} {self.subspace.name}\n'
with zip.open(profile_path, 'w') as f:
src = f'# {self.subspace.title()}\n'
if self.subspace.info:
src += self.subspace.info + '\n'
if self.subspace.url:
src += f'=> {self.subspace.url}\n'
src += '\nThe subspace was created on ' + \
make_timestamp(self.subspace.ts_created, '%Y-%m-%d') + '.\n'
index_page = '# ' + self.metadata['title'] + '\n\nTable of Contents:\n\n'
if self.local_entries:
index_page += f'\n=> posts/index.gmi Posts in {self.subspace.title()}\n'
local_index_page = f'# Posts in {self.subspace.title()}\n\n'
for entry in self.local_entries:
entry_path = 'posts/' + entry.path()
local_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
with zip.open(entry_path, 'w') as content:
with zip.open('posts/index.gmi', 'w') as content:
if self.ts_range:
sub_links = []
for sub_name in sorted(self.subspace_entries.keys(), key=str.lower):
first_entry = self.subspace_entries[sub_name][0]
sub = self.get_subspace(first_entry.subspace_id)
entry_path = f'{sub.title()[0]}_{sub.name}.gmi'
sub_links.append(f'=> {entry_path} {sub.title()}\n')
title_icon = ''
if sub.owner:
title_icon = f'{self.get_user(first_entry.user_id).avatar} '
sub_page = f'# {title_icon}{sub.title()}\n'
sub_page += f'{counter_text(self.subspace_count[sub.id])} in this subspace.\n'
for entry in self.subspace_entries[sub_name]:
entry_user = self.get_user(entry.user_id)
author = f'{entry_user.avatar} {entry_user.name}'
meta = []
top = None
if entry.issueid:
top = f'[#{entry.issueid}] {entry.title}'
if entry.tags:
top += f' · {entry.tags}'
elif not sub.owner:
meta.append(entry.dt.strftime('%Y-%m-%d %H:%M'))
if entry.num_cmts > 0:
meta.append(f'{entry.num_cmts} comment{plural_s(entry.num_cmts)}')
if entry.num_likes > 0:
meta.append(f'{entry.num_likes} like{plural_s(entry.num_likes)}')
if entry.tags and not entry.issueid:
link = f'=> posts/{entry.path()}'
if top:
sub_page += f'\n{link} {top}\n{entry.label}\n{" · ".join(meta)}\n'
sub_page += f'\n{entry.label}\n{link} {" · ".join(meta)}\n'
# Write to the archive.
with zip.open('posts/' + entry.path(), 'w') as content:
with zip.open(entry_path, 'w') as content:
prev_type = None
for link in sorted(sub_links, key=str.lower):
if prev_type and prev_type != link[3]:
index_page += '\n'
index_page += link
prev_type = link[3] # u or s
index_page += '\n'
elif self.foreign_entries:
index_page += f'=> other/index.gmi Posts in Other Subspaces\n'
foreign_index_page = '# Posts in Other Subspaces\n'
last_sub = None
for entry in sorted(self.foreign_entries,
key=lambda e: self.get_subspace(e.subspace_id).name.lower()):
entry_sub = self.get_subspace(entry.subspace_id)
if entry_sub != last_sub:
foreign_index_page += f'\n## {entry_sub.name}\n'
last_sub = entry_sub
entry_path = 'other/' + entry.path()
foreign_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
with zip.open(entry_path, 'w') as content:
with zip.open('other/index.gmi', 'w') as content:
if self.comment_entries:
index_page += f'=> comments/index.gmi Commented Posts\n'
comment_index_page = '# Commented Posts\n'
for entry in self.comment_entries:
entry_path = 'comments/' + entry.path()
comment_index_page += f'=> {entry.path()} {entry.ymd()} {entry.label}\n'
with zip.open(entry_path, 'w') as content:
with zip.open('comments/index.gmi', 'w') as content:
if self.file_entries:
index_page += '=> files/index.gmi File attachments\n'
file_index_page = '# File Attachments\n'
for entry in self.file_entries:
entry_path = 'files/' + entry.path()
file_index_page += f'\n=> {entry.path()} {entry.ymd()} {entry.label}\n'
# List of posts that link to this file.
for ref in entry.referenced_from_posts:
ref_entry = self.entry_index[ref]
file_index_page += f'=> ../posts/{ref_entry.path()} Referenced in: "{ref_entry.label}"\n'
with zip.open(entry_path, 'w') as content:
with zip.open('files/index.gmi', 'w') as content:
index_page += '=> users/index.gmi Users\n'
users_index_page = '# Users\n\nPosts and comments in this archive reference these users:\n\n'
# Sort users case insensitively.
for ref, (user, profile_text) in \
sorted(self.referenced_users.items(), key=lambda u: u[0].lower()):
users_index_page += f'=> {ref}.gmi {user.avatar} {ref}\n'
with zip.open('users/' + ref + '.gmi', 'w') as f:
with zip.open('users/index.gmi', 'w') as f:
index_page += f'\n=> about/bubble.gmi 💬 About Bubble\n'
with zip.open('about/bubble.gmi', 'w') as f:
with zip.open('index.gmi', 'w') as f:
return buffer.getvalue()
def export_gempub_archive(session):
req = session.req
db = session.db
user = session.user
if not user:
return 60, 'Login required'
# Determine subspace to export.
m = re.search(r'/export/(s/|month/)?([\w%-]+)\.gpub$', req.path)
if not m or not m[2]:
return 59, 'Bad request'
name = urlparse.unquote(m[2])
if m[1] == 'month/':
month_range = map(int, m[2].split('-'))
subspace = None
month_range = None
subspace = db.get_subspace(name=name)
is_user = m[1] is None
# Check access rights. At the moment, exporting is only possible via user
# settings and subspace admin pages, so the user must have moderation
# rights in the exported subspace.
if month_range:
if not user:
# Have to be logged in.
return 61, 'Not authorized'
elif is_user:
if subspace.owner != user.id:
return 61, 'Not authorized'
if user.id not in map(lambda u: u.id, db.get_mods(subspace)):
return 61, 'Not authorized'
archive = GempubArchive(session, user if is_user else None, subspace, month_range)
data = archive.compress()
return 20, 'application/gpub+zip', data