A Continuous Integration System
Author: Malini Das
What Is a Continuous Integration System?
When developing software, we want to be able to verify that our new features or bug fixes are safe and work as expected. We do this by running tests against our code. Sometimes, developers will run tests locally to verify that their changes are safe, but developers may not have the time to test their code on every system their software runs on. Further, as more and more tests are added, the amount of time required to run them, even only locally, becomes prohibitive. Because of this, continuous integration systems have been created.
Continuous Integration (CI) systems are dedicated systems used to test new code. Upon a commit to the code repository, it is the responsibility of the continuous integration system to verify that this commit will not break any tests. To do this, the system must be able to fetch the new changes, run the tests and report its results. Like any other system, it should also be failure resistant. This means if any part of the system fails, it should be able to recover and continue from that point.
This test system should also handle load well, so that we can get test results in a reasonable amount of time in the event that commits are being made faster than the tests can be run. We can achieve this by distributing and parallelizing the testing effort. This project will demonstrate a small, bare-bones distributed continuous integration system that is designed for extensibility.
Project Limitations and Notes
This project uses Git as the repository for the code that needs to be tested. Only standard source code management calls will be used, so if you are unfamiliar with Git but are familiar with other version control systems (VCS) like svn or Mercurial, you can still follow along.
Due to the limitations of code length and unittest, I simplified test discovery. We will only run tests that are in a directory named tests within the repository.
Continuous integration systems monitor a master repository which is usually hosted on a web server, and not local to the CI’s file systems. For the cases of our example, we will use a local repository instead of a remote repository.
Continuous integration systems need not run on a fixed, regular schedule. You can also have them run every few commits, or per-commit. For our example case, the CI system will run periodically. This means if it is set up to check for changes in five-second periods, it will run tests against the most recent commit made after the five-second period. It won’t test every commit made within that period of time, only the most recent one.
This CI system is designed to check periodically for changes in a repository. In real-world CI systems, you can also have the repository observer get notified by a hosted repository. Github, for example, provides “post-commit hooks” which send out notifications to a URL. Following this model, the repository observer would be called by the web server hosted at that URL to respond to that notification. Since this is complex to model locally, we’re using an observer model, where the repository observer will check for changes instead of being notified.
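To make the notification-based alternative concrete, here is a minimal sketch of what such a receiver could look like, using only Python's standard library. It is not part of this project's code; the port number and the assumption that the request body contains just a commit ID are illustrative only.
# Hypothetical post-commit notification receiver (not part of this project).
# A hosted repository would POST to this URL; we simply forward a dispatch
# request to the dispatcher, reusing the project's helpers.communicate.
import BaseHTTPServer
import helpers

class NotificationHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.getheader("content-length", 0))
        commit_id = self.rfile.read(length).strip()  # assume the body is a commit ID
        helpers.communicate("localhost", 8888, "dispatch:%s" % commit_id)
        self.send_response(200)
        self.end_headers()

if __name__ == "__main__":
    BaseHTTPServer.HTTPServer(("localhost", 8989), NotificationHandler).serve_forever()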
CI systems also have a reporter aspect, where the test runner reports its results to a component that makes them available for people to see, perhaps on a webpage. For simplicity, this project gathers the test results and stores them as files in the file system local to the dispatcher process.
Note that the architecture this CI system uses is just one possibility among many. This approach has been chosen to simplify our case study into three main components.
Introduction
The basic structure of a continuous integration system consists of three components: an observer, a test job dispatcher, and a test runner. The observer watches the repository. When it notices that a commit has been made, it notifies the job dispatcher. The job dispatcher then finds a test runner and gives it the commit number to test.
There are many ways to architect a CI system. We could have the observer, dispatcher and runner be the same process on a single machine. This approach is very limited since there is no load handling, so if more changes are added to the repository than the CI system can handle, a large backlog will accrue. This approach is also not fault-tolerant at all; if the computer it is running on fails or there is a power outage, there are no fallback systems, so no tests will run. The ideal system would be one that can handle as many test jobs as requested, and will do its best to compensate when machines go down.
To build a CI system that is fault-tolerant and load-bearing, in this project, each of these components is its own process. This will let each process be independent of the others, and let us run multiple instances of each process. This is useful when you have more than one test job that needs to be run at the same time. We can then spawn multiple test runners in parallel, allowing us to run as many jobs as needed, and prevent us from accumulating a backlog of queued tests.
In this project, not only do these components run as separate processes, but they also communicate via sockets, which will let us run each process on a separate, networked machine. A unique host/port address is assigned to each component, and each process can communicate with the others by posting messages at the assigned addresses.
This design will let us handle hardware failures on the fly by enabling a distributed architecture. We can have the observer run on one machine, the test job dispatcher on another, and the test runners on another, and they can all communicate with each other over a network. If any of these machines go down, we can schedule a new machine to go up on the network, so the system becomes fail-safe.
This project does not include auto-recovery code, as that is dependent on your distributed system’s architecture, but in the real world, CI systems are run in a distributed environment like this so they can have failover redundancy (i.e., we can fall back to a standby machine if one of the machines a process was running on becomes defunct).
For the purposes of this project, each of these processes will be started locally and manually, and each will run on its own distinct local port.
Files in this Project
This project contains Python files for each of these components: the repository observer (repo_observer.py), the test job dispatcher (dispatcher.py), and the test runner (test_runner.py). Each of these three processes communicates with the others using sockets, and since the code used to transmit information is shared by all of them, there is a helpers.py file that contains it, so each process imports the communicate function from there instead of having it duplicated in each file.
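The communicate function itself is not reproduced in this chapter; a minimal sketch of what such a helper might look like is below (the error handling in the real helpers.py may differ).
# Sketch of helpers.communicate: open a TCP connection to the given host/port,
# send the request string, and return the response.
import socket

def communicate(host, port, request):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((host, port))
    s.send(request)
    response = s.recv(1024)
    s.close()
    return response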
There are also bash script files used by these processes. These script files are used to execute bash and git commands in an easier way than constantly using Python’s operating system-level modules like os and subprocess.
Lastly, there is a tests directory, which contains two example tests the CI system will run. One test will pass, and the other will fail.
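For illustration, the two tests could be as simple as the following; the exact file and test names here are an assumption, since any unittest test case placed under tests/ will be discovered.
# tests/test_example.py -- one test case that passes and one that fails
import unittest

class TestPasses(unittest.TestCase):
    def test_always_passes(self):
        self.assertEqual(2 + 2, 4)

class TestFails(unittest.TestCase):
    def test_always_fails(self):
        self.assertEqual(1, 2)

if __name__ == "__main__":
    unittest.main()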
Initial Setup
While this CI system is ready to work in a distributed system, let us start by running everything locally on one computer so we can get a grasp on how the CI system works without adding the risk of running into network-related issues. If you wish to run this in a distributed environment, you can run each component on its own machine.
Continuous integration systems run tests by detecting changes in a code repository, so to start, we will need to set up the repository our CI system will monitor.
Let's call this test_repo:
$ mkdir test_repo
$ cd test_repo
$ git init
This will be our master repository. This is where developers check in their code, so our CI should pull this repository and check for commits, then run tests. The thing that checks for new commits is the repository observer.
The repository observer works by checking commits, so we need at least one commit in the master repository. Let’s commit our example tests so we have some tests to run.
Copy the tests folder from this code base to test_repo and commit it:
$ cp -r /this/directory/tests /path/to/test_repo/
$ cd /path/to/test_repo
$ git add tests/
$ git commit -m "add tests"
Now you have a commit in the master repository.
The repo observer component will need its own clone of the code, so it can detect when a new commit is made. Let's create a clone of our master repository, and call it test_repo_clone_obs:
$ git clone /path/to/test_repo test_repo_clone_obs
The test runner will also need its own clone of the code, so it can check out the repository at a given commit and run the tests. Let's create another clone of our master repository, and call it test_repo_clone_runner:
$ git clone /path/to/test_repo test_repo_clone_runner
The Components
The Repository Observer (repo_observer.py)
The repository observer monitors a repository and notifies the dispatcher when a new commit is seen. In order to work with all version control systems (since not all VCSs have built-in notification systems), this repository observer is written to periodically check the repository for new commits instead of relying on the VCS to notify it that changes have been made.
The observer will poll the repository periodically, and when a change is seen, it will tell the dispatcher the newest commit ID to run tests against. The observer checks for new commits by finding the current commit ID in its repository, then updates the repository, and lastly, it finds the latest commit ID and compares them. For the purposes of this example, the observer will only dispatch tests against the latest commit. This means that if two commits are made between a periodic check, the observer will only run tests against the latest commit. Usually, a CI system will detect all commits since the last tested commit, and will dispatch test runners for each new commit, but I have modified this assumption for simplicity.
The observer must know which repository to observe. We previously created a clone of our repository at /path/to/test_repo_clone_obs. The observer will use this clone to detect changes. To allow the repository observer to use this clone, we pass it the path when we invoke the repo_observer.py file. The repository observer will use this clone to pull from the main repository.
We must also give the observer the dispatcher's address, so the observer may send it messages. When you start the repository observer, you can pass in the dispatcher's server address using the --dispatcher-server command line argument. If you do not pass it in, it will assume the default address of localhost:8888.
def poll():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, " \
                             "by default it uses localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo", metavar="REPO", type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()
    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")
Once the repository observer file is invoked, it starts the poll() function. This function parses the command line arguments, and then kicks off an infinite while loop. The while loop is used to periodically check the repository for changes. The first thing it does is call the update_repo.sh Bash script.[1]
    while True:
        try:
            # call the bash script that will update the repo and check
            # for changes. If there's a change, it will drop a .commit_id file
            # with the latest commit in the current working directory
            subprocess.check_output(["./update_repo.sh", args.repo])
        except subprocess.CalledProcessError as e:
            raise Exception("Could not update and check repository. " +
                            "Reason: %s" % e.output)
The update_repo.sh file is used to identify any new commits and let the repository observer know. It does this by noting the commit ID we are currently aware of, then pulling the repository and checking the latest commit ID. If they match, no changes have been made, so the repository observer doesn't need to do anything, but if the commit IDs differ, then we know a new commit has been made. In this case, update_repo.sh will create a file called .commit_id with the latest commit ID stored in it.
A step-by-step breakdown of update_repo.sh
is as follows. First, the
script sources the run_or_fail.sh
file, which provides the
run_or_fail
helper method used by all our shell scripts. This method
is used to run the given command, or fail with the given error message.
#!/bin/bash
source run_or_fail.sh
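The run_or_fail.sh helper is not listed in this chapter; a minimal sketch of such a helper is shown below. It runs whatever command it is given and, if that command exits with a non-zero status, prints the supplied error message and aborts (the real helper may differ slightly in detail).
#!/bin/bash
# Sketch of run_or_fail.sh: run a command, or exit with the given message.
run_or_fail() {
  local explanation=$1
  shift
  "$@"
  if [ $? != 0 ]; then
    echo "$explanation" 1>&2
    exit 1
  fi
}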
Next, the script tries to remove a file named .commit_id. Since update_repo.sh is called repeatedly by the repo_observer.py file, if we previously had a new commit, then .commit_id was created, but holds a commit we already tested. Therefore, we want to remove that file, and create a new one only if a new commit is found.
rm -f .commit_id
After it removes the file (if it existed), it verifies that the repository we are observing exists, and then resets it to the most recent commit, in case anything caused it to get out of sync.
run_or_fail "Repository folder not found!" pushd $1 1> /dev/null
run_or_fail "Could not reset git" git reset --hard HEAD
It then calls git log and parses the output, looking for the most recent commit ID.
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
echo "Could not call 'git log' on repository"
exit 1
fi
COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`
Then it pulls the repository, getting any recent changes, then gets the most recent commit ID.
run_or_fail "Could not pull from repository" git pull
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
echo "Could not call 'git log' on repository"
exit 1
fi
NEW_COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`
Lastly, if the commit ID doesn’t match the previous ID, then we know we have new commits to check, so the script stores the latest commit ID in a .commit_id file.
# if the id changed, then write it to a file
if [ $NEW_COMMIT_ID != $COMMIT_ID ]; then
popd 1> /dev/null
echo $NEW_COMMIT_ID > .commit_id
fi
When update_repo.sh finishes running in repo_observer.py, the repository observer checks for the existence of the .commit_id file. If the file does exist, then we know we have a new commit, and we need to notify the dispatcher so it can kick off the tests. The repository observer will check the dispatcher server's status by connecting to it and sending a 'status' request, to make sure there are no problems with it, and to make sure it is ready for instruction.
        if os.path.isfile(".commit_id"):
            try:
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "status")
            except socket.error as e:
                raise Exception("Could not communicate with dispatcher server: %s" % e)
If it responds with “OK”, then the repository observer opens the
.commit_id
file, reads the latest commit ID and sends that ID to the
dispatcher, using a dispatch:<commit ID>
request. It will then sleep
for five seconds and repeat the process. We’ll also try again in five
seconds if anything went wrong along the way.
            if response == "OK":
                commit = ""
                with open(".commit_id", "r") as f:
                    commit = f.readline()
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "dispatch:%s" % commit)
                if response != "OK":
                    raise Exception("Could not dispatch the test: %s" %
                                    response)
                print "dispatched!"
            else:
                raise Exception("Could not dispatch the test: %s" %
                                response)
        time.sleep(5)
The repository observer will repeat this process forever, until you kill the process via a KeyboardInterrupt (Ctrl+c), or by sending it a kill signal.
The Dispatcher (dispatcher.py)
The dispatcher is a separate service used to delegate testing tasks. It listens on a port for requests from test runners and from the repository observer. It allows test runners to register themselves, and when given a commit ID from the repository observer, it will dispatch a test runner against the new commit. It also gracefully handles any problems with the test runners and will redistribute the commit ID to a new test runner if anything goes wrong.
When dispatcher.py is executed, the serve function is called. First it parses the arguments that allow you to specify the dispatcher's host and port:
def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, by default it uses 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()
This starts the dispatcher server, and two other threads. One thread runs the runner_checker function, and the other runs the redistribute function.
    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print "serving on %s:%s" % (args.host, int(args.port))
    ...
    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistributor = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistributor.start()
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl+C or Cmd+C
        server.serve_forever()
    except (KeyboardInterrupt, Exception):
        # if any exception occurs, kill the thread
        server.dead = True
        runner_heartbeat.join()
        redistributor.join()
The runner_checker
function periodically pings each registered test
runner to make sure they are still responsive. If they become
unresponsive, then that runner will be removed from the pool and its
commit ID will be dispatched to the next available runner. The function
will log the commit ID in the pending_commits
variable.
def runner_checker(server):
    def manage_commit_lists(runner):
        for commit, assigned_runner in server.dispatched_commits.iteritems():
            if assigned_runner == runner:
                del server.dispatched_commits[commit]
                server.pending_commits.append(commit)
                break
        server.runners.remove(runner)

    while not server.dead:
        time.sleep(1)
        for runner in server.runners:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                response = helpers.communicate(runner["host"],
                                               int(runner["port"]),
                                               "ping")
                if response != "pong":
                    print "removing runner %s" % runner
                    manage_commit_lists(runner)
            except socket.error as e:
                manage_commit_lists(runner)
The redistribute
function is used to dispatch the commit IDs logged in
pending_commits
. When redistribute
runs, it checks if there are any
commit IDs in pending_commits
. If so, it calls the dispatch_tests
function with the commit ID.
def redistribute(server):
    while not server.dead:
        for commit in server.pending_commits:
            print "running redistribute"
            print server.pending_commits
            dispatch_tests(server, commit)
            time.sleep(5)
The dispatch_tests
function is used to find an available test runner
from the pool of registered runners. If one is available, it will send a
runtest message to it with the commit ID. If none are currently
available, it will wait two seconds and repeat this process. Once
dispatched, it logs which commit ID is being tested by which test runner
in the dispatched_commits
variable. If the commit ID is in the
pending_commits
variable, dispatch_tests
will remove it since it has
already been successfully re-dispatched.
def dispatch_tests(server, commit_id):
    # NOTE: usually we don't run this forever
    while True:
        print "trying to dispatch to runners"
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                                           int(runner["port"]),
                                           "runtest:%s" % commit_id)
            if response == "OK":
                print "adding id %s" % commit_id
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(2)
The dispatcher server uses the SocketServer module, which is a very simple server that is part of the standard library. There are four basic server types in the SocketServer module: TCP, UDP, UnixStreamServer and UnixDatagramServer. We will be using a TCP-based socket server so we can ensure continuous, ordered streams of data between servers, as UDP does not ensure this.
The default TCPServer
provided by SocketServer
can only handle one
request at a time, so it cannot handle the case where the dispatcher is
talking to one connection, say from a test runner, and then a new
connection comes in, say from the repository observer. If this happens,
the repository observer would have to wait for the first connection to
complete and disconnect before it would be serviced. This is not ideal
for our case, since the dispatcher server must be able to directly and
swiftly communicate with all test runners and the repository observer.
In order for the dispatcher server to handle simultaneous connections, it uses the ThreadingTCPServer custom class, which adds threading ability to the default SocketServer. This means that any time the dispatcher receives a connection request, it spins off a new thread just for that connection. This allows the dispatcher to handle multiple requests at the same time.
class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    runners = [] # Keeps track of test runner pool
    dead = False # Indicate to other threads that we are no longer running
    dispatched_commits = {} # Keeps track of commits we dispatched
    pending_commits = [] # Keeps track of commits we have yet to dispatch
The dispatcher server works by defining handlers for each request. This
is defined by the DispatcherHandler
class, which inherits from
SocketServer
’s BaseRequestHandler
. This base class just needs us to
define the handle function, which will be invoked whenever a connection
is requested. The handle function defined in DispatcherHandler
is our
custom handler, and it will be called on each connection. It looks at
the incoming connection request (self.request
holds the request
information), and parses out what command is being requested of it.
class DispatcherHandler(SocketServer.BaseRequestHandler):
    """
    The RequestHandler class for our dispatcher.
    This will dispatch test runners against the incoming commit
    and handle their requests and test results
    """

    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024

    def handle(self):
        self.data = self.request.recv(self.BUF_SIZE).strip()
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)
It handles four commands: status, register, dispatch, and results. status is used to check if the dispatcher server is up and running.
        if command == "status":
            print "in status"
            self.request.sendall("OK")
In order for the dispatcher to do anything useful, it needs to have at
least one test runner registered. When register is called on a host:port
pair, it stores the runner’s information in a list (the runners object
attached to the ThreadingTCPServer
object) so it can communicate with
the runner later, when it needs to give it a commit ID to run tests
against.
elif command == "register":
# Add this test runner to our pool
print "register"
address = command_groups.group(2)
host, port = re.findall(r":(\w*)", address)
runner = {"host": host, "port":port}
self.server.runners.append(runner)
self.request.sendall("OK")
dispatch is used by the repository observer to dispatch a test runner against a commit. The format of this command is dispatch:<commit ID>. The dispatcher parses out the commit ID from this message and sends it to the test runner.
elif command == "dispatch":
print "going to dispatch"
commit_id = command_groups.group(2)[1:]
if not self.server.runners:
self.request.sendall("No runners are registered")
else:
# The coordinator can trust us to dispatch the test
self.request.sendall("OK")
dispatch_tests(self.server, commit_id)
results is used by a test runner to report the results of a finished test run. The format of this command is results:<commit ID>:<length of results data in bytes>:<results>. The <commit ID> is used to identify which commit ID the tests were run against. The <length of results data in bytes> is used to figure out how big a buffer is needed for the results data. Lastly, <results> holds the actual result output.
elif command == "results":
print "got test results"
results = command_groups.group(2)[1:]
results = results.split(":")
commit_id = results[0]
length_msg = int(results[1])
# 3 is the number of ":" in the sent command
remaining_buffer = self.BUF_SIZE - \
(len(command) + len(commit_id) + len(results[1]) + 3)
if length_msg > remaining_buffer:
self.data += self.request.recv(length_msg - remaining_buffer).strip()
del self.server.dispatched_commits[commit_id]
if not os.path.exists("test_results"):
os.makedirs("test_results")
with open("test_results/%s" % commit_id, "w") as f:
data = self.data.split(":")[3:]
data = "\n".join(data)
f.write(data)
self.request.sendall("OK")
The Test Runner (test_runner.py)
The test runner is responsible for running tests against a given commit ID and reporting the results. It communicates only with the dispatcher server, which is responsible for giving it the commit IDs to run against, and which will receive the test results.
When the test_runner.py
file is invoked, it calls the serve
function
which starts the test runner server, and also starts a thread to run the
dispatcher_checker
function. Since this startup process is very similar
to the ones described in repo_observer.py
and dispatcher.py
, we omit
the description here.
The dispatcher_checker
function pings the dispatcher server every five
seconds to make sure it is still up and running. This is important for
resource management. If the dispatcher goes down, then the test runner
will shut down since it won’t be able to do any meaningful work if there
is no dispatcher to give it work or to report to.
def dispatcher_checker(server):
    while not server.dead:
        time.sleep(5)
        if (time.time() - server.last_communication) > 10:
            try:
                response = helpers.communicate(
                    server.dispatcher_server["host"],
                    int(server.dispatcher_server["port"]),
                    "status")
                if response != "OK":
                    print "Dispatcher is no longer functional"
                    server.shutdown()
                    return
            except socket.error as e:
                print "Can't communicate with dispatcher: %s" % e
                server.shutdown()
                return
The test runner is a ThreadingTCPServer, like the dispatcher server. It requires threading because not only will the dispatcher be giving it a commit ID to run, but the dispatcher will be pinging the runner periodically to verify that it is still up while it is running tests.
class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    dispatcher_server = None # Holds the dispatcher server host/port information
    last_communication = None # Keeps track of last communication from dispatcher
    busy = False # Status flag
    dead = False # Status flag
The communication flow starts with the dispatcher requesting that the runner accept a commit ID to run. If the test runner is ready to run the job, it responds with an acknowledgement to the dispatcher server, which then closes the connection. In order for the test runner server to both run tests and accept more requests from the dispatcher, it starts the requested test job on a new thread.
This means that when the dispatcher server makes a request (a ping, in this case) and expects a response, it will be done on a separate thread, while the test runner is busy running tests on its own thread. This allows the test runner server to handle multiple tasks simultaneously. Instead of this threaded design, it is possible to have the dispatcher server hold onto a connection with each test runner, but this would increase the dispatcher server’s memory needs, and is vulnerable to network problems, like accidentally dropped connections.
The test runner server responds to two messages from the dispatcher. The first is ping, which is used by the dispatcher server to verify that the runner is still active.
class TestHandler(SocketServer.BaseRequestHandler):
    ...

    def handle(self):
        ....
        if command == "ping":
            print "pinged"
            self.server.last_communication = time.time()
            self.request.sendall("pong")
The second is runtest, which accepts messages of the form runtest:<commit ID>, and is used to kick off tests on the given commit. When runtest is called, the test runner will check to see if it is already running a test, and if so, it will return a BUSY response to the dispatcher. If it is available, it will respond to the server with an OK message, set its status as busy and run its run_tests function.
elif command == "runtest":
print "got runtest command: am I busy? %s" % self.server.busy
if self.server.busy:
self.request.sendall("BUSY")
else:
self.request.sendall("OK")
print "running"
commit_id = command_groups.group(2)[1:]
self.server.busy = True
self.run_tests(commit_id,
self.server.repo_folder)
self.server.busy = False
This function calls the shell script test_runner_script.sh, which updates the repository to the given commit ID. Once the script returns, if it was successful at updating the repository we run the tests using unittest and gather the results in a file. When the tests have finished running, the test runner reads in the results file and sends it in a results message to the dispatcher.
    def run_tests(self, commit_id, repo_folder):
        # update repo
        output = subprocess.check_output(["./test_runner_script.sh",
                                          repo_folder, commit_id])
        print output
        # run the tests
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        result_file = open("results", "w")
        unittest.TextTestRunner(result_file).run(suite)
        result_file.close()
        result_file = open("results", "r")
        # give the dispatcher the results
        output = result_file.read()
        helpers.communicate(self.server.dispatcher_server["host"],
                            int(self.server.dispatcher_server["port"]),
                            "results:%s:%s:%s" % (commit_id, len(output), output))
Here's test_runner_script.sh:
#!/bin/bash
REPO=$1
COMMIT=$2
source run_or_fail.sh
run_or_fail "Repository folder not found" pushd "$REPO" 1> /dev/null
run_or_fail "Could not clean repository" git clean -d -f -x
run_or_fail "Could not call git pull" git pull
run_or_fail "Could not update to given commit hash" git reset --hard "$COMMIT"
In order to run test_runner.py, you must point it to a clone of the repository to run tests against. In this case, you can use the previously created /path/to/test_repo_clone_runner clone as the argument. By default, test_runner.py will start its own server on localhost using a port in the range 8900-9000, and will try to connect to the dispatcher server at localhost:8888. You may pass it optional arguments to change these values. The --host and --port arguments are used to designate a specific address to run the test runner server on, and the --dispatcher-server argument specifies the address of the dispatcher.
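For example, to pin the test runner to a specific address (the port value below is an arbitrary choice from that range, assuming the flags behave as described above):
$ python test_runner.py --host=localhost --port=8900 --dispatcher-server=localhost:8888 /path/to/test_repo_clone_runner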
Control Flow Diagram
The figure below is an overview diagram of this system. The diagram assumes that all three files (repo_observer.py, dispatcher.py and test_runner.py) are already running, and describes the actions each process takes when a new commit is made.
[Figure: Control Flow (ci-images/diagram.png)]
Running the Code
We can run this simple CI system locally, using three different terminal shells for each process. We start the dispatcher first, running on port 8888:
$ python dispatcher.py
In a new shell, we start the test runner (so it can register itself with the dispatcher):
$ python test_runner.py <path/to/test_repo_clone_runner>
The test runner will assign itself its own port, in the range 8900-9000. You may run as many test runners as you like.
Lastly, in another new shell, let’s start the repo observer:
$ python repo_observer.py --dispatcher-server=localhost:8888 <path/to/repo_clone_obs>
Now that everything is set up, let’s trigger some tests! To do that, we’ll need to make a new commit. Go to your master repository and make an arbitrary change:
$ cd /path/to/test_repo
$ touch new_file
$ git add new_file
$ git commit -m"new file" new_file
Then repo_observer.py
will realize that there’s a new commit and notify
the dispatcher. You can see the output in their respective shells, so
you can monitor them. Once the dispatcher receives the test results, it
stores them in a test_results/
folder in this code base, using the
commit ID as the filename.
Error Handling
This CI system includes some simple error handling.
If you kill the test_runner.py
process, dispatcher.py
will figure out
that the runner is no longer available and will remove it from the pool.
You can also kill the test runner while it is in the middle of running a test, to simulate a machine crash or network failure. If you do so, the dispatcher will realize the runner went down and will give another test runner the job if one is available in the pool, or will wait for a new test runner to register itself in the pool.
If you kill the dispatcher, the repository observer will figure out it went down and will throw an exception. The test runners will also notice, and shut down.
Conclusion
By separating concerns into their own processes, we were able to build the fundamentals of a distributed continuous integration system. With processes communicating with each other via socket requests, we are able to distribute the system across multiple machines, helping to make our system more reliable and scalable.
Since the CI system is quite simple now, you can extend it yourself to be far more functional. Here are a few suggestions for improvements:
Per-Commit Test Runs
The current system periodically checks to see if new commits have been made and runs tests against only the most recent commit. This should be improved to test each commit. To do this, you can modify the periodic checker to dispatch test runs for each commit in the log between the last-tested and the latest commit, as sketched below.
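Here is one way the observer side of this could look; the commits_since helper and the idea of remembering the last-tested commit ID are assumptions for illustration, not part of the project's code.
# Sketch: enumerate every commit made since the last-tested one (oldest first)
# so each of them can be dispatched, instead of only the newest commit.
import subprocess

def commits_since(repo_path, last_tested_id):
    # git rev-list prints newest-first; --reverse gives oldest-first
    output = subprocess.check_output(
        ["git", "-C", repo_path, "rev-list", "--reverse",
         "%s..HEAD" % last_tested_id])
    return output.split()

# inside poll(), after update_repo.sh has pulled the repository:
# for commit_id in commits_since(args.repo, last_tested_id):
#     helpers.communicate(dispatcher_host, int(dispatcher_port),
#                         "dispatch:%s" % commit_id)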
Smarter Test Runners
If the test runner detects that the dispatcher is unresponsive, it stops running. This happens even when the test runner is in the middle of running tests! It would be better if the test runner waited for a period of time (or indefinitely, if you do not care about resource management) for the dispatcher to come back online. In this case, if the dispatcher goes down while the test runner is actively running a test, instead of shutting down it will complete the test and wait for the dispatcher to come back online, and will report the results to it. This will ensure that we don’t waste any effort the test runner makes, and that we will only run tests once per commit.
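One way to implement this is to have dispatcher_checker retry for a while before shutting the runner down. The sketch below is illustrative only; the retry count and interval are arbitrary choices, not part of the project.
# Sketch: retry the dispatcher's status check a few times before giving up.
import socket
import time
import helpers

DISPATCHER_RETRIES = 12  # roughly one minute at five-second intervals

def wait_for_dispatcher(server):
    for _ in range(DISPATCHER_RETRIES):
        try:
            response = helpers.communicate(
                server.dispatcher_server["host"],
                int(server.dispatcher_server["port"]),
                "status")
            if response == "OK":
                return True
        except socket.error:
            pass
        time.sleep(5)
    return False  # the caller can then shut the runner down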
Real Reporting
In a real CI system, you would have the test results report to a reporter service which would gather the results, post them somewhere for people to review, and notify a list of interested parties when a failure or other notable event occurs. You can extend our simple CI system by creating a new process to get the reported results, instead of the dispatcher gathering the results. This new process could be a web server (or can connect to a web server) which could post the results online, and may use a mail server to alert subscribers to any test failures.
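As a very small first step in that direction, the results already collected in test_results/ could be exposed over HTTP with nothing but the standard library; the port number below is an arbitrary choice.
# Sketch of a minimal reporter: serve the test_results/ directory in a browser.
import os
import SimpleHTTPServer
import SocketServer

os.chdir("test_results")
handler = SimpleHTTPServer.SimpleHTTPRequestHandler
httpd = SocketServer.TCPServer(("localhost", 8000), handler)
print "serving test results on http://localhost:8000"
httpd.serve_forever()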
Test Runner Manager
Right now, you have to manually launch the test_runner.py
file to start
a test runner. Instead, you could create a test runner manager process
which would assess the current load of test requests from the dispatcher
and scale the number of active test runners accordingly. This process
will receive the runtest messages and will start a test runner process
for each request, and will kill unused processes when the load
decreases.
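A bare-bones version of such a manager might simply keep a fixed number of runner processes alive, leaving load-based scaling as a further step. The target count and command line below are assumptions, and in practice each runner would need its own repository clone rather than sharing one.
# Sketch of a tiny test runner manager: keep N runner processes running.
import subprocess
import time

TARGET_RUNNERS = 3
RUNNER_CMD = ["python", "test_runner.py", "/path/to/test_repo_clone_runner"]

processes = []
while True:
    # drop runners that have exited
    processes = [p for p in processes if p.poll() is None]
    # start new runners until we are back at the target count
    while len(processes) < TARGET_RUNNERS:
        processes.append(subprocess.Popen(RUNNER_CMD))
    time.sleep(5)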
Using these suggestions, you can make this simple CI system more robust and fault-tolerant, and you can integrate it with other systems, like a web-based test reporter.
If you wish to see the level of flexibility continuous integration systems can achieve, I recommend looking into Jenkins, a very robust, open-source CI system written in Java. It provides you with a basic CI system which you can extend using plugins. You may also access its source code through GitHub. Another recommended project is Travis CI, which is written in Ruby and whose source code is also available through GitHub.
This has been an exercise in understanding how CI systems work, and how to build one yourself. You should now have a more solid understanding of what is needed to make a reliable distributed system, and you can now use this knowledge to develop more complex solutions.
[1] Bash is used because we need to check file existence, create files, and use Git, and a shell script is the most direct and easy way to achieve this. Alternatively, there are cross-platform Python packages you can use; for example, Python's os built-in module can be used for accessing the file system, and GitPython can be used for Git access, but they perform actions in a more roundabout way.