Commit 7f9c0a5a authored by Pietro Albini's avatar Pietro Albini

Merge branch 'auth' into develop

parents 45a28f05 8e6e4e6d
Pipeline #78 passed with stage
in 0 seconds
......@@ -31,17 +31,19 @@ setuptools.setup(
description = "Source code of the ubuntu-it website",
install_requires = [
"flask",
"click",
"flask",
"flask-openid",
"gunicorn",
"requests",
"itsdangerous",
"toml",
"pyyaml",
"requests",
"toml",
],
packages = [
"uitwww",
"uitwww.third_party"
],
entry_points = {
......
# Source code of the Ubuntu-it website
# Copyright (C) 2015-2016 Pietro Albini <pietroalbini@ubuntu.com>
# Copyright (C) 2015-2018 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
......@@ -18,11 +18,14 @@ import os
import flask
from . import pages
from . import auth
from . import cache
from . import db
from . import pages
from . import utils
from . import download
from . import navbar
from . import utils
def create_app(data_path):
......@@ -42,14 +45,21 @@ def create_app(data_path):
with open(os.path.join(data_path, "secret_key")) as f:
app.secret_key = f.read().strip()
# Initialize the database
app.db = db.Database(os.path.join(data_path, "database.db"))
app.db.init()
app.config["CACHE_PATH"] = os.path.join(data_path, "cache")
cache.install_cache(app)
utils.prepare_app(app)
app.download = download.Downloads(data_path)
app.register_blueprint(
app.download.prepare_blueprint(app),
url_prefix="/download",
)
app.register_blueprint(auth.prepare_blueprint(app), url_prefix="/+auth")
app.register_blueprint(pages.prepare_blueprint(app))
......@@ -57,6 +67,10 @@ def create_app(data_path):
nav.add_generator("download-distros", app.download.generate_navbar)
nav.install(app)
@app.errorhandler(403)
def forbidden(error):
return flask.render_template("403.html"), 403
@app.errorhandler(404)
def not_found(error):
return flask.render_template("404.html"), 404
......
......@@ -51,6 +51,7 @@ def run(data, gunicorn_config, port, public, workers, debug):
if debug:
extra_files = [
os.path.join(src_directory, "data/navbar.yml"),
os.path.join(src_directory, "data/permissions.yml"),
]
app.run(debug=True, port=port, host=host, extra_files=extra_files)
......
# Source code of the Ubuntu-it website
# Copyright (C) 2018 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; witout even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import functools
import uuid
import flask
import flask_openid
import pkg_resources
import time
import yaml
from uitwww.third_party import openid_teams
SESSION_EXPIRES_AFTER = 86400
class SessionError(BaseException):
pass
class Sessions:
def __init__(self, db):
self.db = db
def create(self, nickname, teams):
"""Create a new session and return its ID"""
id = str(uuid.uuid4())
ip = flask.request.remote_addr
self.db.update(
"INSERT INTO auth_sessions (id, nickname, teams, ip, expires_at) "
"VALUES (?, ?, ?, ?, ?)", id, nickname, ",".join(teams), ip,
int(time.time()) + SESSION_EXPIRES_AFTER,
)
return id
def check(self, id):
"""Check if a session is valid and return its data"""
data = self.db.query(
"SELECT nickname, teams, ip, expires_at FROM auth_sessions "
"WHERE id = ?", id,
)
if not data:
raise SessionError("La tua sessione è scaduta, accedi di nuovo.")
nickname = data[0][0]
teams = data[0][1].split(",")
ip = data[0][2]
expires_at = data[0][3]
if expires_at is None or expires_at < time.time():
self.delete(id)
raise SessionError("La tua sessione è scaduta, accedi di nuovo.")
else:
# Bump the session every time a new page is visited
self.db.update(
"UPDATE auth_sessions SET expires_at = ? WHERE id = ?",
int(time.time()) + SESSION_EXPIRES_AFTER, id,
)
if ip != flask.request.remote_addr:
raise SessionError("Questa sessione è valida solo su un'altra rete, accedi di nuovo.")
return {
"nickname": data[0][0],
"teams": data[0][1].split(","),
}
def delete(self, id):
"""Delete a session"""
self.db.update("DELETE FROM auth_sessions WHERE id = ?;", id)
def delete_all(self, except_id):
"""Delete every session except this one"""
self.db.update("DELETE FROM auth_sessions WHERE id != ?;", except_id)
def all(self):
"""Return all the sessions"""
rows = self.db.query("SELECT id, nickname, teams, ip, expires_at FROM auth_sessions;")
now = time.time()
result = []
for row in rows:
# Delete expired sessions
if row[0] is None or row[4] < now:
self.delete(row[0])
continue
result.append({
"id": row[0],
"nickname": row[1],
"teams": row[2].split(","),
"ip": row[3],
"expires_at": row[4],
})
return result
def get(self, id):
"""Return details about a session"""
row = self.db.query("SELECT nickname, teams, ip, expires_at FROM auth_sessions WHERE id = ?;", id)
if row:
# Delete expired sessions
if row[0][3] is None or row[0][3] < time.time():
self.delete(id)
return
return {
"id": id,
"nickname": row[0][0],
"teams": row[0][1].split(","),
"ip": row[0][2],
"expires_at": row[0][3],
}
def count(self):
"""Return the number of active sessions"""
now = time.time()
return self.db.query(
"SELECT COUNT(*) FROM auth_sessions "
"WHERE expires_at IS NOT NULL AND expires_at >= ?;", now,
)[0][0]
class Permissions:
def __init__(self):
raw = pkg_resources.resource_string("uitwww", "data/permissions.yml")
self.config = yaml.load(raw.decode("utf-8"))
def allowed_teams(self):
"""Return a list of teams allowed to log in"""
return list(self.config["teams"].keys())
def check(self, name):
"""Check if the current user has the permission"""
# The user must be authenticated
if "auth_teams" not in flask.g:
return False
# The permission must exist
if name not in self.config["permissions"]:
raise RuntimeError("Missing permission: %s" % name)
# Check if one of the user's teams has the permission
for team in flask.g.auth_teams:
if self.config["teams"][team] == "*":
return True
elif name in self.config["teams"][team]:
return True
return False
def permission(perms):
"""Process the endpoint only if the user has permission"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Don't check anything if the user is not logged in
if "auth_id" not in flask.g:
return flask.abort(403)
# Generate the list of allowed permissions
names = []
if type(perms) == dict:
for name, condition in perms.items():
if condition(*args, **kwargs):
names.append(name)
else:
names.append(perms)
# Check each allowed permission
for name in names:
if flask.current_app.permissions.check(name):
return func(*args, **kwargs)
return flask.abort(403)
return wrapper
return decorator
def prepare_blueprint(app):
"""Prepare the auth blueprint"""
bp = flask.Blueprint("auth", __name__)
oid = flask_openid.OpenID(
app,
safe_roots=[],
extension_responses=[openid_teams.TeamsResponse],
)
sessions = Sessions(app.db)
app.permissions = Permissions()
@app.context_processor
def add_permission_function():
return {
"permission": app.permissions.check,
}
@app.before_request
def check_auth():
if "auth" in flask.session:
try:
data = sessions.check(flask.session["auth"])
except SessionError as e:
del flask.session["auth"]
flask.flash(str(e), "error")
return
flask.g.auth_id = flask.session["auth"]
flask.g.auth_name = data["nickname"]
flask.g.auth_teams = data["teams"]
flask.g.auth_sessions_count = sessions.count()
@oid.after_login
def receive_openid(resp):
teams = resp.extensions["lp"].is_member
if not teams or teams == [""]:
flask.flash("Non hai i permessi per accedere al sito.", "error")
return flask.redirect(flask.url_for("pages.index"))
flask.session["auth"] = sessions.create(resp.nickname, teams)
flask.flash("Benvenuto %s!" % resp.nickname, "success")
return flask.redirect(flask.url_for("pages.index"))
@bp.route("/login")
@oid.loginhandler
def login():
if "auth_name" not in flask.g:
return oid.try_login(
"https://login.ubuntu.com/+openid",
ask_for=["nickname"],
extensions=[
openid_teams.TeamsRequest(app.permissions.allowed_teams())
],
)
else:
flask.flash("Hai già effettuato l'accesso!", "info")
return flask.redirect(flask.url_for("pages.index"))
@bp.route("/logout")
@permission("auth.logout")
def logout():
sessions.delete(flask.session["auth"])
del flask.session["auth"]
flask.flash("La sessione è stata terminata correttamente.", "success")
return flask.redirect(flask.url_for("pages.index"))
@bp.route("/sessions")
@permission("auth.sessions.manage")
def sessions_list():
return flask.render_template(
"auth/sessions.html",
sessions=sessions.all(),
)
@bp.route("/sessions/+all/revoke")
@permission("auth.sessions.manage")
def sessions_revoke_all():
sessions.delete_all(flask.g.auth_id)
return flask.redirect(flask.url_for(".sessions_list"))
@bp.route("/sessions/<id>")
@permission({
"auth.sessions.manage": lambda id: True,
"auth.sessions.own": lambda id: flask.g.auth_id == id,
})
def sessions_show(id):
data = sessions.get(id)
if data is None:
return flask.abort(404)
all = sessions.all()
others = []
for session in all:
if session["id"] == id or session["nickname"] != data["nickname"]:
continue
others.append(session)
return flask.render_template(
"auth/session.html",
session=data,
others=others,
)
@bp.route("/sessions/<id>/revoke")
@permission({
"auth.sessions.manage": lambda id: True,
"auth.sessions.own": lambda id: flask.g.auth_id == id,
})
def sessions_revoke(id):
sessions.delete(id)
return flask.redirect(flask.url_for(".sessions_list"))
return bp
......@@ -61,6 +61,14 @@ def after_request(response):
else:
cache_path = os.path.realpath(app.config["CACHE_PATH"])
# Don't cache the page if flashed messages were displayed
if flask.get_flashed_messages():
return response
# Don't cache the page if the user is authenticated
if "auth_name" in flask.g:
return response
url = flask.request.path
method = flask.request.method
status = response.status_code
......
permissions:
- auth.logout
- auth.sessions.manage
- auth.sessions.own
teams:
ubuntu-it-council: "*"
ubuntu-it-www: "*"
ubuntu-it-members:
- auth.sessions.own
- auth.logout
ubuntu-it-newsletter:
- auth.sessions.own
- auth.logout
# Source code of the Ubuntu-it website
# Copyright (C) 2015 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; witout even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import threading
import sqlite3
import pkg_resources
_LOCAL = threading.local()
class Database:
def __init__(self, path):
self._path = path
def init(self):
"""Initialize the database"""
db_version = int(self.query("PRAGMA user_version;")[0][0])
cursor = self.cursor()
if db_version == 0:
cursor.executescript("""
CREATE TABLE migrations (name TEXT PRIMARY KEY);
PRAGMA user_version = 1;
""")
applied = [m[0] for m in self.query("SELECT * FROM migrations;")]
for name, sql in MIGRATIONS:
if name in applied:
continue
cursor.executescript(sql)
cursor.execute("INSERT INTO migrations (name) VALUES (?)", [name])
cursor.connection.commit()
def cursor(self):
"""Get a new cursor"""
if not hasattr(_LOCAL, "db"):
_LOCAL.db = sqlite3.connect(self._path)
return _LOCAL.db.cursor()
def query(self, query, *params, update=False):
"""Make a new query against the db"""
cursor = self.cursor()
cursor.execute(query, params)
try:
if update:
cursor.connection.commit()
else:
return cursor.fetchall()
finally:
cursor.close()
def update(self, query, *params):
"""Make a new update query against the db"""
self.query(query, *params, update=True)
MIGRATIONS = [
("create_auth_sessions_table", """
CREATE TABLE auth_sessions (
id TEXT PRIMARY KEY,
nickname TEXT NOT NULL,
teams TEXT NOT NULL,
ip TEXT NOT NULL
);
"""),
("add_auth_sessions_expires_at_column", """
ALTER TABLE auth_sessions ADD COLUMN expires_at INTEGER;
"""),
]
{# Source code of the Ubuntu-it website
# Copyright (C) 2018 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; witout even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% extends "layout.html" %}
{% block title %}Accesso negato{% endblock %}
{% block content %}
<div class="page">
<div class="row">
<div class="col">
<h1>Accesso negato</h1>
{% if g.auth_id %}
<p>
Il tuo gruppo non dispone dei permessi necessari per
visualizzare questa pagina.
</p>
{% else %}
<p>
Questa pagina è riservata ai membri di Ubuntu-it con
adeguati permessi. <a href="{{ url_for("auth.login") }}">
Effettua l'accesso</a> per visualizzarla.
</p>
{% endif %}
</div>
</div>
</div>
{% endblock %}
{# Source code of the Ubuntu-it website
# Copyright (C) 2018 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; witout even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% extends "layout.html" %}
{% block title %}Dettagli sessione{% endblock %}
{% block content %}
<div class="page">
<div class="row">
<div class="col">
<h1>Dettagli sessione</h1>
<div class="table">
<table>
<tr>
<td>Nome utente</td>
<td><b>{{ session.nickname }}</b></td>
</tr>
<tr>
<td>Indirizzo IP</td>
<td>{{ session.ip }}</td>
</tr>
<tr>
<td>Scadenza</td>
<td>{{ session.expires_at|format_timestamp }}</td>
</tr>
<tr>
<td>Team</td>
<td>
{% for team in session.teams %}
<a href="https://launchpad.net/~{{ team }}">
~{{ team -}}
</a>
{%- if not loop.last %},{% endif %}
{% endfor %}
</td>
</tr>
</table>
</div>
{% if g.auth_id != session.id %}
<a class="btn" href="{{ url_for(".sessions_revoke", id=session.id) }}">
Disabilita sessione
</a>
{% endif %}
</div>
</div>
{% if others %}
<div class="row">
<div class="col">
<h2>Altre sessioni dell'utente</h2>
<div class="table">
<table>
<tr>
<th>Indirizzo IP</th>
<th>Scadenza</th>
<th></th>
<th></th>
</tr>
{% for session in others %}
<tr>
<td>{{ session.ip }}</td>
<td>{{ session.expires_at|format_timestamp }}
<td>
<a href="{{ url_for(".sessions_show", id=session.id) }}">
Dettagli
</a>
</td>
<td>
{% if session.id == g.auth_id %}
Sessione corrente
{% else %}
<a href="{{ url_for(".sessions_revoke", id=session.id) }}">
Disabilita sessione
</a>
{% endif %}
</td>
</tr>
{% endfor %}
</table>
</div>
</div>
</div>
{% endif %}
</div>
{% endblock %}
{# Source code of the Ubuntu-it website
# Copyright (C) 2018 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; witout even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% extends "layout.html" %}
{% block title %}Sessioni attive{% endblock %}
{% block content %}
<div class="page">
<div class="row">
<div class="col">
<h1>Sessioni attive</h1>
{% if g.auth_sessions_count > 1 %}
<p>
<a class="btn" href="{{ url_for(".sessions_revoke_all") }}">
Disabilita ogni altra sessione
</a>
</p>
{% endif %}
</div>
</div>
<div class="row">
<div class="col">
<div class="table">
<table>
<tr>
<th>Nome utente</th>
<th>Indirizzo IP</th>
<th>Scadenza</th>
<th></th>
<th></th>
</tr>
{% for session in sessions %}
<tr>
<td>{{ session.nickname }}</td>
<td>{{ session.ip }}</td>
<td>{{ session.expires_at|format_timestamp }}</td>
<td>
<a href="{{ url_for(".sessions_show", id=session.id) }}">
Dettagli
</a>
</td>
<td>
{% if session.id == g.auth_id %}
Sessione corrente
{% else %}
<a href="{{ url_for(".sessions_revoke", id=session.id) }}">
Disabilita sessione
</a>
{% endif %}
</td>
</tr>
{% endfor %}
</table>
</div>
</div>
</div>
</div>
{% endblock %}
......@@ -44,6 +44,30 @@
</div>
</div>
{% if g.auth_name %}
<nav class="sites-list">
<div class="container">
<ul class="left">
{% if permission("auth.sessions.manage") %}
<li><a href="{{ url_for("auth.sessions_list") }}">
Sessioni attive: {{ g.auth_sessions_count }}
</a></li>
{% endif %}
</ul>
<ul class="right">
{% if permission("auth.sessions.own") %}
<li><a href="{{ url_for("auth.sessions_show", id=g.auth_id) }}">
{{ g.auth_name }}
</a></li>
{% endif %}
{% if permission("auth.logout") %}
<li><a href="{{ url_for("auth.logout") }}">Esci</a></li>
{% endif %}
</ul>
</div>
</nav>
{% endif %}
<nav class="sites-list">
<div class="container">
<ul>
......@@ -84,6 +108,17 @@
{% endfor %}
<div class="container">
{% with messages = get_flashed_messages(with_categories=True) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert {% if category != "message" %}{{ category }}{% else %}info{% endif %}">
{{ message }}
<span class="close">&times;</span>
</div>
{% endfor %}
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
<footer>
......@@ -95,7 +130,7 @@
Sito web realizzato dal Gruppo Web di Ubuntu-it, con
<a href="https://www.python.org/">Python</a>,
<a href="https://www.palletsproject.com/p/flask/">Flask</a> e
<a href="http://www.postgresql.org/">PostgreSQL</a>.
<a href="https://www.sqlite.org/">SQLite</a>.
</p>
<ul>
<li><a href="{{ url_for("pages.cookies") }}">
......@@ -110,6 +145,11 @@
<li><a href="https://wiki.ubuntu-it.org/GruppoWeb">
Collabora con noi
</a></li>
{% if not g.auth_name %}
<li><a href="{{ url_for("auth.login") }}">
Accedi
</a></li>
{% endif %}
</ul>
</footer>
</div>
......
# Launchpad OpenID Teams Extension support for python-openid
#
# Copyright (C) 2008-2013 Canonical Ltd.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""Team membership support for Launchpad.
The primary form of communication between the RP and Launchpad is an
OpenID authentication request. Our solution is to piggyback a team
membership test onto this interaction.
As part of an OpenID authentication request, the RP includes the
following fields:
openid.ns.lp:
An OpenID 2.0 namespace URI for the extension. It is not strictly
required for 1.1 requests, but including it is good for forward
compatibility.
It must be set to: http://ns.launchpad.net/2007/openid-teams
openid.lp.query_membership:
A comma separated list of Launchpad team names that the RP is
interested in.
As part of the positive assertion OpenID response, the following field
will be provided:
openid.ns.lp:
(as above)
openid.lp.is_member:
A comma separated list of teams that the user is actually a member
of. The list may be limited to those teams mentioned in the
request.
This field must be included in the response signature in order to
be considered valid (as the response is bounced through the user's
web browser, an unsigned value could be modified).
@since: 2.1.1
"""
from __future__ import unicode_literals
from openid import oidutil
from openid.extension import Extension
from openid.message import (
registerNamespaceAlias,
NamespaceAliasRegistrationError,
)
__all__ = [
'TeamsRequest',
'TeamsResponse',
'ns_uri',
'supportsTeams',
]
ns_uri = 'http://ns.launchpad.net/2007/openid-teams'
try:
registerNamespaceAlias(ns_uri, 'lp')
except NamespaceAliasRegistrationError as e:
oidutil.log(
'registerNamespaceAlias(%r, %r) failed: %s' % (ns_uri, 'lp', str(e)))
def supportsTeams(endpoint):
"""Does the given endpoint advertise support for Launchpad Teams?
@param endpoint: The endpoint object as returned by OpenID discovery
@type endpoint: openid.consumer.discover.OpenIDEndpoint
@returns: Whether an lp type was advertised by the endpoint
@rtype: bool
"""
return endpoint.usesExtension(ns_uri)
class TeamsNamespaceError(ValueError):
"""The Launchpad teams namespace was not found and could not
be created using the expected name (there's another extension
using the name 'lp')
This is not I{illegal}, for OpenID 2, although it probably
indicates a problem, since it's not expected that other extensions
will re-use the alias that is in use for OpenID 1.
If this is an OpenID 1 request, then there is no recourse. This
should not happen unless some code has modified the namespaces for
the message that is being processed.
"""
def getTeamsNS(message):
"""Extract the Launchpad teams namespace URI from the given
OpenID message.
@param message: The OpenID message from which to parse Launchpad
teams. This may be a request or response message.
@type message: C{L{openid.message.Message}}
@returns: the lp namespace URI for the supplied message. The
message may be modified to define a Launchpad teams
namespace.
@rtype: C{str}
@raise ValueError: when using OpenID 1 if the message defines
the 'lp' alias to be something other than a Launchpad
teams type.
"""
# See if there exists an alias for the Launchpad teams type.
alias = message.namespaces.getAlias(ns_uri)
if alias is None:
# There is no alias, so try to add one. (OpenID version 1)
try:
message.namespaces.addAlias(ns_uri, 'lp')
except KeyError as why:
# An alias for the string 'lp' already exists, but it's
# defined for something other than Launchpad teams
raise TeamsNamespaceError(why[0])
# we know that ns_uri defined, because it's defined in the
# else clause of the loop as well, so disable the warning
return ns_uri
class TeamsRequest(Extension):
"""An object to hold the state of a Launchpad teams request.
@ivar query_membership: A comma separated list of Launchpad team
names that the RP is interested in.
@type required: [str]
@group Consumer: requestField, requestTeams, getExtensionArgs,
addToOpenIDRequest
@group Server: fromOpenIDRequest, parseExtensionArgs
"""
ns_alias = 'lp'
def __init__(self, query_membership=None, lp_ns_uri=ns_uri):
"""Initialize an empty Launchpad teams request"""
Extension.__init__(self)
self.query_membership = []
self.ns_uri = lp_ns_uri
if query_membership:
self.requestTeams(query_membership)
# Assign getTeamsNS to a static method so that it can be
# overridden for testing.
_getTeamsNS = staticmethod(getTeamsNS)
def fromOpenIDRequest(cls, request):
"""Create a Launchpad teams request that contains the
fields that were requested in the OpenID request with the
given arguments
@param request: The OpenID request
@type request: openid.server.CheckIDRequest
@returns: The newly created Launchpad teams request
@rtype: C{L{TeamsRequest}}
"""
self = cls()
# Since we're going to mess with namespace URI mapping, don't
# mutate the object that was passed in.
message = request.message.copy()
self.ns_uri = self._getTeamsNS(message)
args = message.getArgs(self.ns_uri)
self.parseExtensionArgs(args)
return self
fromOpenIDRequest = classmethod(fromOpenIDRequest)
def parseExtensionArgs(self, args, strict=False):
"""Parse the unqualified Launchpad teams request
parameters and add them to this object.
This method is essentially the inverse of
C{L{getExtensionArgs}}. This method restores the serialized
Launchpad teams request fields.
If you are extracting arguments from a standard OpenID
checkid_* request, you probably want to use C{L{fromOpenIDRequest}},
which will extract the lp namespace and arguments from the
OpenID request. This method is intended for cases where the
OpenID server needs more control over how the arguments are
parsed than that method provides.
>>> args = message.getArgs(ns_uri)
>>> request.parseExtensionArgs(args)
@param args: The unqualified Launchpad teams arguments
@type args: {str:str}×
@param strict: Whether requests with fields that are not
defined in the Launchpad teams specification should be
tolerated (and ignored)
@type strict: bool
@returns: None; updates this object
"""
items = args.get('query_membership')
if items:
for team_name in items.split(','):
try:
self.requestTeam(team_name, strict)
except ValueError:
if strict:
raise
def allRequestedTeams(self):
"""A list of all of the Launchpad teams that were
requested.
@rtype: [str]
"""
return self.query_membership
def wereTeamsRequested(self):
"""Have any Launchpad teams been requested?
@rtype: bool
"""
return bool(self.allRequestedTeams())
def __contains__(self, team_name):
"""Was this team in the request?"""
return team_name in self.query_membership
def requestTeam(self, team_name, strict=False):
"""Request the specified team from the OpenID user
@param team_name: the unqualified Launchpad team name
@type team_name: str
@param strict: whether to raise an exception when a team is
added to a request more than once
@raise ValueError: when strict is set and the team was
requested more than once
"""
if strict:
if team_name in self.query_membership:
raise ValueError('That team has already been requested')
else:
if team_name in self.query_membership:
return
self.query_membership.append(team_name)
def requestTeams(self, query_membership, strict=False):
"""Add the given list of teams to the request
@param query_membership: The Launchpad teams request
@type query_membership: [str]
@raise ValueError: when a team requested is not a string
or strict is set and a team was requested more than once
"""
if isinstance(query_membership, str):
raise TypeError('Teams should be passed as a list of '
'strings (not %r)' % (type(query_membership),))
for team_name in query_membership:
self.requestTeam(team_name, strict=strict)
def getExtensionArgs(self):
"""Get a dictionary of unqualified Launchpad teams
arguments representing this request.
This method is essentially the inverse of
C{L{parseExtensionArgs}}. This method serializes the Launchpad
teams request fields.
@rtype: {str:str}
"""
args = {}
if self.query_membership:
args['query_membership'] = ','.join(self.query_membership)
return args
class TeamsResponse(Extension):
"""Represents the data returned in a Launchpad teams response
inside of an OpenID C{id_res} response. This object will be
created by the OpenID server, added to the C{id_res} response
object, and then extracted from the C{id_res} message by the
Consumer.
@ivar data: The Launchpad teams data, an array.
@ivar ns_uri: The URI under which the Launchpad teams data was
stored in the response message.
@group Server: extractResponse
@group Consumer: fromSuccessResponse
@group Read-only dictionary interface: keys, iterkeys, items, iteritems,
__iter__, get, __getitem__, keys, has_key
"""
ns_alias = 'lp'
def __init__(self, is_member=None, lp_ns_uri=ns_uri):
Extension.__init__(self)
if is_member is None:
self.is_member = []
else:
self.is_member = is_member
self.ns_uri = lp_ns_uri
def addTeam(self, team_name):
if team_name not in self.is_member:
self.is_member.append(team_name)
def extractResponse(cls, request, is_member_str):
"""Take a C{L{TeamsRequest}} and a list of Launchpad
team values and create a C{L{TeamsResponse}}
object containing that data.
@param request: The Launchpad teams request object
@type request: TeamsRequest
@param is_member: The Launchpad teams data for this
response, as a list of strings.
@type is_member: {str:str}
@returns: a Launchpad teams response object
@rtype: TeamsResponse
"""
self = cls()
self.ns_uri = request.ns_uri
self.is_member = is_member_str.split(',')
return self
extractResponse = classmethod(extractResponse)
# Assign getTeamsNS to a static method so that it can be
# overridden for testing
_getTeamsNS = staticmethod(getTeamsNS)
def fromSuccessResponse(cls, success_response, signed_only=True):
"""Create a C{L{TeamsResponse}} object from a successful OpenID
library response
(C{L{openid.consumer.consumer.SuccessResponse}}) response
message
@param success_response: A SuccessResponse from consumer.complete()
@type success_response: C{L{openid.consumer.consumer.SuccessResponse}}
@param signed_only: Whether to process only data that was
signed in the id_res message from the server.
@type signed_only: bool
@rtype: TeamsResponse
@returns: A Launchpad teams response containing the data
that was supplied with the C{id_res} response.
"""
self = cls()
self.ns_uri = self._getTeamsNS(success_response.message)
if signed_only:
args = success_response.getSignedNS(self.ns_uri)
else:
args = success_response.message.getArgs(self.ns_uri)
if "is_member" in args:
is_member_str = args["is_member"]
self.is_member = is_member_str.split(',')
return self
fromSuccessResponse = classmethod(fromSuccessResponse)
def getExtensionArgs(self):
"""Get the fields to put in the Launchpad teams namespace
when adding them to an id_res message.
@see: openid.extension
"""
ns_args = {'is_member': ','.join(self.is_member)}
return ns_args
......@@ -14,6 +14,7 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import datetime
import random
import string
......@@ -100,3 +101,13 @@ class ReverseProxied:
if scheme:
environ['wsgi.url_scheme'] = scheme
return self.app(environ, start_response)
def prepare_app(app):
"""Add some utilities to the app"""
@app.template_filter("format_timestamp")
def format_timestamp(timestamp):
return datetime.datetime.fromtimestamp(
int(timestamp)
).strftime('%d/%m/%Y %H:%M:%S')
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment