Switch to a separated processor and add a nicer frontend

The separated jobs processor should ensure no more crashes stopping
managetests, and the new frontends provides more information.
parent 35412d7e
# A program which manages Ubuntu-it's web test server
# Copyright (C) 2015 Pietro Albini <pietroalbini@ubuntu.com>
# Copyright (C) 2015-2016 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
......@@ -20,6 +20,7 @@ import pathlib
import pkg_resources
from . import utils
from . import processor
from . import branches
from . import instances
from . import frontend
......@@ -35,10 +36,9 @@ class TestsManager:
self.gitlab = gitlab.GitLabAPI(self)
hooks_processor = frontend.HooksProcessor(self)
self.frontend = frontend.create_app(self, hooks_processor)
self.instances = instances.InstancesManager(self, self.frontend, port,
hooks_processor)
self.processor = processor.Processor()
self.frontend = frontend.create_app(self)
self.instances = instances.InstancesManager(self, self.frontend, port)
if not utils.is_root_valid(root):
raise RuntimeError("Invalid root directory: %s" % root)
......@@ -48,69 +48,61 @@ class TestsManager:
"managetests", "gunicorn_config"
)
self._load_details()
self._load_branches()
def _load_details(self):
"""Load details from the root directory"""
# Load the details
with (self.root / "details.json").open() as f:
self.details = json.load(f)
def _load_branches(self):
self.init_branches()
def init_branches(self):
"""Load all the branches"""
self.branches = {}
def add(name, mr):
"""Add a new branch"""
if name in self.branches:
return
self.branches[name] = branches.Branch(self, name, mr)
# Load already present branches from merge requests
for branch_name, mr in self.details["branches"].copy().items():
self.load_branch(branch_name, mr)
for name, mr in self.details["branches"].copy().items():
add(name, mr)
# Load also new forced-to-be-keeped branches
for branch_name in self.config["keep-branches"]:
if branch_name in self.branches:
continue
self.load_branch(branch_name, None)
for name in self.config["keep-branches"]:
add(name, None)
# Load also new branches from merge requests
new = self.gitlab.merge_requests()
if new is None:
raise RuntimeError("Can't get merge requests from GitLab!")
for request in new:
# Skip already loaded requests
if request["source_branch"] in self.branches:
continue
self.load_branch(request["source_branch"], request["id"])
def load_branch(self, name, mr):
"""Load a branch"""
branch = branches.Branch(self, name, mr)
if not branch.present and branch.active:
branch.deploy()
elif not branch.active:
if branch.present:
branch.destroy()
return
self.branches[name] = branch
# Load the branch in the runner
self.instances.load_branch(branch)
add(request["source_branch"], request["id"])
self.details["branches"][name] = mr
# Save the branch details
self.details["branches"][name] = mr
self.save_details()
def remove_branch(self, name):
"""Remove a branch"""
if name not in self.branches:
return
self.instances.remove_branch(name)
self.branches[name].destroy()
del self.branches[name]
if name in self.details["branches"]:
del self.details["branches"][name]
self.save_details()
def sync_branches_jobs(self):
"""Get the jobs needed to sync branches"""
for branch in list(self.branches.values()):
if branch.active:
if not branch.present():
# Deploy the branch and load it
def deploy(branch=branch):
branch.deploy()
self.instances.load_branch(branch)
yield deploy
else:
if branch.present():
# Unload the branch, destroy and forget about it
def destroy(branch=branch):
self.instances.remove_branch(branch.name)
branch.destroy()
if branch.name in self.details["branches"]:
del self.details["branches"]
self.save_details()
yield destroy
def save_details(self):
"""Save details on disk"""
......@@ -119,14 +111,31 @@ class TestsManager:
def run(self):
"""Run all the test instances"""
# Load all the branches in the instances manager
for branch in self.branches.values():
if not branch.present():
continue
self.instances.load_branch(branch)
# Queue all the sync jobs
for job in self.sync_branches_jobs():
self.processor.queue(job)
# Start the various components
self.processor.start()
self.instances.run()
def process_hook(self, data):
"""Process a webhook received from GitLab"""
# Stop the processor
self.processor.stop()
def queue_hook(self, data):
"""Queue a new hook"""
if data["object_kind"] == "push":
self._process_push_hook(data)
func = self._process_push_hook
elif data["object_kind"] == "merge_request":
self._process_merge_request_hook(data)
func = self._process_merge_request_hook
self.processor.queue(lambda: func(data))
def _process_push_hook(self, data):
"""Process a push hook received from GitLab"""
......@@ -141,8 +150,10 @@ class TestsManager:
mr = self.branches[branch].mr
# Rebuild the branch from scratch
self.remove_branch(branch)
self.load_branch(branch, mr)
self.instances.remove_branch(branch)
self.branches[branch].destroy()
self.branches[branch].deploy()
self.instances.load_branch(self.branches[branch])
# Send an alert on the merge request, if this branch has one
if mr is not None:
......@@ -160,11 +171,16 @@ class TestsManager:
if branch in self.branches and obj["state"] != "opened":
self.gitlab.post_comment(obj["id"], "L'istanza live per il branch "
"**%s** è stata rimossa." % branch)
self.remove_branch(branch)
self.instances.remove_branch(branch)
self.branches[branch].destroy()
# The instance should be created
elif branch not in self.branches and obj["state"] == "opened":
self.load_branch(branch, obj["id"])
self.branches[branch] = branches.Branch(self, branch, mr)
self.branches[branch].deploy()
self.instances.load_branch(self.branches[branch])
self.gitlab.post_comment(obj["id"], "Istanza live per il branch "
"**%s** disponibile sul [server di test]"
"(http://wwwtest.ubuntu-it.org/%s/)."
......
# A program which manages Ubuntu-it's web test server
# Copyright (C) 2015 Pietro Albini <pietroalbini@ubuntu.com>
# Copyright (C) 2015-2016 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
......@@ -40,10 +40,11 @@ class Branch:
self.assignee_url = None
self.config = None
self._deploying = False
self.pinned = name in manager.config["keep-branches"]
self.check_remote_status()
self.check_local_status()
self.load_config()
def check_remote_status(self):
......@@ -70,40 +71,54 @@ class Branch:
self.assignee = result["assignee"]["username"]
self.assignee_url = result["assignee"]["web_url"]
def check_local_status(self):
"""Check if the branch is present locally"""
if (self.manager.root / "branches" / self.name).exists():
self.present = self.valid()
else:
self.present = False
# Metadata:
def valid(self):
"""Check if a branch is valid"""
def present(self):
"""Check if a branch is present"""
root = self.manager.root / "branches" / self.name
if root.exists():
# The branch must be a valid directory
if not (root.exists() and root.is_dir()):
return False
# The branch must be a valid directory
if not (root.exists() and root.is_dir()):
return False
# All the files must be be present
files = ["version", "config.json"]
for file in files:
file = root / file
if not (file.exists() and file.is_file()):
return False
# All the files must be be present
files = ["version", "config.json"]
for file in files:
file = root / file
if not (file.exists() and file.is_file()):
return False
# The directory version must be correct
with (root / "version").open() as f:
if f.read().strip() != BRANCH_DIR_VERSION:
return False
# The directory version must be correct
with (root / "version").open() as f:
if f.read().strip() != BRANCH_DIR_VERSION:
return False
return True
return False
return True
def is_running(self):
"""Check if a branch is running"""
return self.manager.instances.is_running(self.name)
def has_build_log(self):
"""Check if the branch has a build log"""
path = self.manager.root / "branches" / self.name / "build.log"
return path.exists()
def status(self):
"""Get the status of the branch"""
if self._deploying:
return "deploying"
elif not self.present():
return "not_present"
elif self.is_running():
return "running"
else:
return "broken"
# Deploy and destroy:
def load_config(self):
"""Load the configuration of this branch"""
file = self.manager.root / "branches" / self.name / "config.json"
......@@ -155,13 +170,13 @@ class Branch:
def deploy(self):
"""Deploy the branch"""
self.check_local_status()
# Ok, branch already deployed
if self.present:
if self.present():
return
print("[i] Started a build of the '%s' branch" % self.name)
self._deploying = True
print("[i] Started a deploy of the '%s' branch" % self.name)
# Start from a fresh branch dir
branch = self.manager.root / "branches" / self.name
......@@ -237,15 +252,15 @@ class Branch:
self.manager.details["branches"][self.name] = self.mr
self.manager.save_details()
self._deploying = False
def destroy(self):
"""Destroy the local copy"""
self.check_local_status()
# Ok, branch not present
if not self.present:
if not self.present():
return
shutil.rmtree(str(self.manager.root_dir / "branches" / name))
shutil.rmtree(str(self.manager.root / "branches" / self.name))
try:
del self.manager.details["branches"][self.name]
......
......@@ -14,39 +14,12 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import queue
import threading
import logging
import flask
class HooksProcessor(threading.Thread):
"""A class which processes hooks"""
def __init__(self, manager):
self.manager = manager
self.q = queue.Queue()
self.stop = False
super(HooksProcessor, self).__init__()
def append(self, data):
"""Append something to the queue"""
self.q.put(data)
def run(self):
"""Run the thread"""
while not self.stop:
try:
data = self.q.get(timeout=0.2)
except queue.Empty:
continue
self.manager.process_hook(data)
def create_app(manager, processor):
def create_app(manager):
"""Create an instance of the frontend app"""
app = flask.Flask(__name__, static_url_path="/+assets")
......@@ -56,8 +29,7 @@ def create_app(manager, processor):
@app.route("/")
def list_branches():
branches = [b for b in manager.branches.values()
if b.name in manager.instances._processes]
branches = list(manager.branches.values())
branches.sort(key=lambda branch: branch.name)
branches.sort(key=lambda branch: branch.pinned, reverse=True)
......@@ -77,7 +49,9 @@ def create_app(manager, processor):
if token != manager.config["hooks-token"]:
return "UNAUTHORIZED", 401
processor.append(flask.request.json)
# Queue a new hook
manager.queue_hook(flask.request.json)
return "OK", 200
# Because it's impossible to run this app in debug mode, this handler
......
/* A program which manages Ubuntu-it's web test server
* Copyright (C) 2015 Pietro Albini <pietroalbini@ubuntu.com>
* Copyright (C) 2015-2016 Pietro Albini <pietroalbini@ubuntu.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
......@@ -63,13 +63,12 @@ div.wrapper {
margin: auto;
}
ul.branches {
text-align: left;
table.branches {
width: 100%;
}
ul.branches li small {
display: inline-block;
margin-left: 0.5em;
table.branches td, table.branches th {
padding: 0.3em 0;
}
ul.footer {
......@@ -88,6 +87,22 @@ ul.footer li:first-child {
margin-left: 0;
}
span.color-negative {
color: #d73024;
}
span.color-warning {
color: #f99b11;
}
span.color-positive {
color: #0f8420;
}
span.color-informative {
color: #007aa6;
}
@media all and (max-width: 52em) {
......
{# A program which manages Ubuntu-it's web test server
# Copyright (C) 2015 Pietro Albini <pietroalbini@ubuntu.com>
# Copyright (C) 2015-2016 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
......@@ -19,30 +19,58 @@
{% set title = "Branch disponibili" %}
{% block content %}
{% if not branches %}
<p>Nessun branch disponibile...</p>
{% else %}
<ul class="branches">
{% for branch in branches %}
<li>
<a href="/{{ branch.name }}/">{{ branch.name }}</a>
<span class="extra">
<table class="branches">
<thead>
<tr>
<th width="30%">Nome branch</th>
<th width="15%">Stato</th>
<th width="25%">Proprietario</th>
<th width="7%">MR</th>
<th width="10%"></th>
<th width="13%"></th>
</tr>
</thead>
<tbody>
{% for branch in branches %}
<tr>
<td>{{ branch.name }}</td>
<td>
{% set status = branch.status() %}
{% if status == "deploying" %}
<span class="color-informative">Build in corso</span>
{% elif status == "not_present" %}
<span class="color-warning">Non presente</span>
{% elif status == "running" %}
<span class="color-positive">In esecuzione</span>
{% elif status == "broken" %}
<span class="color-negative">Branch rotto</span>
{% else %}
{{ status }}
{% endif %}
</td>
{% if branch.pinned %}
<small>branch fisso</small>
<td>-</td>
<td>-</td>
{% else %}
<small>
branch di <a href="{{ branch.author_url }}">{{ branch.author }}</a>
{% if branch.assignee and branch.assignee != branch.author %}
e <a href="{{ branch.assignee_url }}">{{ branch.assignee }}</a>
{% endif %}
</small>
<small>
<a href="{{ branch.mr_url }}">Merge request #{{ branch.mr_id }}</a>
</small>
<td>
<a href="{{ branch.author_url }}">{{ branch.author }}</a>
</td>
<td>
<a href="{{ branch.mr_url }}">!{{ branch.mr_id }}</a>
</td>
{% endif %}
</span>
</li>
{% endfor %}
</ul>
{% endif %}
<td>
{% if branch.has_build_log() %}
<a href="{{ url_for("build_log", branch=branch.name) }}">Build log</a>
{% endif %}
</td>
<td>
{% if branch.is_running() %}
<a href="/{{ branch.name }}/">Istanza live</a>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endblock %}
......@@ -24,7 +24,7 @@ from werkzeug import serving
class InstancesManager:
"""This class will manage all the running branches"""
def __init__(self, manager, main, main_port, processor):
def __init__(self, manager, main, main_port):
self.manager = manager
self.running = False
......@@ -34,12 +34,14 @@ class InstancesManager:
self._main_app = main
self._main_port = main_port
self._hooks_processor = processor
atexit.register(_stop(self))
def load_branch(self, branch):
"""Load a branch into the instances manager"""
if branch.name in self._branches:
return
self._branches[branch.name] = branch
if self.running:
......@@ -55,6 +57,10 @@ class InstancesManager:
self._processes[name].terminate()
del self._processes[name]
def is_running(self, name):
"""Check if a branch is running"""
return name in self._processes
def run(self):
"""Run all the instances"""
if self.running:
......@@ -66,9 +72,7 @@ class InstancesManager:
self._run_branch(branch)
# Starts also the receiver part
self._hooks_processor.start()
serving.run_simple("127.0.0.1", self._main_port, self._main_app)
self._hooks_processor.stop = True
# Stop all the running processes
for process in self._processes.values():
......@@ -90,8 +94,7 @@ class InstancesManager:
try:
process = subprocess.Popen(*args, **kwargs)
except FileNotFoundError:
print("managetests: error: can't start the %s branch: executable "
"not found" % branch.name)
print("[!] No executable found for branch '%s'" % branch.name)
return
self._processes[branch.name] = process
......
# A program which manages Ubuntu-it's web test server
# Copyright (C) 2016 Pietro Albini <pietroalbini@ubuntu.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; witout even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import traceback
import threading
import queue
class Processor(threading.Thread):
"""This processes the supplied tasks"""
def __init__(self):
self._queue = queue.Queue()
super().__init__()
def queue(self, job):
"""Queue a new job"""
if not callable(job):
raise RuntimeError("Job not callable: %r" % job)
self._queue.put(job)
def stop(self):
"""Stop the processor"""
self._queue.put(None)
self.join()
def run(self):
"""Run the thread"""
while True:
job = self._queue.get()
# This is the stop signal
if job is None:
break
# Execute the job
try:
job()
except:
traceback.print_exc()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment