Remove the old web package.

Georg Brandl 2008-06-05 07:41:22 +00:00
parent 7e80f60412
commit d4185e6f80
20 changed files with 0 additions and 3292 deletions


@@ -1,62 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web
~~~~~~~~~~
A web application to serve the Python docs interactively.
:copyright: 2007-2008 by Georg Brandl.
:license: BSD.
"""
import os
import sys
import getopt
import sphinx
from sphinx.web.application import setup_app
from sphinx.web.serve import run_simple
try:
from werkzeug.debug import DebuggedApplication
except ImportError:
DebuggedApplication = lambda x, y: x
def main(argv=sys.argv):
opts, args = getopt.getopt(argv[1:], "dhf:")
opts = dict(opts)
if len(args) != 1 or '-h' in opts:
print 'usage: %s [-d] [-f cfg.py] <doc_root>' % argv[0]
print ' -d: debug mode, use werkzeug debugger if installed'
print ' -f: use "cfg.py" file instead of doc_root/webconf.py'
return 2
conffile = opts.get('-f', os.path.join(args[0], 'webconf.py'))
config = {}
execfile(conffile, config)
port = config.get('listen_port', 3000)
hostname = config.get('listen_addr', 'localhost')
debug = ('-d' in opts) or (hostname == 'localhost')
config['data_root_path'] = args[0]
config['debug'] = debug
def make_app():
app = setup_app(config, check_superuser=True)
if debug:
app = DebuggedApplication(app, True)
return app
if os.environ.get('RUN_MAIN') != 'true':
print '* Sphinx %s - Python documentation web application' % \
sphinx.__version__.replace('$', '').replace('Revision:', 'rev.')
if debug:
print '* Running in debug mode'
run_simple(hostname, port, make_app, use_reloader=debug)
if __name__ == '__main__':
sys.exit(main(sys.argv))
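
For reference, a minimal webconf.py for this application could look like the sketch below. Only listen_addr and listen_port are read here; the patch_mail_* keys are consumed by sphinx.web.application when mailing suggested patches. All values are placeholders.

# webconf.py -- example configuration (all values are placeholders)
listen_addr = 'localhost'
listen_port = 3000
# used by sphinx.web.application to mail suggested documentation patches
patch_mail_from = 'docs@example.com'
patch_mail_to = 'doc-patches@example.com'
patch_mail_smtp = 'localhost'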


@@ -1,258 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.admin
~~~~~~~~~~~~~~~~
Admin application parts.
:copyright: 2007-2008 by Georg Brandl, Armin Ronacher.
:license: BSD.
"""
from sphinx.web.util import render_template
from sphinx.web.wsgiutil import Response, RedirectResponse, NotFound
from sphinx.web.database import Comment
class AdminPanel(object):
"""
Provide the admin functionality.
"""
def __init__(self, app):
self.app = app
self.env = app.env
self.userdb = app.userdb
def dispatch(self, req, page):
"""
Dispatch the requests for the current user in the admin panel.
"""
is_logged_in = req.user is not None
if is_logged_in:
privileges = self.userdb.privileges[req.user]
is_master_admin = 'master' in privileges
can_change_password = 'frozenpassword' not in privileges
else:
privileges = set()
can_change_password = is_master_admin = False
# login and logout
if page == 'login':
return self.do_login(req)
elif not is_logged_in:
return RedirectResponse('@admin/login/')
elif page == 'logout':
return self.do_logout(req)
# account maintenance
elif page == 'change_password' and can_change_password:
return self.do_change_password(req)
elif page == 'manage_users' and is_master_admin:
return self.do_manage_users(req)
# moderate comments
elif page.split('/')[0] == 'moderate_comments':
return self.do_moderate_comments(req, page[18:])
# missing page
elif page != '':
raise NotFound()
return Response(render_template(req, 'admin/index.html', {
'is_master_admin': is_master_admin,
'can_change_password': can_change_password
}))
def do_login(self, req):
"""
Display login form and do the login procedure.
"""
if req.user is not None:
return RedirectResponse('@admin/')
login_failed = False
if req.method == 'POST':
if req.form.get('cancel'):
return RedirectResponse('')
username = req.form.get('username')
password = req.form.get('password')
if self.userdb.check_password(username, password):
req.login(username)
return RedirectResponse('@admin/')
login_failed = True
return Response(render_template(req, 'admin/login.html', {
'login_failed': login_failed
}))
def do_logout(self, req):
"""
Log the user out.
"""
req.logout()
return RedirectResponse('@admin/login/')
def do_change_password(self, req):
"""
Allows the user to change his password.
"""
change_failed = change_successful = False
if req.method == 'POST':
if req.form.get('cancel'):
return RedirectResponse('@admin/')
pw = req.form.get('pw1')
if pw and pw == req.form.get('pw2'):
self.userdb.set_password(req.user, pw)
self.userdb.save()
change_successful = True
else:
change_failed = True
return Response(render_template(req, 'admin/change_password.html', {
'change_failed': change_failed,
'change_successful': change_successful
}))
def do_manage_users(self, req):
"""
Manage other user accounts. Requires master privileges.
"""
add_user_mode = False
user_privileges = {}
users = sorted((user, []) for user in self.userdb.users)
to_delete = set()
generated_user = generated_password = None
user_exists = False
if req.method == 'POST':
for item in req.form.getlist('delete'):
try:
to_delete.add(item)
except ValueError:
pass
for name, item in req.form.iteritems():
if name.startswith('privileges-'):
user_privileges[name[11:]] = [x.strip() for x
in item.split(',')]
if req.form.get('cancel'):
return RedirectResponse('@admin/')
elif req.form.get('add_user'):
username = req.form.get('username')
if username:
if username in self.userdb.users:
user_exists = username
else:
generated_password = self.userdb.add_user(username)
self.userdb.save()
generated_user = username
else:
add_user_mode = True
elif req.form.get('aborted'):
return RedirectResponse('@admin/manage_users/')
users = {}
for user in self.userdb.users:
if user not in user_privileges:
users[user] = sorted(self.userdb.privileges[user])
else:
users[user] = user_privileges[user]
new_users = users.copy()
for user in to_delete:
new_users.pop(user, None)
self_destruction = req.user not in new_users or \
'master' not in new_users[req.user]
if req.method == 'POST' and (not to_delete or
(to_delete and req.form.get('confirmed'))) and \
req.form.get('update'):
old_users = self.userdb.users.copy()
for user in old_users:
if user not in new_users:
del self.userdb.users[user]
else:
self.userdb.privileges[user].clear()
self.userdb.privileges[user].update(new_users[user])
self.userdb.save()
return RedirectResponse('@admin/manage_users/')
return Response(render_template(req, 'admin/manage_users.html', {
'users': users,
'add_user_mode': add_user_mode,
'to_delete': to_delete,
'ask_confirmation': req.method == 'POST' and to_delete \
and not self_destruction,
'generated_user': generated_user,
'generated_password': generated_password,
'self_destruction': self_destruction,
'user_exists': user_exists
}))
def do_moderate_comments(self, req, url):
"""
Comment moderation panel.
"""
if url == 'recent_comments':
details_for = None
recent_comments = Comment.get_recent(20)
else:
details_for = url and self.env.get_real_filename(url) or None
recent_comments = None
to_delete = set()
edit_detail = None
if 'edit' in req.args:
try:
edit_detail = Comment.get(int(req.args['edit']))
except ValueError:
pass
if req.method == 'POST':
for item in req.form.getlist('delete'):
try:
to_delete.add(int(item))
except ValueError:
pass
if req.form.get('cancel'):
return RedirectResponse('@admin/')
elif req.form.get('confirmed'):
for comment_id in to_delete:
try:
Comment.get(comment_id).delete()
except ValueError:
pass
return RedirectResponse(req.path)
elif req.form.get('aborted'):
return RedirectResponse(req.path)
elif req.form.get('edit') and not to_delete:
if 'delete_this' in req.form:
try:
to_delete.add(req.form['delete_this'])
except ValueError:
pass
else:
try:
edit_detail = c = Comment.get(int(req.args['edit']))
except ValueError:
pass
else:
if req.form.get('view'):
return RedirectResponse(c.url)
c.author = req.form.get('author', '')
c.author_mail = req.form.get('author_mail', '')
c.title = req.form.get('title', '')
c.comment_body = req.form.get('comment_body', '')
c.save()
self.app.cache.pop(edit_detail.associated_page, None)
return RedirectResponse(req.path)
return Response(render_template(req, 'admin/moderate_comments.html', {
'pages_with_comments': [{
'page_id': page_id,
'title': page_id, #XXX: get title somehow
'has_details': details_for == page_id,
'comments': comments
} for page_id, comments in Comment.get_overview(details_for)],
'recent_comments': recent_comments,
'to_delete': to_delete,
'ask_confirmation': req.method == 'POST' and to_delete,
'edit_detail': edit_detail
}))


@@ -1,69 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.antispam
~~~~~~~~~~~~~~~~~~~
Small module that performs anti-spam tests based on the bad content
regex list provided by MoinMoin.
:copyright: 2007-2008 by Armin Ronacher.
:license: BSD.
"""
import re
import urllib
import time
from os import path
DOWNLOAD_URL = 'http://moinmaster.wikiwikiweb.de/BadContent?action=raw'
UPDATE_INTERVAL = 60 * 60 * 24 * 7
class AntiSpam(object):
"""
Class that reads a bad content database (flat file that is automatically
updated from the MoinMoin server) and checks strings against it.
"""
def __init__(self, bad_content_file):
self.bad_content_file = bad_content_file
lines = None
if not path.exists(self.bad_content_file):
last_change = 0
else:
last_change = path.getmtime(self.bad_content_file)
if last_change + UPDATE_INTERVAL < time.time():
try:
f = urllib.urlopen(DOWNLOAD_URL)
data = f.read()
except:
pass
else:
lines = [l.strip() for l in data.splitlines()
if not l.startswith('#')]
f = open(bad_content_file, 'w')
try:
f.write('\n'.join(lines))
finally:
f.close()
last_change = int(time.time())
if lines is None:
try:
f = open(bad_content_file)
try:
lines = [l.strip() for l in f]
finally:
f.close()
except:
lines = []
self.rules = [re.compile(rule) for rule in lines if rule]
def is_spam(self, fields):
for regex in self.rules:
for field in fields:
if regex.search(field) is not None:
return True
return False
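
A minimal usage sketch of the class above; the rule-file path is an arbitrary example, and instantiation may hit the network to refresh a stale rule file.

from sphinx.web.antispam import AntiSpam

# the real application points this at <data_root>/bad_content
antispam = AntiSpam('/tmp/bad_content')
fields = ('Nice page', 'someone@example.com', 'Buy cheap pills at http://spam.example/')
print antispam.is_spam(fields)   # True if any field matches a BadContent rule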


@@ -1,826 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.application
~~~~~~~~~~~~~~~~~~~~~~
A simple WSGI application that serves an interactive version
of the python documentation.
:copyright: 2007-2008 by Georg Brandl, Armin Ronacher.
:license: BSD.
"""
import os
import re
import copy
import time
import heapq
import math
import difflib
import tempfile
import threading
import cPickle as pickle
import cStringIO as StringIO
from os import path
from itertools import groupby
from sphinx.web.feed import Feed
from sphinx.web.mail import Email
from sphinx.web.util import render_template, get_target_uri, blackhole_dict, striptags
from sphinx.web.admin import AdminPanel
from sphinx.web.userdb import UserDatabase
from sphinx.web.robots import robots_txt
from sphinx.web.oldurls import handle_html_url
from sphinx.web.antispam import AntiSpam
from sphinx.web.database import connect, set_connection, Comment
from sphinx.web.wsgiutil import Request, Response, RedirectResponse, \
JSONResponse, SharedDataMiddleware, NotFound, get_base_uri
from sphinx.util import relative_uri
from sphinx.search import SearchFrontend
from sphinx.htmlwriter import HTMLWriter
from sphinx.builder import LAST_BUILD_FILENAME, ENV_PICKLE_FILENAME
from docutils.io import StringOutput
from docutils.utils import Reporter
from docutils.frontend import OptionParser
_mail_re = re.compile(r'^([a-zA-Z0-9_\.\-])+\@'
r'(([a-zA-Z0-9\-])+\.)+([a-zA-Z0-9]{2,})+$')
env_lock = threading.Lock()
PATCH_MESSAGE = '''\
A new documentation patch has been submitted.
Author: %(author)s <%(email)s>
Date: %(asctime)s
Page: %(page_id)s
Summary: %(summary)s
'''
known_designs = {
'default': (['default.css', 'pygments.css'],
'The default design, with the sidebar on the left side.'),
'rightsidebar': (['default.css', 'rightsidebar.css', 'pygments.css'],
'Display the sidebar on the right side.'),
'stickysidebar': (['default.css', 'stickysidebar.css', 'pygments.css'],
'''\
Display the sidebar on the left and don\'t scroll it
with the content. This can cause parts of the content to
become inaccessible when the table of contents is too long.'''),
'traditional': (['traditional.css'],
'''\
A design similar to the old documentation style.'''),
}
comments_methods = {
'inline': 'Show all comments inline.',
'bottom': 'Show all comments at the page bottom.',
'none': 'Don\'t show comments at all.',
}
class MockBuilder(object):
def get_relative_uri(self, from_, to):
return ''
name = 'web'
NoCache = object()
def cached(inner):
"""
Response caching system.
"""
def caching_function(self, *args, **kwds):
gen = inner(self, *args, **kwds)
cache_id = gen.next()
if cache_id is NoCache:
response = gen.next()
gen.close()
# this could also return a RedirectResponse...
if isinstance(response, Response):
return response
else:
return Response(response)
try:
text = self.cache[cache_id]
gen.close()
except KeyError:
text = gen.next()
self.cache[cache_id] = text
return Response(text)
return caching_function
class DocumentationApplication(object):
"""
Serves the documentation.
"""
def __init__(self, config):
if config['debug']:
self.cache = blackhole_dict()
else:
self.cache = {}
self.freqmodules = {}
self.last_most_frequent = []
self.generated_stylesheets = {}
self.config = config
self.data_root = config['data_root_path']
self.buildfile = path.join(self.data_root, LAST_BUILD_FILENAME)
self.buildmtime = -1
self.load_env(0)
self.db_con = connect(path.join(self.data_root, 'sphinx.db'))
self.antispam = AntiSpam(path.join(self.data_root, 'bad_content'))
self.userdb = UserDatabase(path.join(self.data_root, 'docusers'))
self.admin_panel = AdminPanel(self)
def load_env(self, new_mtime):
env_lock.acquire()
try:
if self.buildmtime == new_mtime:
# happens if another thread already reloaded the env
return
print "* Loading the environment..."
f = open(path.join(self.data_root, ENV_PICKLE_FILENAME), 'rb')
try:
self.env = pickle.load(f)
finally:
f.close()
f = open(path.join(self.data_root, 'globalcontext.pickle'), 'rb')
try:
self.globalcontext = pickle.load(f)
finally:
f.close()
f = open(path.join(self.data_root, 'searchindex.pickle'), 'rb')
try:
self.search_frontend = SearchFrontend(pickle.load(f))
finally:
f.close()
self.buildmtime = new_mtime
self.cache.clear()
finally:
env_lock.release()
def search(self, req):
"""
Search the database. Currently just a keyword based search.
"""
if not req.args.get('q'):
return RedirectResponse('')
return RedirectResponse('q/%s/' % req.args['q'])
def get_page_source(self, page):
"""
Get the reST source of a page.
"""
page_id = self.env.get_real_filename(page)[:-4]
if page_id is None:
raise NotFound()
filename = path.join(self.data_root, 'sources', page_id) + '.txt'
f = open(filename)
try:
return page_id, f.read()
finally:
f.close()
def show_source(self, req, page):
"""
Show the highlighted source for a given page.
"""
return Response(self.get_page_source(page)[1], mimetype='text/plain')
def suggest_changes(self, req, page):
"""
Show a "suggest changes" form.
"""
page_id, contents = self.get_page_source(page)
return Response(render_template(req, 'edit.html', self.globalcontext, dict(
contents=contents,
pagename=page,
doctitle=self.globalcontext['titles'].get(page_id+'.rst') or 'this page',
submiturl=relative_uri('/@edit/'+page+'/', '/@submit/'+page),
)))
def _generate_preview(self, page_id, contents):
"""
Generate a preview for suggested changes.
"""
handle, pathname = tempfile.mkstemp()
os.write(handle, contents.encode('utf-8'))
os.close(handle)
warning_stream = StringIO.StringIO()
env2 = copy.deepcopy(self.env)
destination = StringOutput(encoding='utf-8')
builder = MockBuilder()
builder.config = env2.config
writer = HTMLWriter(builder)
doctree = env2.read_doc(page_id, pathname, save_parsed=False)
doctree = env2.get_and_resolve_doctree(page_id+'.rst', builder, doctree)
doctree.settings = OptionParser(defaults=env2.settings,
components=(writer,)).get_default_values()
doctree.reporter = Reporter(page_id+'.rst', 2, 4, stream=warning_stream)
output = writer.write(doctree, destination)
writer.assemble_parts()
return writer.parts['fragment']
def submit_changes(self, req, page):
"""
Submit the suggested changes as a patch.
"""
if req.method != 'POST':
# only available via POST
raise NotFound()
if req.form.get('cancel'):
# handle cancel requests directly
return RedirectResponse(page)
# raises NotFound if page doesn't exist
page_id, orig_contents = self.get_page_source(page)
author = req.form.get('name')
email = req.form.get('email')
summary = req.form.get('summary')
contents = req.form.get('contents')
fields = (author, email, summary, contents)
form_error = None
rendered = None
if not all(fields):
form_error = 'You have to fill out all fields.'
elif not _mail_re.search(email):
form_error = 'You have to provide a valid e-mail address.'
elif req.form.get('homepage') or self.antispam.is_spam(fields):
form_error = 'Your text contains blocked URLs or words.'
else:
if req.form.get('preview'):
rendered = self._generate_preview(page_id, contents)
else:
asctime = time.asctime()
contents = contents.splitlines()
orig_contents = orig_contents.splitlines()
diffname = 'suggestion on %s by %s <%s>' % (asctime, author, email)
diff = difflib.unified_diff(orig_contents, contents, n=3,
fromfile=page_id, tofile=diffname,
lineterm='')
diff_text = '\n'.join(diff)
try:
mail = Email(
self.config['patch_mail_from'], 'Python Documentation Patches',
self.config['patch_mail_to'], '',
'Patch for %s by %s' % (page_id, author),
PATCH_MESSAGE % locals(),
self.config['patch_mail_smtp'],
)
mail.attachments.add_string('patch.diff', diff_text, 'text/x-diff')
mail.send()
except:
import traceback
traceback.print_exc()
# XXX: how to report?
pass
return Response(render_template(req, 'submitted.html',
self.globalcontext, dict(
backlink=relative_uri('/@submit/'+page+'/', page+'/')
)))
return Response(render_template(req, 'edit.html', self.globalcontext, dict(
contents=contents,
author=author,
email=email,
summary=summary,
pagename=page,
form_error=form_error,
rendered=rendered,
submiturl=relative_uri('/@edit/'+page+'/', '/@submit/'+page),
)))
def get_settings_page(self, req):
"""
Handle the settings page.
"""
referer = req.environ.get('HTTP_REFERER') or ''
if referer:
base = get_base_uri(req.environ)
if not referer.startswith(base):
referer = ''
else:
referer = referer[len(base):]
referer = referer.split('?')[0] or referer
if req.method == 'POST':
if req.form.get('cancel'):
if req.form.get('referer'):
return RedirectResponse(req.form['referer'])
return RedirectResponse('')
new_style = req.form.get('design')
if new_style and new_style in known_designs:
req.session['design'] = new_style
new_comments = req.form.get('comments')
if new_comments and new_comments in comments_methods:
req.session['comments'] = new_comments
if req.form.get('goback') and req.form.get('referer'):
return RedirectResponse(req.form['referer'])
# else display the same page again
referer = ''
context = {
'known_designs': sorted(known_designs.iteritems()),
'comments_methods': comments_methods.items(),
'curdesign': req.session.get('design') or 'default',
'curcomments': req.session.get('comments') or 'inline',
'referer': referer,
}
return Response(render_template(req, 'settings.html',
self.globalcontext, context))
@cached
def get_module_index(self, req):
"""
Get the module index or redirect to a module from the module index.
"""
most_frequent = heapq.nlargest(30, self.freqmodules.iteritems(),
lambda x: x[1])
if most_frequent:
base_count = most_frequent[-1][1]
most_frequent = [{
'name': x[0],
'size': 100 + math.log((x[1] - base_count) + 1) * 20,
'count': x[1]
} for x in sorted(most_frequent)]
showpf = None
newpf = req.args.get('newpf')
sesspf = req.session.get('pf')
if newpf or sesspf:
yield NoCache
if newpf:
req.session['pf'] = showpf = req.args.getlist('pf')
else:
showpf = sesspf
else:
if most_frequent != self.last_most_frequent:
self.cache.pop('@modindex', None)
yield '@modindex'
filename = path.join(self.data_root, 'modindex.fpickle')
f = open(filename, 'rb')
try:
context = pickle.load(f)
finally:
f.close()
if showpf:
entries = context['modindexentries']
i = 0
while i < len(entries):
if entries[i][6]:
for pform in entries[i][6]:
if pform in showpf:
break
else:
del entries[i]
continue
i += 1
context['freqentries'] = most_frequent
context['showpf'] = showpf or context['platforms']
self.last_most_frequent = most_frequent
yield render_template(req, 'modindex.html',
self.globalcontext, context)
def show_comment_form(self, req, page):
"""
Show the "new comment" form.
"""
page_id = self.env.get_real_filename(page)[:-4]
ajax_mode = req.args.get('mode') == 'ajax'
target = req.args.get('target')
page_comment_mode = not target
form_error = preview = None
title = req.form.get('title', '').strip()
if 'author' in req.form:
author = req.form['author']
else:
author = req.session.get('author', '')
if 'author_mail' in req.form:
author_mail = req.form['author_mail']
else:
author_mail = req.session.get('author_mail', '')
comment_body = req.form.get('comment_body', '')
fields = (title, author, author_mail, comment_body)
if req.method == 'POST':
if req.form.get('preview'):
preview = Comment(page_id, target, title, author, author_mail,
comment_body)
# 'homepage' is a forbidden field to thwart bots
elif req.form.get('homepage') or self.antispam.is_spam(fields):
form_error = 'Your text contains blocked URLs or words.'
else:
if not all(fields):
form_error = 'You have to fill out all fields.'
elif _mail_re.search(author_mail) is None:
form_error = 'You have to provide a valid e-mail address.'
elif len(comment_body) < 20:
form_error = 'Your comment is too short ' \
'(must have at least 20 characters).'
else:
# '|none' can stay since it doesn't include comments
self.cache.pop(page_id + '|inline', None)
self.cache.pop(page_id + '|bottom', None)
comment = Comment(page_id, target,
title, author, author_mail,
comment_body)
comment.save()
req.session['author'] = author
req.session['author_mail'] = author_mail
if ajax_mode:
return JSONResponse({'posted': True, 'error': False,
'commentID': comment.comment_id})
return RedirectResponse(comment.url)
output = render_template(req, '_commentform.html', {
'ajax_mode': ajax_mode,
'preview': preview,
'suggest_url': '@edit/%s/' % page,
'comments_form': {
'target': target,
'title': title,
'author': author,
'author_mail': author_mail,
'comment_body': comment_body,
'error': form_error
}
})
if ajax_mode:
return JSONResponse({
'body': output,
'error': bool(form_error),
'posted': False
})
return Response(render_template(req, 'commentform.html', {
'form': output
}))
def _insert_comments(self, req, url, context, mode):
"""
Insert inline comments into a page context.
"""
if 'body' not in context:
return
comment_url = '@comments/%s/' % url
page_id = self.env.get_real_filename(url)[:-4]
tx = context['body']
all_comments = Comment.get_for_page(page_id)
global_comments = []
for name, comments in groupby(all_comments, lambda x: x.associated_name):
if not name:
global_comments.extend(comments)
continue
comments = list(comments)
if not comments:
continue
tx = re.sub('<!--#%s#-->' % name,
render_template(req, 'inlinecomments.html', {
'comments': comments,
'id': name,
'comment_url': comment_url,
'mode': mode}),
tx)
if mode == 'bottom':
global_comments.extend(comments)
if mode == 'inline':
# replace all markers for items without comments
tx = re.sub('<!--#([^#]*)#-->',
(lambda match:
render_template(req, 'inlinecomments.html', {
'id': match.group(1),
'mode': 'inline',
'comment_url': comment_url
},)),
tx)
tx += render_template(req, 'comments.html', {
'comments': global_comments,
'comment_url': comment_url
})
context['body'] = tx
@cached
def get_page(self, req, url):
"""
Show the requested documentation page or raise a
`NotFound` exception to display a page with close matches.
"""
page_id = self.env.get_real_filename(url)[:-4]
if page_id is None:
raise NotFound(show_keyword_matches=True)
# increment view count of all modules on that page
for modname in self.env.filemodules.get(page_id+'.rst', ()):
self.freqmodules[modname] = self.freqmodules.get(modname, 0) + 1
# comments enabled?
comments = self.env.metadata[page_id+'.rst'].get('nocomments', False)
# how does the user want to view comments?
commentmode = comments and req.session.get('comments', 'inline') or ''
# show "old URL" message? -> no caching possible
oldurl = req.args.get('oldurl')
if oldurl:
yield NoCache
else:
# there must be different cache entries per comment mode
yield page_id + '|' + commentmode
# cache miss; load the page and render it
filename = path.join(self.data_root, page_id + '.fpickle')
f = open(filename, 'rb')
try:
context = pickle.load(f)
finally:
f.close()
# add comments to page text
if commentmode != 'none':
self._insert_comments(req, url, context, commentmode)
yield render_template(req, 'page.html', self.globalcontext, context,
{'oldurl': oldurl})
@cached
def get_special_page(self, req, name):
yield '@'+name
filename = path.join(self.data_root, name + '.fpickle')
f = open(filename, 'rb')
try:
context = pickle.load(f)
finally:
f.close()
yield render_template(req, name+'.html',
self.globalcontext, context)
def comments_feed(self, req, url):
if url == 'recent':
feed = Feed(req, 'Recent Comments', 'Recent Comments', '')
for comment in Comment.get_recent():
feed.add_item(comment.title, comment.author, comment.url,
comment.parsed_comment_body, comment.pub_date)
else:
page_id = self.env.get_real_filename(url)[:-4]
doctitle = striptags(self.globalcontext['titles'].get(page_id+'.rst', url))
feed = Feed(req, 'Comments for "%s"' % doctitle,
'List of comments for the topic "%s"' % doctitle, url)
for comment in Comment.get_for_page(page_id):
feed.add_item(comment.title, comment.author, comment.url,
comment.parsed_comment_body, comment.pub_date)
return Response(feed.generate(), mimetype='application/rss+xml')
def get_error_404(self, req):
"""
Show a simple error 404 page.
"""
return Response(render_template(req, 'not_found.html', self.globalcontext),
status=404)
pretty_type = {
'data': 'module data',
'cfunction': 'C function',
'cmember': 'C member',
'cmacro': 'C macro',
'ctype': 'C type',
'cvar': 'C variable',
}
def get_keyword_matches(self, req, term=None, avoid_fuzzy=False,
is_error_page=False):
"""
Find keyword matches. If there is an exact match, just redirect:
http://docs.python.org/os.path.exists would automatically
redirect to http://docs.python.org/library/os.path/#os.path.exists.
Else, show a page with close matches.
Module references are processed first so that "os.path" is handled as
a module and not as a member of os.
"""
if term is None:
term = req.path.strip('/')
matches = self.env.find_keyword(term, avoid_fuzzy)
# if avoid_fuzzy is False matches can be None
if matches is None:
return
if isinstance(matches, tuple):
url = get_target_uri(matches[1])
if matches[0] != 'module':
url += '#' + matches[2]
return RedirectResponse(url)
else:
# get some close matches
close_matches = []
good_matches = 0
for ratio, type, filename, anchorname, desc in matches:
link = get_target_uri(filename)
if type != 'module':
link += '#' + anchorname
good_match = ratio > 0.75
good_matches += good_match
close_matches.append({
'href': relative_uri(req.path, link),
'title': anchorname,
'good_match': good_match,
'type': self.pretty_type.get(type, type),
'description': desc,
})
return Response(render_template(req, 'keyword_not_found.html', {
'close_matches': close_matches,
'good_matches_count': good_matches,
'keyword': term
}, self.globalcontext), status=404)
def get_user_stylesheet(self, req):
"""
Stylesheets are exchangeable. Handle them here and
cache them on the server side until the server shuts down
and on the client side for 1 hour (not in debug mode).
"""
style = req.session.get('design')
if style not in known_designs:
style = 'default'
if style in self.generated_stylesheets:
stylesheet = self.generated_stylesheets[style]
else:
stylesheet = []
for filename in known_designs[style][0]:
f = open(path.join(self.data_root, 'style', filename))
try:
stylesheet.append(f.read())
finally:
f.close()
stylesheet = '\n'.join(stylesheet)
if not self.config.get('debug'):
self.generated_stylesheets[style] = stylesheet
if req.args.get('admin') == 'yes':
f = open(path.join(self.data_root, 'style', 'admin.css'))
try:
stylesheet += '\n' + f.read()
finally:
f.close()
# XXX: add timestamp based http caching
return Response(stylesheet, mimetype='text/css')
def __call__(self, environ, start_response):
"""
Dispatch requests.
"""
set_connection(self.db_con)
req = Request(environ)
url = req.path.strip('/') or 'index'
# check if the environment was updated
new_mtime = path.getmtime(self.buildfile)
if self.buildmtime != new_mtime:
self.load_env(new_mtime)
try:
if req.path == '/favicon.ico':
# TODO: change this to real favicon?
resp = Response('404 Not Found', status=404)
elif req.path == '/robots.txt':
resp = Response(robots_txt, mimetype='text/plain')
elif not req.path.endswith('/') and req.method == 'GET':
# may be an old URL
if url.endswith('.html'):
resp = handle_html_url(self, url)
else:
# else, require a trailing slash on GET requests
# this ensures nice looking urls and working relative
# links for cached resources.
query = req.environ.get('QUERY_STRING', '')
resp = RedirectResponse(req.path + '/' + (query and '?'+query))
# index page is special
elif url == 'index':
# presets for settings
if req.args.get('design') and req.args['design'] in known_designs:
req.session['design'] = req.args['design']
if req.args.get('comments') and req.args['comments'] in comments_methods:
req.session['comments'] = req.args['comments']
# alias for fuzzy search
if 'q' in req.args:
resp = RedirectResponse('q/%s/' % req.args['q'])
# stylesheet
elif req.args.get('do') == 'stylesheet':
resp = self.get_user_stylesheet(req)
else:
resp = self.get_special_page(req, 'index')
# go to the search page
# XXX: this is currently just a redirect to /q/ which is handled below
elif url == 'search':
resp = self.search(req)
# settings page cannot be cached
elif url == 'settings':
resp = self.get_settings_page(req)
# module index page is special
elif url == 'modindex':
resp = self.get_module_index(req)
# genindex page is special too
elif url == 'genindex':
resp = self.get_special_page(req, 'genindex')
# start the fuzzy search
elif url[:2] == 'q/':
resp = self.get_keyword_matches(req, url[2:])
# special URLs -- don't forget to add them to robots.py
elif url[0] == '@':
# source view
if url[:8] == '@source/':
resp = self.show_source(req, url[8:])
# suggest changes view
elif url[:6] == '@edit/':
resp = self.suggest_changes(req, url[6:])
# suggest changes submit
elif url[:8] == '@submit/':
resp = self.submit_changes(req, url[8:])
# show that comment form
elif url[:10] == '@comments/':
resp = self.show_comment_form(req, url[10:])
# comments RSS feed
elif url[:5] == '@rss/':
resp = self.comments_feed(req, url[5:])
# dispatch requests to the admin panel
elif url == '@admin' or url[:7] == '@admin/':
resp = self.admin_panel.dispatch(req, url[7:])
else:
raise NotFound()
# everything else is handled as page or fuzzy search
# if a page does not exist.
else:
resp = self.get_page(req, url)
# views can raise a NotFound exception to show an error page.
# Either a real not found page or a similar matches page.
except NotFound, e:
if e.show_keyword_matches:
resp = self.get_keyword_matches(req, is_error_page=True)
else:
resp = self.get_error_404(req)
return resp(environ, start_response)
def _check_superuser(app):
"""Check if there is a superuser and create one if necessary."""
if not app.userdb.users:
print 'Warning: you have no user database or no master "admin" account.'
create = raw_input('Do you want to create an admin account now? [y/n] ')
if not create or create.lower().startswith('y'):
import getpass
print 'Creating "admin" user.'
pw1 = getpass.getpass('Enter password: ')
pw2 = getpass.getpass('Enter password again: ')
if pw1 != pw2:
print 'Error: Passwords don\'t match.'
raise SystemExit(1)
app.userdb.set_password('admin', pw1)
app.userdb.privileges.setdefault('admin', set()).add('master')
app.userdb.save()
def setup_app(config, check_superuser=False):
"""
Create the WSGI application based on a configuration dict.
Handled configuration values so far:
`data_root_path`
the folder containing the documentation data as generated
by sphinx with the web builder.
"""
app = DocumentationApplication(config)
if check_superuser:
_check_superuser(app)
app = SharedDataMiddleware(app, {
'/static': path.join(config['data_root_path'], 'static')
})
return app
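
The `cached` decorator above relies on a small generator protocol: the wrapped view first yields its cache key (or `NoCache` to bypass caching), and on a cache miss then yields the rendered text, which is stored in `self.cache`. A hypothetical extra view method of `DocumentationApplication` following that protocol would look roughly like this:

@cached
def get_about_page(self, req):
    # first value: the cache key (yield NoCache here to skip caching)
    yield '@about'
    # only reached on a cache miss: build and yield the response body
    yield render_template(req, 'about.html', self.globalcontext, {})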


@@ -1,194 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.database
~~~~~~~~~~~~~~~~~~~
The database connections are thread local. To set the connection
for a thread use the `set_connection` function provided. The
`connect` function automatically sets up new tables and returns a
usable connection which is also set as the connection for the
thread that called that function.
:copyright: 2007-2008 by Georg Brandl, Armin Ronacher.
:license: BSD.
"""
import time
import sqlite3
from datetime import datetime
from threading import local
from sphinx.web.markup import markup
_thread_local = local()
def connect(path):
"""Connect and create tables if required. Also assigns
the connection for the current thread."""
con = sqlite3.connect(path, detect_types=sqlite3.PARSE_DECLTYPES)
con.isolation_level = None
# create tables that do not exist.
for table in tables:
try:
con.execute('select * from %s limit 1;' % table)
except sqlite3.OperationalError:
con.execute(tables[table])
set_connection(con)
return con
def get_cursor():
"""Return a new cursor."""
return _thread_local.connection.cursor()
def set_connection(con):
"""Call this after thread creation to make this connection
the connection for this thread."""
_thread_local.connection = con
#: tables that we use
tables = {
'comments': '''
create table comments (
comment_id integer primary key,
associated_page varchar(200),
associated_name varchar(200),
title varchar(120),
author varchar(200),
author_mail varchar(250),
comment_body text,
pub_date timestamp
);'''
}
class Comment(object):
"""
Represents one comment.
"""
def __init__(self, associated_page, associated_name, title, author,
author_mail, comment_body, pub_date=None):
self.comment_id = None
self.associated_page = associated_page
self.associated_name = associated_name
self.title = title
if pub_date is None:
pub_date = datetime.utcnow()
self.pub_date = pub_date
self.author = author
self.author_mail = author_mail
self.comment_body = comment_body
@property
def url(self):
return '%s#comment-%s' % (
self.associated_page,
self.comment_id
)
@property
def parsed_comment_body(self):
from sphinx.web.util import get_target_uri
from sphinx.util import relative_uri
uri = get_target_uri(self.associated_page)
def make_rel_link(keyword):
return relative_uri(uri, 'q/%s/' % keyword)
return markup(self.comment_body, make_rel_link)
def save(self):
"""
Save the comment to the database.
"""
cur = get_cursor()
args = (self.associated_page, self.associated_name, self.title,
self.author, self.author_mail, self.comment_body, self.pub_date)
if self.comment_id is None:
cur.execute('''insert into comments (associated_page, associated_name,
title,
author, author_mail,
comment_body, pub_date)
values (?, ?, ?, ?, ?, ?, ?)''', args)
self.comment_id = cur.lastrowid
else:
args += (self.comment_id,)
cur.execute('''update comments set associated_page=?,
associated_name=?,
title=?, author=?,
author_mail=?, comment_body=?,
pub_date=? where comment_id = ?''', args)
cur.close()
def delete(self):
cur = get_cursor()
cur.execute('delete from comments where comment_id = ?',
(self.comment_id,))
cur.close()
@staticmethod
def _make_comment(row):
rv = Comment(*row[1:])
rv.comment_id = row[0]
return rv
@staticmethod
def get(comment_id):
cur = get_cursor()
cur.execute('select * from comments where comment_id = ?', (comment_id,))
row = cur.fetchone()
if row is None:
raise ValueError('comment not found')
try:
return Comment._make_comment(row)
finally:
cur.close()
@staticmethod
def get_for_page(associated_page, reverse=False):
cur = get_cursor()
cur.execute('''select * from comments where associated_page = ?
order by associated_name, comment_id %s''' %
(reverse and 'desc' or 'asc'),
(associated_page,))
try:
return [Comment._make_comment(row) for row in cur]
finally:
cur.close()
@staticmethod
def get_recent(n=10):
cur = get_cursor()
cur.execute('select * from comments order by comment_id desc limit ?',
(n,))
try:
return [Comment._make_comment(row) for row in cur]
finally:
cur.close()
@staticmethod
def get_overview(detail_for=None):
cur = get_cursor()
cur.execute('''select distinct associated_page from comments
order by associated_page asc''')
pages = []
for row in cur:
page_id = row[0]
if page_id == detail_for:
pages.append((page_id, Comment.get_for_page(page_id, True)))
else:
pages.append((page_id, []))
cur.close()
return pages
def __repr__(self):
return '<Comment by %r on %r:%r (%s)>' % (
self.author,
self.associated_page,
self.associated_name,
self.comment_id or 'not saved'
)
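
A short sketch of how this module is meant to be used; the database path, page id and comment data are just examples.

from sphinx.web.database import connect, Comment

connect('/tmp/sphinx.db')            # creates the comments table if it is missing
c = Comment('library/os', None, 'Typo', 'Jane Doe',
            'jane@example.com', 'The second example is missing a colon.')
c.save()                             # INSERT; fills in c.comment_id
print Comment.get(c.comment_id)      # fetch it back by primary key
print len(Comment.get_recent(10))    # newest comments first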


@@ -1,78 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.feed
~~~~~~~~~~~~~~~
Nifty module that generates RSS feeds.
:copyright: 2007-2008 by Armin Ronacher.
:license: BSD.
"""
import time
from datetime import datetime
from xml.dom.minidom import Document
from email.Utils import formatdate
def format_rss_date(date):
"""
Pass it a datetime object to receive the string representation
for RSS date fields.
"""
return formatdate(time.mktime(date.timetuple()) + date.microsecond / 1e6)
class Feed(object):
"""
Feed creation class. It directly generates an RSS 2.0 document
via `generate()`.
"""
def __init__(self, req, title, description, link):
self.req = req
self.title = title
self.description = description
self.link = req.make_external_url(link)
self.items = []
self._last_update = None
def add_item(self, title, author, link, description, pub_date):
if self._last_update is None or pub_date > self._last_update:
self._last_update = pub_date
date = pub_date or datetime.utcnow()
self.items.append({
'title': title,
'author': author,
'link': self.req.make_external_url(link),
'description': description,
'pub_date': date
})
def generate(self):
return self.generate_document().toxml('utf-8')
def generate_document(self):
doc = Document()
Element = doc.createElement
Text = doc.createTextNode
rss = doc.appendChild(Element('rss'))
rss.setAttribute('version', '2.0')
channel = rss.appendChild(Element('channel'))
for key in ('title', 'description', 'link'):
value = getattr(self, key)
channel.appendChild(Element(key)).appendChild(Text(value))
date = format_rss_date(self._last_update or datetime.utcnow())
channel.appendChild(Element('pubDate')).appendChild(Text(date))
for item in self.items:
d = Element('item')
for key in ('title', 'author', 'link', 'description'):
d.appendChild(Element(key)).appendChild(Text(item[key]))
pub_date = format_rss_date(item['pub_date'])
d.appendChild(Element('pubDate')).appendChild(Text(pub_date))
d.appendChild(Element('guid')).appendChild(Text(item['link']))
channel.appendChild(d)
return doc
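
Usage sketch: the request object is only used for make_external_url() here, so a tiny stand-in for sphinx.web.wsgiutil.Request is enough for illustration.

from datetime import datetime
from sphinx.web.feed import Feed

class FakeRequest(object):
    # stand-in for the real Request; only make_external_url() is needed
    def make_external_url(self, path):
        return 'http://docs.example.org/' + path

feed = Feed(FakeRequest(), 'Recent Comments', 'Recent Comments', '')
feed.add_item('A comment', 'Jane Doe', 'library/os/#comment-1',
              '<p>Some text</p>', datetime.utcnow())
print feed.generate()                # the RSS 2.0 document as a UTF-8 string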


@@ -1,278 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.mail
~~~~~~~~~~~~~~~
A simple module for sending e-mails, based on simplemail.py.
:copyright: 2004-2007 by Gerold Penz.
:copyright: 2007-2008 by Georg Brandl.
:license: BSD.
"""
import os.path
import sys
import time
import smtplib
import mimetypes
from email import Encoders
from email.Header import Header
from email.MIMEText import MIMEText
from email.MIMEMultipart import MIMEMultipart
from email.Utils import formataddr
from email.Utils import formatdate
from email.Message import Message
from email.MIMEAudio import MIMEAudio
from email.MIMEBase import MIMEBase
from email.MIMEImage import MIMEImage
# Exceptions
#----------------------------------------------------------------------
class SimpleMail_Exception(Exception):
def __str__(self):
return self.__doc__
class NoFromAddress_Exception(SimpleMail_Exception):
pass
class NoToAddress_Exception(SimpleMail_Exception):
pass
class NoSubject_Exception(SimpleMail_Exception):
pass
class AttachmentNotFound_Exception(SimpleMail_Exception):
pass
class Attachments(object):
def __init__(self):
self._attachments = []
def add_filename(self, filename = ''):
self._attachments.append(('file', filename))
def add_string(self, filename, text, mimetype):
self._attachments.append(('string', (filename, text, mimetype)))
def count(self):
return len(self._attachments)
def get_list(self):
return self._attachments
class Recipients(object):
def __init__(self):
self._recipients = []
def add(self, address, caption = ''):
self._recipients.append(formataddr((caption, address)))
def count(self):
return len(self._recipients)
def __repr__(self):
return str(self._recipients)
def get_list(self):
return self._recipients
class CCRecipients(Recipients):
pass
class BCCRecipients(Recipients):
pass
class Email(object):
def __init__(
self,
from_address = "",
from_caption = "",
to_address = "",
to_caption = "",
subject = "",
message = "",
smtp_server = "localhost",
smtp_user = "",
smtp_password = "",
user_agent = "",
reply_to_address = "",
reply_to_caption = "",
use_tls = False,
):
"""
Initialize the email object
from_address = the email address of the sender
from_caption = the caption (name) of the sender
to_address = the email address of the recipient
to_caption = the caption (name) of the recipient
subject = the subject of the email message
message = the body text of the email message
smtp_server = the IP address or host name of the SMTP server
smtp_user = (optional) login name for the SMTP server
smtp_password = (optional) password for the SMTP server
user_agent = (optional) program identification
reply_to_address = (optional) Reply-to email address
reply_to_caption = (optional) Reply-to caption (name)
use_tls = (optional) True, if the connection should use TLS
to encrypt.
"""
self.from_address = from_address
self.from_caption = from_caption
self.recipients = Recipients()
self.cc_recipients = CCRecipients()
self.bcc_recipients = BCCRecipients()
if to_address:
self.recipients.add(to_address, to_caption)
self.subject = subject
self.message = message
self.smtp_server = smtp_server
self.smtp_user = smtp_user
self.smtp_password = smtp_password
self.attachments = Attachments()
self.content_subtype = "plain"
self.content_charset = "iso-8859-1"
self.header_charset = "us-ascii"
self.statusdict = None
self.user_agent = user_agent
self.reply_to_address = reply_to_address
self.reply_to_caption = reply_to_caption
self.use_tls = use_tls
def send(self):
"""
Send the mail. Returns True if successfully sent to at least one
recipient.
"""
# validation
if len(self.from_address.strip()) == 0:
raise NoFromAddress_Exception
if self.recipients.count() == 0:
if (
(self.cc_recipients.count() == 0) and
(self.bcc_recipients.count() == 0)
):
raise NoToAddress_Exception
if len(self.subject.strip()) == 0:
raise NoSubject_Exception
# assemble
if self.attachments.count() == 0:
msg = MIMEText(
_text = self.message,
_subtype = self.content_subtype,
_charset = self.content_charset
)
else:
msg = MIMEMultipart()
if self.message:
att = MIMEText(
_text = self.message,
_subtype = self.content_subtype,
_charset = self.content_charset
)
msg.attach(att)
# add headers
from_str = formataddr((self.from_caption, self.from_address))
msg["From"] = from_str
if self.reply_to_address:
reply_to_str = formataddr((self.reply_to_caption, self.reply_to_address))
msg["Reply-To"] = reply_to_str
if self.recipients.count() > 0:
msg["To"] = ", ".join(self.recipients.get_list())
if self.cc_recipients.count() > 0:
msg["Cc"] = ", ".join(self.cc_recipients.get_list())
msg["Date"] = formatdate(time.time())
msg["User-Agent"] = self.user_agent
try:
msg["Subject"] = Header(
self.subject, self.header_charset
)
except(UnicodeDecodeError):
msg["Subject"] = Header(
self.subject, self.content_charset
)
msg.preamble = "You will not see this in a MIME-aware mail reader.\n"
msg.epilogue = ""
# assemble multipart
if self.attachments.count() > 0:
for typ, info in self.attachments.get_list():
if typ == 'file':
filename = info
if not os.path.isfile(filename):
raise AttachmentNotFound_Exception, filename
mimetype, encoding = mimetypes.guess_type(filename)
if mimetype is None or encoding is not None:
mimetype = 'application/octet-stream'
if mimetype.startswith('text/'):
fp = file(filename)
else:
fp = file(filename, 'rb')
text = fp.read()
fp.close()
else:
filename, text, mimetype = info
maintype, subtype = mimetype.split('/', 1)
if maintype == 'text':
# Note: we should handle calculating the charset
att = MIMEText(text, _subtype=subtype)
elif maintype == 'image':
att = MIMEImage(text, _subtype=subtype)
elif maintype == 'audio':
att = MIMEAudio(text, _subtype=subtype)
else:
att = MIMEBase(maintype, subtype)
att.set_payload(text)
# Encode the payload using Base64
Encoders.encode_base64(att)
# Set the filename parameter
att.add_header(
'Content-Disposition',
'attachment',
filename = os.path.basename(filename).strip()
)
msg.attach(att)
# connect to server
smtp = smtplib.SMTP()
if self.smtp_server:
smtp.connect(self.smtp_server)
else:
smtp.connect()
# TLS?
if self.use_tls:
smtp.ehlo()
smtp.starttls()
smtp.ehlo()
# authenticate
if self.smtp_user:
smtp.login(user = self.smtp_user, password = self.smtp_password)
# send
self.statusdict = smtp.sendmail(
from_str,
(
self.recipients.get_list() +
self.cc_recipients.get_list() +
self.bcc_recipients.get_list()
),
msg.as_string()
)
smtp.close()
return True
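
A usage sketch mirroring how sphinx.web.application sends patch mails; addresses, subject and server are placeholders.

from sphinx.web.mail import Email

mail = Email(
    from_address = 'docs@example.com',
    from_caption = 'Python Documentation Patches',
    to_address   = 'doc-patches@example.com',
    subject      = 'Patch for library/os by Jane Doe',
    message      = 'A new documentation patch has been submitted.',
    smtp_server  = 'localhost',
)
mail.attachments.add_string('patch.diff', '--- a\n+++ b\n', 'text/x-diff')
mail.send()   # raises if sender, recipient or subject is missing; True on success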


@@ -1,239 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.markup
~~~~~~~~~~~~~~~~~
Awfully simple markup used in comments. Syntax:
`this is some <code>`
like <tt> in HTML
``this is like ` just that it can contain backticks``
like <tt> in HTML
*emphasized*
translates to <em class="important">
**strong**
translates to <strong>
!!!very important message!!!
use this to mark important or dangerous things.
Translates to <em class="dangerous">
[[http://www.google.com/]]
Simple link with the link target as caption. If the
URL is relative the provided callback is called to get
the full URL.
[[http://www.google.com/ go to google]]
Link with "go to google" as caption.
<code>preformatted code that could be Python code</code>
Python code (most of the time), otherwise preformatted.
<quote>cite someone</quote>
Like <blockquote> in HTML.
:copyright: 2007-2008 by Armin Ronacher.
:license: BSD.
"""
import cgi
import re
from urlparse import urlparse
from sphinx.highlighting import highlight_block
inline_formatting = {
'escaped_code': ('``', '``'),
'code': ('`', '`'),
'strong': ('**', '**'),
'emphasized': ('*', '*'),
'important': ('!!!', '!!!'),
'link': ('[[', ']]'),
'quote': ('<quote>', '</quote>'),
'code_block': ('<code>', '</code>'),
'paragraph': (r'\n{2,}', None),
'newline': (r'\\$', None)
}
simple_formattings = {
'strong_begin': '<strong>',
'strong_end': '</strong>',
'emphasized_begin': '<em>',
'emphasized_end': '</em>',
'important_begin': '<em class="important">',
'important_end': '</em>',
'quote_begin': '<blockquote>',
'quote_end': '</blockquote>'
}
raw_formatting = set(['link', 'code', 'escaped_code', 'code_block'])
formatting_start_re = re.compile('|'.join(
'(?P<%s>%s)' % (name, end is not None and re.escape(start) or start)
for name, (start, end)
in sorted(inline_formatting.items(), key=lambda x: -len(x[1][0]))
), re.S | re.M)
formatting_end_res = dict(
(name, re.compile(re.escape(end))) for name, (start, end)
in inline_formatting.iteritems() if end is not None
)
without_end_tag = set(name for name, (_, end) in inline_formatting.iteritems()
if end is None)
class StreamProcessor(object):
def __init__(self, stream):
self._pushed = []
self._stream = stream
def __iter__(self):
return self
def next(self):
if self._pushed:
return self._pushed.pop()
return self._stream.next()
def push(self, token, data):
self._pushed.append((token, data))
def get_data(self, drop_needle=False):
result = []
try:
while True:
token, data = self.next()
if token != 'text':
if not drop_needle:
self.push(token, data)
break
result.append(data)
except StopIteration:
pass
return ''.join(result)
class MarkupParser(object):
def __init__(self, make_rel_url):
self.make_rel_url = make_rel_url
def tokenize(self, text):
text = '\n'.join(text.splitlines())
last_pos = 0
pos = 0
end = len(text)
stack = []
text_buffer = []
while pos < end:
if stack:
m = formatting_end_res[stack[-1]].match(text, pos)
if m is not None:
if text_buffer:
yield 'text', ''.join(text_buffer)
del text_buffer[:]
yield stack[-1] + '_end', None
stack.pop()
pos = m.end()
continue
m = formatting_start_re.match(text, pos)
if m is not None:
if text_buffer:
yield 'text', ''.join(text_buffer)
del text_buffer[:]
for key, value in m.groupdict().iteritems():
if value is not None:
if key in without_end_tag:
yield key, None
else:
if key in raw_formatting:
regex = formatting_end_res[key]
m2 = regex.search(text, m.end())
if m2 is None:
yield key, text[m.end():]
else:
yield key, text[m.end():m2.start()]
m = m2
else:
yield key + '_begin', None
stack.append(key)
break
if m is None:
break
else:
pos = m.end()
continue
text_buffer.append(text[pos])
pos += 1
yield 'text', ''.join(text_buffer)
for token in reversed(stack):
yield token + '_end', None
def stream_to_html(self, text):
stream = StreamProcessor(self.tokenize(text))
paragraph = []
result = []
def new_paragraph():
result.append(paragraph[:])
del paragraph[:]
for token, data in stream:
if token in simple_formattings:
paragraph.append(simple_formattings[token])
elif token in ('text', 'escaped_code', 'code'):
if data:
data = cgi.escape(data)
if token in ('escaped_code', 'code'):
data = '<tt>%s</tt>' % data
paragraph.append(data)
elif token == 'link':
if ' ' in data:
href, caption = data.split(' ', 1)
else:
href = caption = data
protocol = urlparse(href)[0]
nofollow = True
if not protocol:
href = self.make_rel_url(href)
nofollow = False
elif protocol == 'javascript':
href = href[11:]
paragraph.append('<a href="%s"%s>%s</a>' % (cgi.escape(href),
nofollow and ' rel="nofollow"' or '',
cgi.escape(caption)))
elif token == 'code_block':
result.append(highlight_block(data, 'python'))
new_paragraph()
elif token == 'paragraph':
new_paragraph()
elif token == 'newline':
paragraph.append('<br>')
if paragraph:
result.append(paragraph)
for item in result:
if isinstance(item, list):
if item:
yield '<p>%s</p>' % ''.join(item)
else:
yield item
def to_html(self, text):
return ''.join(self.stream_to_html(text))
def markup(text, make_rel_url=lambda x: './' + x):
return MarkupParser(make_rel_url).to_html(text)
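
A quick sketch of the markup() helper defined above; the relative-link callback defaults to prefixing './', and link targets inside [[...]] without a URL scheme are routed through it.

from sphinx.web.markup import markup

print markup('*emphasized*, **strong** and `some code`')
# -> roughly: <p><em>emphasized</em>, <strong>strong</strong> and <tt>some code</tt></p>
print markup('see [[os.path the os.path docs]]',
             make_rel_url=lambda kw: '../q/%s/' % kw)
# the link target has no scheme, so it is rewritten via the callback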


@@ -1,93 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.oldurls
~~~~~~~~~~~~~~~~~~
Handle old URLs gracefully.
:copyright: 2007-2008 by Georg Brandl.
:license: BSD.
"""
import re
from sphinx.web.wsgiutil import RedirectResponse, NotFound
_module_re = re.compile(r'module-(.*)\.html')
_modobj_re = re.compile(r'(.*)-objects\.html')
_modsub_re = re.compile(r'(.*?)-(.*)\.html')
special_module_names = {
'main': '__main__',
'builtin': '__builtin__',
'future': '__future__',
'pycompile': 'py_compile',
}
tutorial_nodes = [
'', '', '',
'appetite',
'interpreter',
'introduction',
'controlflow',
'datastructures',
'modules',
'inputoutput',
'errors',
'classes',
'stdlib',
'stdlib2',
'whatnow',
'interactive',
'floatingpoint',
'',
'glossary',
]
def handle_html_url(req, url):
def inner():
# global special pages
if url.endswith('/contents.html'):
return 'contents/'
if url.endswith('/genindex.html'):
return 'genindex/'
if url.endswith('/about.html'):
return 'about/'
if url.endswith('/reporting-bugs.html'):
return 'bugs/'
if url == 'modindex.html' or url.endswith('/modindex.html'):
return 'modindex/'
if url == 'mac/using.html':
return 'howto/pythonmac/'
# library
if url[:4] in ('lib/', 'mac/'):
p = 'library/'
m = _module_re.match(url[4:])
if m:
mn = m.group(1)
return p + special_module_names.get(mn, mn)
# module sub-pages
m = _modsub_re.match(url[4:])
if m and not _modobj_re.match(url[4:]):
mn = m.group(1)
return p + special_module_names.get(mn, mn)
# XXX: handle all others
# tutorial
elif url[:4] == 'tut/':
try:
node = int(url[8:].split('.html')[0])
except ValueError:
pass
else:
if tutorial_nodes[node]:
return 'tutorial/' + tutorial_nodes[node]
# installing: all in one (ATM)
elif url[:5] == 'inst/':
return 'install/'
# no mapping for "documenting Python..."
# nothing found
raise NotFound()
return RedirectResponse('%s?oldurl=1' % inner())
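
For illustration, two mappings this function produces; the first argument is unused by the URL logic, so None is enough here.

from sphinx.web.oldurls import handle_html_url

resp = handle_html_url(None, 'lib/module-pycompile.html')
# -> redirect to library/py_compile?oldurl=1
resp = handle_html_url(None, 'tut/node10.html')
# -> redirect to tutorial/errors?oldurl=1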


@@ -1,28 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.robots
~~~~~~~~~~~~~~~~~
robots.txt
:copyright: 2007-2008 by Georg Brandl.
:license: BSD.
"""
robots_txt = """\
User-agent: *
Disallow: /@source/
Disallow: /@edit/
Disallow: /@submit/
Disallow: /@comments/
Disallow: /@rss/
Disallow: /@admin
User-agent: Googlebot
Disallow: /@source/
Disallow: /@edit/
Disallow: /@submit/
Disallow: /@comments/
Disallow: /@rss/
Disallow: /@admin
"""


@@ -1,99 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.serve
~~~~~~~~~~~~~~~~
This module optionally wraps the `wsgiref` module so that it reloads code
automatically. Works with any WSGI application but it won't help in
non-`wsgiref` environments. Use it only for development.
:copyright: 2007-2008 by Armin Ronacher, Georg Brandl.
:license: BSD.
"""
import os
import sys
import time
import thread
def reloader_loop(extra_files):
"""When this function is run from the main thread, it will force other
threads to exit when any modules currently loaded change.
:param extra_files: a list of additional files it should watch.
"""
mtimes = {}
while True:
for filename in filter(None, [getattr(module, '__file__', None)
for module in sys.modules.values()] +
extra_files):
while not os.path.isfile(filename):
filename = os.path.dirname(filename)
if not filename:
break
if not filename:
continue
if filename[-4:] in ('.pyc', '.pyo'):
filename = filename[:-1]
mtime = os.stat(filename).st_mtime
if filename not in mtimes:
mtimes[filename] = mtime
continue
if mtime > mtimes[filename]:
sys.exit(3)
time.sleep(1)
def restart_with_reloader():
"""Spawn a new Python interpreter with the same arguments as this one,
but running the reloader thread."""
while True:
print '* Restarting with reloader...'
args = [sys.executable] + sys.argv
if sys.platform == 'win32':
args = ['"%s"' % arg for arg in args]
new_environ = os.environ.copy()
new_environ['RUN_MAIN'] = 'true'
exit_code = os.spawnve(os.P_WAIT, sys.executable, args, new_environ)
if exit_code != 3:
return exit_code
def run_with_reloader(main_func, extra_watch):
"""
Run the given function in an independent Python interpreter.
"""
if os.environ.get('RUN_MAIN') == 'true':
thread.start_new_thread(main_func, ())
try:
reloader_loop(extra_watch)
except KeyboardInterrupt:
return
try:
sys.exit(restart_with_reloader())
except KeyboardInterrupt:
pass
def run_simple(hostname, port, make_app, use_reloader=False,
extra_files=None):
"""
Start an application using wsgiref and with an optional reloader.
"""
from wsgiref.simple_server import make_server
def inner():
application = make_app()
print '* Startup complete.'
srv = make_server(hostname, port, application)
try:
srv.serve_forever()
except KeyboardInterrupt:
pass
if os.environ.get('RUN_MAIN') != 'true':
print '* Running on http://%s:%d/' % (hostname, port)
if use_reloader:
run_with_reloader(inner, extra_files or [])
else:
inner()
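
Standalone usage is a one-liner; with use_reloader=True the process watches all loaded modules and restarts itself when one changes. The port and the trivial WSGI app below are placeholders.

from sphinx.web.serve import run_simple

def make_app():
    def app(environ, start_response):
        # minimal WSGI placeholder instead of the real documentation app
        start_response('200 OK', [('Content-Type', 'text/plain')])
        return ['hello']
    return app

run_simple('localhost', 3000, make_app, use_reloader=True)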


@@ -1,162 +0,0 @@
/**
* Sphinx Admin Panel
*/
div.admin {
margin: 0 -20px -30px -20px;
padding: 0 20px 10px 20px;
background-color: #f2f2f2;
color: black;
}
div.admin a {
color: #333;
text-decoration: underline;
}
div.admin a:hover {
color: black;
}
div.admin h1,
div.admin h2 {
background-color: #555;
border-bottom: 1px solid #222;
color: white;
}
div.admin form form {
display: inline;
}
div.admin input, div.admin textarea {
font-family: 'Bitstream Vera Sans', 'Arial', sans-serif;
font-size: 13px;
color: #333;
padding: 2px;
background-color: #fff;
border: 1px solid #aaa;
}
div.admin input[type="reset"],
div.admin input[type="submit"] {
cursor: pointer;
font-weight: bold;
padding: 2px;
}
div.admin input[type="reset"]:hover,
div.admin input[type="submit"]:hover {
border: 1px solid #333;
}
div.admin div.actions {
margin: 10px 0 0 0;
padding: 5px;
background-color: #aaa;
border: 1px solid #777;
}
div.admin div.error {
margin: 10px 0 0 0;
padding: 5px;
border: 2px solid #222;
background-color: #ccc;
font-weight: bold;
}
div.admin div.dialog {
background-color: #ccc;
margin: 10px 0 10px 0;
}
div.admin div.dialog h2 {
margin: 0;
font-size: 18px;
padding: 4px 10px 4px 10px;
}
div.admin div.dialog div.text {
padding: 10px;
}
div.admin div.dialog div.buttons {
padding: 5px 10px 5px 10px;
}
div.admin table.mapping {
width: 100%;
border: 1px solid #999;
border-collapse: collapse;
background-color: #aaa;
}
div.admin table.mapping th {
background-color: #ddd;
border-bottom: 1px solid #888;
padding: 5px;
}
div.admin table.mapping th.recent_comments {
background-color: #c5cba4;
}
div.admin table.mapping,
div.admin table.mapping a {
color: black;
}
div.admin table.mapping td {
border: 1px solid #888;
border-left: none;
border-right: none;
text-align: left;
line-height: 24px;
padding: 0 5px 0 5px;
}
div.admin table.mapping tr:hover {
background-color: #888;
}
div.admin table.mapping td.username {
width: 180px;
}
div.admin table.mapping td.pub_date {
font-style: italic;
text-align: right;
}
div.admin table.mapping td.groups input {
width: 100%;
}
div.admin table.mapping td.actions input {
padding: 0;
}
div.admin table.mapping .actions {
text-align: right;
width: 70px;
}
div.admin table.mapping span.meta {
font-size: 11px;
color: #222;
}
div.admin table.mapping span.meta a {
color: #222;
}
div.admin div.detail_form dt {
clear: both;
float: left;
width: 110px;
}
div.admin div.detail_form textarea {
width: 98%;
height: 160px;
}

Binary file not shown.

Size: 401 B

Binary file not shown.

Size: 522 B

Binary file not shown.

Size: 415 B

Binary file not shown.

Size: 37 KiB

View File

@@ -1,94 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.userdb
~~~~~~~~~~~~~~~~~
A module that provides pythonic access to the `docusers` file
that stores users and their passwords so that they can gain access
to the administration system.
:copyright: 2007-2008 by Armin Ronacher.
:license: BSD.
"""
from os import path
from hashlib import sha1
from random import choice, randrange
def gen_password(length=8, add_numbers=True, mix_case=True,
add_special_char=True):
"""
Generate a pronounceable password.
"""
if length <= 0:
raise ValueError('requested password of length <= 0')
consonants = 'bcdfghjklmnprstvwz'
vowels = 'aeiou'
if mix_case:
consonants = consonants * 2 + consonants.upper()
vowels = vowels * 2 + vowels.upper()
pw = ''.join([choice(consonants) +
choice(vowels) +
choice(consonants + vowels) for _
in xrange(length // 3 + 1)])[:length]
if add_numbers:
n = length // 3
if n > 0:
pw = pw[:-n]
for _ in xrange(n):
pw += choice('0123456789')
if add_special_char:
tmp = randrange(0, len(pw))
l1 = pw[:tmp]
l2 = pw[tmp:]
if max(len(l1), len(l2)) == len(l1):
l1 = l1[:-1]
else:
l2 = l2[:-1]
return l1 + choice('#$&%?!') + l2
return pw
class UserDatabase(object):
def __init__(self, filename):
self.filename = filename
self.users = {}
self.privileges = {}
if path.exists(filename):
f = open(filename)
try:
for line in f:
line = line.strip()
if line and line[0] != '#':
parts = line.split(':')
self.users[parts[0]] = parts[1]
self.privileges.setdefault(parts[0], set()).update(
x for x in parts[2].split(',') if x)
finally:
f.close()
def set_password(self, user, password):
"""Encode the password for a user (also adds users)."""
self.users[user] = sha1('%s|%s' % (user, password)).hexdigest()
def add_user(self, user):
"""Add a new user and return the generated password."""
pw = gen_password(8, add_special_char=False)
self.set_password(user, pw)
        # make sure brand-new users get a privilege set before clearing it
        self.privileges.setdefault(user, set()).clear()
return pw
def check_password(self, user, password):
return user in self.users and \
self.users[user] == sha1('%s|%s' % (user, password)).hexdigest()
def save(self):
f = open(self.filename, 'w')
try:
for username, password in self.users.iteritems():
privileges = ','.join(self.privileges.get(username, ()))
f.write('%s:%s:%s\n' % (username, password, privileges))
finally:
f.close()
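
A hypothetical usage sketch for the module above; the file path and user name are illustrative only.

from sphinx.web.userdb import UserDatabase, gen_password

db = UserDatabase('/tmp/docusers')            # read if it exists; written by save()
password = db.add_user('docwriter')           # returns the generated password
db.privileges.setdefault('docwriter', set()).add('master')
db.save()                                     # writes user:hash:privileges lines
assert db.check_password('docwriter', password)
print gen_password(12)                        # pronounceable, mixed case, digits, one special char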

View File

@@ -1,94 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.util
~~~~~~~~~~~~~~~
Miscellaneous utilities.
:copyright: 2007-2008 by Georg Brandl.
:license: BSD.
"""
import re
from os import path
from sphinx.util import relative_uri
from sphinx._jinja import Environment, FileSystemLoader
def get_target_uri(source_filename):
"""Get the web-URI for a given reST file name (without extension)."""
if source_filename == 'index':
return ''
if source_filename.endswith('/index'):
        return source_filename[:-5]  # strip "index", keep the trailing slash
return source_filename + '/'
# ------------------------------------------------------------------------------
# Setup the templating environment
templates_path = path.join(path.dirname(__file__), '..', 'templates')
jinja_env = Environment(loader=FileSystemLoader(templates_path,
use_memcache=True),
friendly_traceback=True)
def do_datetime_format():
def wrapped(env, ctx, value):
return value.strftime('%a, %d %b %Y %H:%M')
return wrapped
jinja_env.filters['datetimeformat'] = do_datetime_format
_striptags_re = re.compile(r'(<!--.*?-->|<[^>]+>)')
def striptags(text):
return ' '.join(_striptags_re.sub('', text).split())
def render_template(req, template_name, *contexts):
context = {}
for ctx in contexts:
context.update(ctx)
tmpl = jinja_env.get_template(template_name)
path = req.path.lstrip('/')
if not path[-1:] == '/':
path += '/'
def relative_path_to(otheruri, resource=False):
if not resource:
otheruri = get_target_uri(otheruri)
return relative_uri(path, otheruri)
context['pathto'] = relative_path_to
# add it here a second time for templates that don't
# get the builder information from the environment (such as search)
context['builder'] = 'web'
context['req'] = req
return tmpl.render(context)
class lazy_property(object):
"""
Descriptor implementing a "lazy property", i.e. the function
calculating the property value is called only once.
"""
def __init__(self, func, name=None, doc=None):
self._func = func
self._name = name or func.func_name
self.__doc__ = doc or func.__doc__
def __get__(self, obj, objtype=None):
if obj is None:
return self
value = self._func(obj)
setattr(obj, self._name, value)
return value
class blackhole_dict(dict):
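    """A dict subclass that silently discards every assignment."""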
def __setitem__(self, key, value):
pass
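
An illustrative, hypothetical example of the lazy_property descriptor defined above: the wrapped function runs once, and its result then shadows the descriptor on the instance.

from sphinx.web.util import lazy_property    # module path as in the docstring above

class Page(object):
    def body(self):
        print 'rendering...'                 # runs only on the first access
        return '<p>hello</p>'
    body = lazy_property(body)

p = Page()
p.body    # prints 'rendering...' and stores the result in p.__dict__
p.body    # now served straight from the instance; the function is not called again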

View File

@@ -1,13 +0,0 @@
# -*- coding: utf-8 -*-
#
# Sphinx documentation web application configuration file
#
# Where the server listens.
listen_addr = 'localhost'
listen_port = 3000
# How patch mails are sent.
patch_mail_from = 'patches@localhost'
patch_mail_to = 'docs@localhost'
patch_mail_smtp = 'localhost'

View File

@@ -1,705 +0,0 @@
# -*- coding: utf-8 -*-
"""
sphinx.web.wsgiutil
~~~~~~~~~~~~~~~~~~~
    To avoid further dependencies, this module collects some of the
    classes that werkzeug provides, for use in the other web views.
:copyright: 2007-2008 by Armin Ronacher.
:license: BSD.
"""
import cgi
import urllib
import cPickle as pickle
import tempfile
from os import path
from time import gmtime, time, asctime
from random import random
from Cookie import SimpleCookie
from hashlib import sha1
from datetime import datetime
from cStringIO import StringIO
from sphinx.web.util import lazy_property
from sphinx.util.json import dump_json
HTTP_STATUS_CODES = {
100: 'CONTINUE',
101: 'SWITCHING PROTOCOLS',
102: 'PROCESSING',
200: 'OK',
201: 'CREATED',
202: 'ACCEPTED',
203: 'NON-AUTHORITATIVE INFORMATION',
204: 'NO CONTENT',
205: 'RESET CONTENT',
206: 'PARTIAL CONTENT',
207: 'MULTI STATUS',
300: 'MULTIPLE CHOICES',
301: 'MOVED PERMANENTLY',
302: 'FOUND',
303: 'SEE OTHER',
304: 'NOT MODIFIED',
305: 'USE PROXY',
306: 'RESERVED',
307: 'TEMPORARY REDIRECT',
400: 'BAD REQUEST',
401: 'UNAUTHORIZED',
402: 'PAYMENT REQUIRED',
403: 'FORBIDDEN',
404: 'NOT FOUND',
405: 'METHOD NOT ALLOWED',
406: 'NOT ACCEPTABLE',
407: 'PROXY AUTHENTICATION REQUIRED',
408: 'REQUEST TIMEOUT',
409: 'CONFLICT',
410: 'GONE',
411: 'LENGTH REQUIRED',
412: 'PRECONDITION FAILED',
413: 'REQUEST ENTITY TOO LARGE',
414: 'REQUEST-URI TOO LONG',
415: 'UNSUPPORTED MEDIA TYPE',
416: 'REQUESTED RANGE NOT SATISFIABLE',
417: 'EXPECTATION FAILED',
500: 'INTERNAL SERVER ERROR',
501: 'NOT IMPLEMENTED',
502: 'BAD GATEWAY',
503: 'SERVICE UNAVAILABLE',
504: 'GATEWAY TIMEOUT',
505: 'HTTP VERSION NOT SUPPORTED',
506: 'VARIANT ALSO VARIES',
507: 'INSUFFICIENT STORAGE',
510: 'NOT EXTENDED'
}
SID_COOKIE_NAME = 'python_doc_sid'
# ------------------------------------------------------------------------------
# Support for HTTP parameter parsing, requests and responses
class _StorageHelper(cgi.FieldStorage):
"""
Helper class used by `Request` to parse submitted file and
form data. Don't use this class directly.
"""
FieldStorageClass = cgi.FieldStorage
def __init__(self, environ, get_stream):
cgi.FieldStorage.__init__(self,
fp=environ['wsgi.input'],
environ={
'REQUEST_METHOD': environ['REQUEST_METHOD'],
'CONTENT_TYPE': environ['CONTENT_TYPE'],
'CONTENT_LENGTH': environ['CONTENT_LENGTH']
},
keep_blank_values=True
)
self.get_stream = get_stream
def make_file(self, binary=None):
return self.get_stream()
class MultiDict(dict):
"""
    A dict subclass that stores multiple values per key.  It is
    initialized from a list of (key, value) pairs, a plain dict, or
    another MultiDict.
"""
def __init__(self, mapping=()):
if isinstance(mapping, MultiDict):
dict.__init__(self, mapping.lists())
elif isinstance(mapping, dict):
tmp = {}
            for key, value in mapping.iteritems():
tmp[key] = [value]
dict.__init__(self, tmp)
else:
tmp = {}
for key, value in mapping:
tmp.setdefault(key, []).append(value)
dict.__init__(self, tmp)
def __getitem__(self, key):
"""
Return the first data value for this key;
raises KeyError if not found.
"""
return dict.__getitem__(self, key)[0]
def __setitem__(self, key, value):
"""Set an item as list."""
dict.__setitem__(self, key, [value])
def get(self, key, default=None):
"""Return the default value if the requested data doesn't exist"""
try:
return self[key]
except KeyError:
return default
def getlist(self, key):
"""Return an empty list if the requested data doesn't exist"""
try:
return dict.__getitem__(self, key)
except KeyError:
return []
def setlist(self, key, new_list):
"""Set new values for an key."""
dict.__setitem__(self, key, list(new_list))
def setdefault(self, key, default=None):
if key not in self:
self[key] = default
else:
default = self[key]
return default
def setlistdefault(self, key, default_list=()):
if key not in self:
default_list = list(default_list)
dict.__setitem__(self, key, default_list)
else:
default_list = self.getlist(key)
return default_list
def items(self):
"""
        Return a list of (key, value) pairs, where value is the first item
        in the list associated with the key.
"""
return [(key, self[key]) for key in self.iterkeys()]
lists = dict.items
def values(self):
"""Returns a list of the last value on every key list."""
return [self[key] for key in self.iterkeys()]
listvalues = dict.values
def iteritems(self):
for key, values in dict.iteritems(self):
yield key, values[0]
iterlists = dict.iteritems
def itervalues(self):
for values in dict.itervalues(self):
yield values[0]
iterlistvalues = dict.itervalues
def copy(self):
"""Return a shallow copy of this object."""
return self.__class__(self)
def update(self, other_dict):
"""update() extends rather than replaces existing key lists."""
if isinstance(other_dict, MultiDict):
for key, value_list in other_dict.iterlists():
self.setlistdefault(key, []).extend(value_list)
elif isinstance(other_dict, dict):
for key, value in other_dict.items():
self.setlistdefault(key, []).append(value)
else:
for key, value in other_dict:
self.setlistdefault(key, []).append(value)
def pop(self, *args):
"""Pop the first item for a list on the dict."""
return dict.pop(self, *args)[0]
def popitem(self):
"""Pop an item from the dict."""
item = dict.popitem(self)
return (item[0], item[1][0])
poplist = dict.pop
popitemlist = dict.popitem
def __repr__(self):
tmp = []
for key, values in self.iterlists():
for value in values:
tmp.append((key, value))
return '%s(%r)' % (self.__class__.__name__, tmp)
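
A short, hypothetical illustration of the MultiDict semantics defined above (assuming the class is importable as shown; the keys and values are made up).

from sphinx.web.wsgiutil import MultiDict

d = MultiDict([('tag', 'py'), ('tag', 'web'), ('q', 'sphinx')])
d['tag']                   # -> 'py'   (first value only)
d.getlist('tag')           # -> ['py', 'web']
d['tag'] = 'docs'          # __setitem__ replaces the whole list with ['docs']
d.update({'q': 'docs'})    # update() extends: d.getlist('q') == ['sphinx', 'docs']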
class Headers(object):
"""
    An ordered collection of HTTP headers with case-insensitive lookup.
"""
def __init__(self, defaults=None):
self._list = []
if isinstance(defaults, dict):
for key, value in defaults.iteritems():
if isinstance(value, (tuple, list)):
for v in value:
self._list.append((key, v))
else:
self._list.append((key, value))
elif defaults is not None:
for key, value in defaults:
self._list.append((key, value))
def __getitem__(self, key):
ikey = key.lower()
for k, v in self._list:
if k.lower() == ikey:
return v
raise KeyError(key)
def get(self, key, default=None):
try:
return self[key]
except KeyError:
return default
def getlist(self, key):
ikey = key.lower()
result = []
for k, v in self._list:
if k.lower() == ikey:
result.append((k, v))
return result
def setlist(self, key, values):
del self[key]
self.addlist(key, values)
def addlist(self, key, values):
self._list.extend(values)
def lists(self, lowercased=False):
if not lowercased:
return self._list[:]
return [(x.lower(), y) for x, y in self._list]
def iterlists(self, lowercased=False):
for key, value in self._list:
if lowercased:
key = key.lower()
yield key, value
def iterkeys(self):
for key, _ in self.iterlists():
yield key
def itervalues(self):
for _, value in self.iterlists():
yield value
def keys(self):
return list(self.iterkeys())
def values(self):
return list(self.itervalues())
def __delitem__(self, key):
key = key.lower()
new = []
for k, v in self._list:
if k != key:
new.append((k, v))
self._list[:] = new
remove = __delitem__
def __contains__(self, key):
key = key.lower()
for k, v in self._list:
if k.lower() == key:
return True
return False
has_key = __contains__
def __iter__(self):
return iter(self._list)
def add(self, key, value):
"""add a new header tuple to the list"""
self._list.append((key, value))
def clear(self):
"""clears all headers"""
del self._list[:]
def set(self, key, value):
"""remove all header tuples for key and add
a new one
"""
del self[key]
self.add(key, value)
__setitem__ = set
def to_list(self, charset):
"""Create a str only list of the headers."""
result = []
for k, v in self:
if isinstance(v, unicode):
v = v.encode(charset)
else:
v = str(v)
result.append((k, v))
return result
def copy(self):
return self.__class__(self._list)
def __repr__(self):
return '%s(%r)' % (
self.__class__.__name__,
self._list
)
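
Likewise, a hypothetical illustration of the Headers container above; note that, unlike MultiDict.getlist(), this getlist() returns (key, value) tuples.

from sphinx.web.wsgiutil import Headers

h = Headers([('Content-Type', 'text/html')])
h.add('Set-Cookie', 'a=1')
h.add('Set-Cookie', 'b=2')
h['content-type']          # lookup is case-insensitive -> 'text/html'
h.getlist('Set-Cookie')    # -> [('Set-Cookie', 'a=1'), ('Set-Cookie', 'b=2')]
h.to_list('utf-8')         # str-only tuples, ready for start_response()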
class Session(dict):
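    """A dict-based session, pickled to a file in the system temp
    directory and identified by the session-id cookie."""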
def __init__(self, sid):
self.sid = sid
if sid is not None:
if path.exists(self.filename):
f = open(self.filename, 'rb')
try:
self.update(pickle.load(f))
finally:
f.close()
self._orig = dict(self)
@property
def filename(self):
if self.sid is not None:
return path.join(tempfile.gettempdir(), '__pydoc_sess' + self.sid)
@property
def worth_saving(self):
return self != self._orig
def save(self):
if self.sid is None:
self.sid = sha1('%s|%s' % (time(), random())).hexdigest()
f = open(self.filename, 'wb')
try:
pickle.dump(dict(self), f, pickle.HIGHEST_PROTOCOL)
finally:
f.close()
self._orig = dict(self)
class Request(object):
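    """Wraps a WSGI environ and lazily exposes query args, form data,
    file uploads, cookies, the request path and the cookie-based session."""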
charset = 'utf-8'
def __init__(self, environ):
self.environ = environ
self.environ['werkzeug.request'] = self
self.session = Session(self.cookies.get(SID_COOKIE_NAME))
self.user = self.session.get('user')
def login(self, user):
self.user = self.session['user'] = user
def logout(self):
self.user = None
self.session.pop('user', None)
def _get_file_stream(self):
"""Called to get a stream for the file upload.
This must provide a file-like class with `read()`, `readline()`
and `seek()` methods that is both writeable and readable."""
return tempfile.TemporaryFile('w+b')
def _load_post_data(self):
"""Method used internally to retrieve submitted data."""
self._data = ''
post = []
files = []
if self.environ['REQUEST_METHOD'] in ('POST', 'PUT'):
storage = _StorageHelper(self.environ, self._get_file_stream)
for key in storage.keys():
values = storage[key]
if not isinstance(values, list):
values = [values]
for item in values:
if getattr(item, 'filename', None) is not None:
fn = item.filename.decode(self.charset, 'ignore')
# fix stupid IE bug
if len(fn) > 1 and fn[1] == ':' and '\\' in fn:
fn = fn[fn.index('\\') + 1:]
files.append((key, FileStorage(key, fn, item.type,
item.length, item.file)))
else:
post.append((key, item.value.decode(self.charset,
'ignore')))
self._form = MultiDict(post)
self._files = MultiDict(files)
def read(self, *args):
if not hasattr(self, '_buffered_stream'):
self._buffered_stream = StringIO(self.data)
return self._buffered_stream.read(*args)
def readline(self, *args):
if not hasattr(self, '_buffered_stream'):
self._buffered_stream = StringIO(self.data)
return self._buffered_stream.readline(*args)
def make_external_url(self, path):
url = self.environ['wsgi.url_scheme'] + '://'
if 'HTTP_HOST' in self.environ:
url += self.environ['HTTP_HOST']
else:
url += self.environ['SERVER_NAME']
if (self.environ['wsgi.url_scheme'], self.environ['SERVER_PORT']) not \
in (('https', '443'), ('http', '80')):
url += ':' + self.environ['SERVER_PORT']
        url += urllib.quote(self.environ.get('SCRIPT_NAME', '').rstrip('/'))
if not path.startswith('/'):
path = '/' + path
return url + path
def args(self):
"""URL parameters"""
items = []
qs = self.environ.get('QUERY_STRING', '')
for key, values in cgi.parse_qs(qs, True).iteritems():
for value in values:
value = value.decode(self.charset, 'ignore')
items.append((key, value))
return MultiDict(items)
args = lazy_property(args)
def data(self):
"""raw value of input stream."""
if not hasattr(self, '_data'):
self._load_post_data()
return self._data
data = lazy_property(data)
def form(self):
"""form parameters."""
if not hasattr(self, '_form'):
self._load_post_data()
return self._form
form = lazy_property(form)
def files(self):
"""File uploads."""
if not hasattr(self, '_files'):
self._load_post_data()
return self._files
files = lazy_property(files)
def cookies(self):
"""Stored Cookies."""
cookie = SimpleCookie()
cookie.load(self.environ.get('HTTP_COOKIE', ''))
result = {}
for key, value in cookie.iteritems():
result[key] = value.value.decode(self.charset, 'ignore')
return result
cookies = lazy_property(cookies)
def method(self):
"""Request method."""
return self.environ['REQUEST_METHOD']
method = property(method, doc=method.__doc__)
def path(self):
"""Requested path."""
path = '/' + (self.environ.get('PATH_INFO') or '').lstrip('/')
        path = path.decode(self.charset, 'ignore')
parts = path.replace('+', ' ').split('/')
return u'/'.join(p for p in parts if p != '..')
path = lazy_property(path)
class Response(object):
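    """A WSGI response with headers, cookie support and an iterable body;
    calling it with (environ, start_response) streams the body."""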
charset = 'utf-8'
default_mimetype = 'text/html'
def __init__(self, response=None, headers=None, status=200, mimetype=None):
if response is None:
self.response = []
elif isinstance(response, basestring):
self.response = [response]
else:
self.response = iter(response)
if not headers:
self.headers = Headers()
elif isinstance(headers, Headers):
self.headers = headers
else:
self.headers = Headers(headers)
if mimetype is None and 'Content-Type' not in self.headers:
mimetype = self.default_mimetype
if mimetype is not None:
if 'charset=' not in mimetype and mimetype.startswith('text/'):
mimetype += '; charset=' + self.charset
self.headers['Content-Type'] = mimetype
self.status = status
self._cookies = None
def write(self, value):
if not isinstance(self.response, list):
raise RuntimeError('cannot write to streaming response')
self.write = self.response.append
self.response.append(value)
def set_cookie(self, key, value='', max_age=None, expires=None,
path='/', domain=None, secure=None):
if self._cookies is None:
self._cookies = SimpleCookie()
if isinstance(value, unicode):
value = value.encode(self.charset)
self._cookies[key] = value
if max_age is not None:
self._cookies[key]['max-age'] = max_age
if expires is not None:
if isinstance(expires, basestring):
self._cookies[key]['expires'] = expires
expires = None
elif isinstance(expires, datetime):
expires = expires.utctimetuple()
            elif isinstance(expires, (int, long)):
                expires = gmtime(expires)
            else:
                raise ValueError('datetime or integer required')
month = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul',
'Aug', 'Sep', 'Oct', 'Nov', 'Dec'][expires.tm_mon - 1]
day = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
'Friday', 'Saturday', 'Sunday'][expires.tm_wday]
date = '%02d-%s-%s' % (
expires.tm_mday, month, str(expires.tm_year)[-2:]
)
d = '%s, %s %02d:%02d:%02d GMT' % (day, date, expires.tm_hour,
expires.tm_min, expires.tm_sec)
self._cookies[key]['expires'] = d
if path is not None:
self._cookies[key]['path'] = path
if domain is not None:
self._cookies[key]['domain'] = domain
if secure is not None:
self._cookies[key]['secure'] = secure
def delete_cookie(self, key):
if self._cookies is None:
self._cookies = SimpleCookie()
if key not in self._cookies:
self._cookies[key] = ''
self._cookies[key]['max-age'] = 0
def __call__(self, environ, start_response):
req = environ['werkzeug.request']
if req.session.worth_saving:
req.session.save()
self.set_cookie(SID_COOKIE_NAME, req.session.sid)
headers = self.headers.to_list(self.charset)
if self._cookies is not None:
for morsel in self._cookies.values():
headers.append(('Set-Cookie', morsel.output(header='')))
status = '%d %s' % (self.status, HTTP_STATUS_CODES[self.status])
charset = self.charset or 'ascii'
start_response(status, headers)
for item in self.response:
if isinstance(item, unicode):
yield item.encode(charset)
else:
yield str(item)
def get_base_uri(environ):
url = environ['wsgi.url_scheme'] + '://'
if 'HTTP_HOST' in environ:
url += environ['HTTP_HOST']
else:
url += environ['SERVER_NAME']
if (environ['wsgi.url_scheme'], environ['SERVER_PORT']) not \
in (('https', '443'), ('http', '80')):
url += ':' + environ['SERVER_PORT']
    url += urllib.quote(environ.get('SCRIPT_NAME', '').rstrip('/'))
return url
class RedirectResponse(Response):
def __init__(self, target_url, code=302):
if not target_url.startswith('/'):
target_url = '/' + target_url
self.target_url = target_url
super(RedirectResponse, self).__init__('Moved...', status=code)
def __call__(self, environ, start_response):
url = get_base_uri(environ) + self.target_url
self.headers['Location'] = url
return super(RedirectResponse, self).__call__(environ, start_response)
class JSONResponse(Response):
def __init__(self, data):
assert not isinstance(data, list), 'list unsafe for json dumping'
super(JSONResponse, self).__init__(dump_json(data), mimetype='text/javascript')
class SharedDataMiddleware(object):
"""
    Serves static files from the exported folders, passing everything
    else on to the wrapped application.
"""
def __init__(self, app, exports):
self.app = app
self.exports = exports
self.cache = {}
def serve_file(self, filename, start_response):
from mimetypes import guess_type
guessed_type = guess_type(filename)
mime_type = guessed_type[0] or 'text/plain'
expiry = time() + 3600 # one hour
expiry = asctime(gmtime(expiry))
start_response('200 OK', [('Content-Type', mime_type),
('Cache-Control', 'public'),
('Expires', expiry)])
f = open(filename, 'rb')
try:
return [f.read()]
finally:
f.close()
def __call__(self, environ, start_response):
p = environ.get('PATH_INFO', '')
if p in self.cache:
return self.serve_file(self.cache[p], start_response)
for search_path, file_path in self.exports.iteritems():
if not search_path.endswith('/'):
search_path += '/'
if p.startswith(search_path):
real_path = path.join(file_path, p[len(search_path):])
if path.exists(real_path) and path.isfile(real_path):
self.cache[p] = real_path
return self.serve_file(real_path, start_response)
return self.app(environ, start_response)
class NotFound(Exception):
"""
Raise to display the 404 error page.
"""
def __init__(self, show_keyword_matches=False):
self.show_keyword_matches = show_keyword_matches
Exception.__init__(self, show_keyword_matches)
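
Finally, a hypothetical end-to-end sketch of how the pieces above were wired together; the view logic, URLs and export mapping are made up, not the deleted application's actual routing.

from sphinx.web.wsgiutil import (Request, Response, RedirectResponse,
                                 SharedDataMiddleware)

def application(environ, start_response):
    req = Request(environ)                     # also stores itself in the environ
    if req.path == u'/ping/':
        resp = Response('pong', mimetype='text/plain')
    elif req.path == u'/go/':
        resp = RedirectResponse('/ping/')      # 302 with an absolute Location header
    else:
        resp = Response('nothing here', status=404)
    return resp(environ, start_response)

# Requests under /static/ are answered directly from disk by the middleware;
# everything else falls through to the application above.
application = SharedDataMiddleware(application, {
    '/static': '/path/to/static',              # hypothetical export directory
})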