I'm Jonas Galvez, a JavaScript and Go engineer at STORED e-commerce.

I'm an eternal student of distributed systems and practitioner of minimalism.

I build and deploy my stuff with Vue, Nuxt, Go and Kubernetes nowadays. This is the scope of the articles published here.

I am driven by purpose and cultivate a stoic attitude towards life.

Scalable API Testing

Black-box testing is the technical term for verifying functionality with zero knowledge of the application's internals. Load testing can also be considered a form of black-box testing when no system metrics other than speed and availability are verifiable.

I was tasked with creating a scalable black-box test suite for an HTTP API, capable of examining its speed and stability as well as its expected behaviour. I set out to accomplish these two forms of testing using state-of-the-art Python libraries and deployment tools.

Harvesting logs to infer common signatures

The API I was testing was already live and actively used by thousands of customers, who exercised nearly all of its capabilities. So instead of manually writing test cases from the API's specification, I decided to build them dynamically from a series of HTTP log files I was provided with. In total, I had access to 20+ log files ranging from 200MB to 300MB each, compressed.

My first attempts at parsing them made it clear they needed some serious sanitization. After experimenting with a variety of URL sanitization approaches, I set out to find a concise way to describe the endpoints I needed to test, and to use such descriptions to parse valid API call signatures from the logs. Using a hypothetical API to list foods as an example, this is what it came to look like:

fruits
 /api/fruits
  apikey! sort items page
 /api/fruits/<search>
  apikey! sort items page
vegetables
 /api/vegetables
  apikey! sort items page
 /api/vegetables/<search>
  apikey! sort items page

This is of course only illustrative, as the actual API had dozens of endpoints and potential parameter combinations that needed to be tested. I named this file ENDPOINTS and placed it at the root of my test suite. In the interest of minimalism, I used single space indentation for structuring each endpoint description. The first line specifies the filename where sanitized URLs will be collected. The second line specifies the URI and URI parameters, and the third line the query string parameters. If a parameter is required, it's suffixed with an exclamation mark. With this simple specification file, I then used werkzeug's routing module to create a parser:

import re
from werkzeug.routing import Map, Rule

def endpoints(host):
    map = Map()
    current_endpoint = None
    with open('ENDPOINTS') as endpoints:
        for line in endpoints:
            line = line.rstrip()
            if re.search(r'^[^\s]+', line):
                # Unindented line: the group (sample filename)
                endpoint_group = line.strip()
                index = 1
            elif re.search(r'^\s+/', line):
                # Indented line starting with a slash: the URI rule
                current_endpoint = Rule(
                    line.strip(),
                    endpoint = '%s-%s' % (endpoint_group, index)
                )
                index += 1
            elif re.search(r'^\s+', line):
                # Any other indented line: query string parameters.
                # Strip first so the split yields no empty leading item.
                current_endpoint.params = re.split(r'\s+', line.strip())
                map.add(current_endpoint)
    return map.bind(host)

Parsing of URI parameters is already handled by werkzeug, and since they're part of the endpoint itself they're implicitly required. I store query string parameters for verification later as params in each Rule object (a custom addition). Moving on to the actual log parser, my main goals were: a) discarding malformed calls and removing junk from otherwise valid requests (sanitization) and b) avoiding repetitive API calls in the test suite.

The parsing relies heavily on MapAdapter, which raises HTTP error exceptions and returns parsed URI parameters. Using the previously stored params list from each Rule object, I then ensure required parameters are present and, at the end, remove all invalid parameters (those missing from the endpoint's definition in ENDPOINTS). It's also worth noting that tqdm provides a progress bar while reading the files.

# Fragment (Python 2): assumes `logfile` is a path and `endpoints` is the
# bound map returned by endpoints(host) above.
import cgi, gzip, os, re, urlparse
from random import random
from tqdm import tqdm
from werkzeug.exceptions import NotFound
from werkzeug.routing import RequestRedirect

with open(logfile, 'rb') as logfile_handler:
    size = os.path.getsize(logfile)
    with tqdm(total=size) as pbar:
        newpos = None
        oldpos = logfile_handler.tell()
        for line in gzip.GzipFile(fileobj=logfile_handler):
            # Progress is measured on the compressed file's position
            newpos = logfile_handler.tell()
            if newpos != oldpos:
                pbar.update(newpos if oldpos == 0 else newpos-oldpos)
                oldpos = newpos
            request_data = re.findall(r'"([A-Z]+)\s+(.*?)\s+HTTP/1.1', line)
            if len(request_data):
                uri = request_data[0][1]
                try:
                    parsed_uri = urlparse.urlparse(uri)
                    rule, args = endpoints.match(parsed_uri.path, return_rule=True)
                except (NotFound, RequestRedirect):
                    # A tuple is needed here to catch both exception types
                    continue
                # Randomly discard most matching calls to keep the sample small
                if random() > 0.0009:
                    continue
                parsed_query = cgi.parse_qs(parsed_uri.query)
                if getattr(rule, 'params', False):
                    required_params = [
                        param[:-1] for param in rule.params
                        if param.endswith('!')
                    ]
                    optional_params = [
                        param for param in rule.params
                        if not param.endswith('!')
                    ]
                else:
                    required_params, optional_params = [], []
                all_parameters = set(required_params+optional_params)
                # Drop every parameter not declared in ENDPOINTS
                for param in set(parsed_query.keys()).difference(all_parameters):
                    del parsed_query[param]
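The snippet above drops undeclared parameters but does not show the check for required ones. What follows is a minimal sketch of how that step could look, written for Python 3 and assuming the same `!` suffix convention used in ENDPOINTS. The helper name `sanitize_query` is hypothetical and not part of the original parser.

```python
def sanitize_query(parsed_query, params):
    """Return a cleaned copy of parsed_query, or None when a required
    parameter is missing.

    parsed_query: dict as returned by urllib.parse.parse_qs
    params: parameter names from ENDPOINTS; required ones end with '!'
    """
    required = {p[:-1] for p in params if p.endswith('!')}
    optional = {p for p in params if p and not p.endswith('!')}
    # Reject the call outright when any required parameter is absent.
    if not required.issubset(parsed_query):
        return None
    allowed = required | optional
    # Keep only the parameters declared for this endpoint.
    return {k: v for k, v in parsed_query.items() if k in allowed}


params = ['apikey!', 'sort', 'items', 'page']
valid = sanitize_query({'apikey': ['abc'], 'sort': ['asc'], 'utm': ['x']}, params)
invalid = sanitize_query({'sort': ['asc']}, params)
```

Returning None for rejected calls lets the caller skip them with a single check before computing a signature.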

Since each file had millions of lines, in order to generate a manageably sized data set I gradually lowered the exclusion rate until it got to about 200 results from each file. But there were still a lot of duplicates. I figured I didn't need to test the same API call signature (combination of parameters used) more than a few times. So I used Python's native hash function to generate a unique signature built from the Rule definition and a hash of all parameters passed to each call. Picking up from the previous snippet:

keyset = frozenset(parsed_query.keys())
signature = '%s%s' % (rule.rule, hash(keyset))
if signatures.get(signature):
    if len(keyset) == 0 and signatures[signature] > 0:
        continue
    if signatures[signature] > 5:
        continue
    signatures[signature] += 1
else:
    signatures[signature] = 1
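To see the counting logic in isolation, here is a small Python 3 sketch under a few assumptions: the function name `should_collect` and the `limit` argument are hypothetical, and a sorted tuple of parameter names replaces `hash(frozenset(...))`. In Python 3 string hashes are randomized per process by default, so a tuple key is the safer choice if signatures are ever compared across processes.

```python
from collections import Counter

def should_collect(signatures, rule, param_names, limit=5):
    """Decide whether another sample with this call signature is wanted.

    signatures: Counter mapping signature -> samples collected so far
    rule: the endpoint rule string, e.g. '/api/fruits/<search>'
    param_names: iterable of query string parameter names
    """
    keys = tuple(sorted(param_names))  # order of parameters is irrelevant
    signature = (rule, keys)
    # Calls without parameters are all alike, so one sample is enough.
    cap = 1 if not keys else limit
    if signatures[signature] >= cap:
        return False
    signatures[signature] += 1
    return True


seen = Counter()
calls = [['sort', 'apikey'], ['apikey', 'sort'], [], []]
results = [should_collect(seen, '/api/fruits', c, limit=1) for c in calls]
```

With `limit=1`, the second call is skipped because it carries the same set of parameters as the first, only in a different order.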

If the parser sees an API call and the number of times it's been collected doesn't exceed the limit, the call is reassembled with the sanitized parameters and written to disk under a directory named after the log file, using the filename specified in the ENDPOINTS file.

cleaned_query = {
    key: parsed_query[key][0]
    for key in parsed_query.keys()
}
sample_filename = os.path.join(samplesdir, '%s.sample' % rule.endpoint)
cleaned_uri = '%s?%s' % (
    parsed_uri.path,
    urllib.urlencode(cleaned_query)
)
with open(sample_filename, 'a+') as sample:
    sample.write('%s\n' % cleaned_uri)

# Once the whole log file has been processed:
raw_input("Press any key to exit.")
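As a self-contained illustration of the reassembly step, the Python 3 sketch below rebuilds a URI from only the allowed parameters. The function name `clean_uri` is hypothetical, and sorting the parameters is an addition of this sketch rather than something the original code does. It makes equivalent calls serialize identically, which helps later deduplication.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

def clean_uri(uri, allowed):
    """Rebuild uri keeping only the allowed query string parameters."""
    parts = urlsplit(uri)
    query = parse_qs(parts.query)
    # Keep the first value of each allowed parameter, in sorted order.
    cleaned = {
        key: values[0]
        for key, values in sorted(query.items())
        if key in allowed
    }
    return '%s?%s' % (parts.path, urlencode(cleaned))


cleaned = clean_uri('/api/fruits?sort=asc&junk=1&apikey=abc', {'apikey', 'sort'})
```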

To run multiple instances of it in parallel I just used screen. Starting too many instances at once seemed to cause a few processes to crash (this was a virtualised development server), so I divided the work into batches (in ${A[@]:x:y}, x is the starting index and y is the number of items to slice):

logfiles=(~/logs/*.gz)
for file in "${logfiles[@]:0:5}"; do
  screen -dmS "$(basename "$file")" fab harvest:"$file"
done

Running screen with -dmS starts each session detached from the terminal and gives it a custom name (in this case, the log filename). To monitor progress I would attach to any of the running sessions (e.g., screen -r 24124), which would show me the progress bar and an exit message if done:

100%|█████████████████████████| 309156334/309156334 [48:12<00:00, 106864.79it/s]
Press any key to exit.

The next and final step was to map and reduce the results. First, concatenate results from all directories into mapped/, then sort -u all of them into reduced/. This approach doesn't make this step of the process scalable, but it's definitely good enough for harvesting a couple dozen log files.

mkdir -p mapped reduced
cat */fruits.sample > mapped/fruits.sample
cat */vegetables.sample > mapped/vegetables.sample
sort -u mapped/fruits.sample > reduced/fruits.sample
sort -u mapped/vegetables.sample > reduced/vegetables.sample
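For those who would rather stay in Python, the same concatenate-then-deduplicate step can be sketched in a few lines of Python 3. The function name `reduce_samples` and the temporary directory layout in the demo are assumptions made for illustration. Everything is held in memory, so like the shell version it only suits a modest number of samples.

```python
import glob
import os
import tempfile

def reduce_samples(pattern, destination):
    """Merge all files matching pattern into destination, sorted and unique."""
    unique = set()
    for path in glob.glob(pattern):
        with open(path) as handler:
            unique.update(line.strip() for line in handler if line.strip())
    with open(destination, 'w') as out:
        for uri in sorted(unique):
            out.write(uri + '\n')
    return len(unique)


# Demo with throwaway files standing in for the per-log sample directories.
root = tempfile.mkdtemp()
for logname, lines in (('log1', ['/api/fruits?page=2', '/api/fruits?page=1']),
                       ('log2', ['/api/fruits?page=1'])):
    os.makedirs(os.path.join(root, logname))
    with open(os.path.join(root, logname, 'fruits.sample'), 'w') as handler:
        handler.write('\n'.join(lines) + '\n')

reduced = os.path.join(root, 'fruits.reduced')
count = reduce_samples(os.path.join(root, '*', 'fruits.sample'), reduced)
```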

Saving responses from Load Test

Mark Nottingham has a seminal starting point on the subject, bringing attention to the necessity of verifying the test server's bandwidth and making sure you don't get too close to the limit, among many other sensible recommendations. Mark's post led me to autobench and many other tools, such as siege, loads, funkload, blitz (SaaS), gatling and tsung.

None of them came close to the elegance and simplicity of Locust though, which aside from being written in modern Python, has a clear and concise API and built-in support for running a distributed cluster.

To use it you need to write a locustfile, which is just a Python file with at least one Locust subclass. Each Locust object specifies a list of tasks to be performed during the test, represented by a TaskSet class.

Although you can use TaskSet objects to build a test suite, I believe that approach falls short of the many benefits of pytest. I chose to let Locust only do the load test and save the API's responses, to be consumed later by a pytest suite. The file system would have been fine, but since I intended to run Locust in distributed mode, I used Redis as storage for the responses. TaskSet methods are dynamically generated from the reduced list of URIs.

import os
import redis
import glob
import functools
from locust import HttpLocust, TaskSet

BASE = os.path.abspath(os.path.dirname(__file__))

def _save_response(uri, tset):
    response = tset.client.get(uri)
    tset.redis.hset('responses', uri, response.content)

class ResponseSaver(TaskSet):
    def __init__(self, parent):
        super(ResponseSaver, self).__init__(parent)
        self.tasks = list(self.gen_tasks())
        self.redis = redis.Redis()

    def gen_tasks(self):
        samples = glob.glob('%s/samples/reduced/*.sample' % BASE)
        for sample in samples:
            with open(sample) as handler:
                for line in handler:
                    yield functools.partial(_save_response, line.strip())

class APIUser(HttpLocust):
    task_set = ResponseSaver
    host = "http://api.host"
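The use of functools.partial may deserve a closer look. Each generated task has its URI bound in advance and only needs the task set as its remaining argument, which, as far as I can tell, is what Locust supplies when it runs a task that is a plain callable. The Python 3 sketch below mimics that mechanism without Locust or Redis. `FakeClient` and `FakeTaskSet` are hypothetical stand-ins, and a dict replaces the Redis hash.

```python
import functools

class FakeClient:
    """Stands in for the HTTP client attached to a task set."""
    def get(self, uri):
        return 'response for %s' % uri

class FakeTaskSet:
    """Hypothetical stand-in for a TaskSet, with a dict instead of Redis."""
    def __init__(self):
        self.client = FakeClient()
        self.store = {}

def save_response(uri, tset):
    # uri comes first so that partial() can bind it ahead of time
    tset.store[uri] = tset.client.get(uri)

uris = ['/api/fruits?apikey=abc', '/api/vegetables?apikey=abc']
tasks = [functools.partial(save_response, uri) for uri in uris]

tset = FakeTaskSet()
for task in tasks:
    task(tset)  # the task set fills in the remaining argument
```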

Locust is currently based on gevent, but I wouldn't be surprised if they upgrade it to aiohttp which makes awesome use of PEP 3156 and PEP 492.

Running test suite against responses

Now that we've got past load testing and got our pretty graphs from Locust, we can proceed to writing some actual tests. The primary goal was to test data integrity, i.e., validate the schemas of all JSON responses. That's where marshmallow comes in. It's an ORM/ODM/framework-agnostic library that allows you to cleanly define a schema and validate a JSON document against it. It's better than manually validating each response, but I still found it too verbose to maintain. I wanted something as simple and concise as the ENDPOINTS file. So I came up with another small DSL, a thin layer on top of marshmallow:

success:bool data:{name:str link:url vitamins,minerals:str[]}
 /api/fruits
 /api/fruits/<search>
 /api/vegetables
 /api/vegetables/<search>

I called this SCHEMAS. By now it's clear I have a knack for DSLs. The syntax is fairly straightforward: field:type, where field can be a comma-separated list, and type can be followed by [] to indicate an array and ? to indicate it's optional (since optional parameters were more common in ENDPOINTS, it used ! instead, to indicate which fields were required). To specify a nested schema within each response (for instance, an array of objects under data), you can enclose its fields with {} where a single type would normally be defined.
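To make the syntax concrete, here is a standalone Python 3 sketch that expands a SCHEMAS line into a plain dictionary description. It is not the marshmallow-based parser used in this article. The function name `describe` and the output layout are hypothetical, and it only handles one level of nesting. Nested groups are flagged as lists because the actual parser treats them as arrays of objects.

```python
import re

# names ':' then either '{nested fields}' or a type name, then
# an optional '[]' and an optional '?'
FIELD = re.compile(r'([\w,]+):(?:\{([^{}]*)\}|(\w+))(\[\])?(\?)?')

def describe(spec):
    """Expand a SCHEMAS line into a plain dictionary description."""
    result = {}
    for names, nested, ftype, is_list, optional in FIELD.findall(spec):
        for name in names.split(','):
            result[name] = {
                'type': describe(nested) if nested else ftype,
                # nested groups are treated as arrays of objects
                'list': bool(is_list or nested),
                'required': not optional,
            }
    return result


spec = 'success:bool data:{name:str link:url vitamins,minerals:str[]} note:str?'
description = describe(spec)
```

Here `note:str?` is an extra optional field added to the example line to show the `?` suffix.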

As in ENDPOINTS, single space indentation is used for listing the endpoints to validate each schema against. To match URIs, I again used werkzeug.routing.

Next is validation.py, where schemas() is responsible for parsing the SCHEMAS file and returning a validation function that knows which schema to use according to the URL it gets. The validation function takes uri and response as parameters, both strings.

import os
import re
import time
from marshmallow import Schema
from marshmallow.fields import Str, Bool, Int, DateTime, URL, List, Nested
from werkzeug.exceptions import NotFound
from werkzeug.routing import Map, Rule, RequestRedirect

BASE = os.path.abspath(os.path.dirname(__file__))

def schemas(host='api.host'):
    map = Map()
    with open('%s/SCHEMAS' % BASE) as schemas:
        for line in schemas:
            line = line.rstrip()
            if re.search(r'^[^\s]+', line):
                # Unindented line: a schema definition
                schema = _gen_schema(line)
            elif re.search(r'^\s+', line):
                # Indented line: an endpoint validated against that schema
                endpoint = re.sub(r'\W+', '-', line)
                rule = Rule(line.strip(), endpoint=endpoint)
                rule.schema = schema
                map.add(rule)
    _validate.matcher = map.bind(host)
    return _validate

def _validate(uri, response):
    try:
        rule, args = _validate.matcher.match(uri, return_rule=True)
        data, errors = rule.schema().loads(response)
        return errors
    except (NotFound, RequestRedirect):
        return None

def _gen_schema(spec):
    fields = []
    for nested in re.findall(r'([\w,]+):{(.+)}(\??)', spec.strip()):
        for field in nested[0].split(','):
            fields.append([field, _gen_schema._handle_nested(nested[1]), not len(nested[2])])
        # Remove the nested group so only top-level fields remain
        spec = spec.replace('%s:{%s}' % nested[:2], '')
        spec = spec.replace('  ', ' ')
    fields += _gen_schema._handle_nested(spec)
    for index, value in enumerate(fields):
        if type(value[1]) is list:
            nested = _gen_schema._make_schema(dict(value[1]))
            fields[index] = [value[0], Nested(nested, required=not value[2], many=True)]
    return _gen_schema._make_schema(fields)

def _make_schema(fields):
    if type(fields) is list:
        fields = dict(fields)
    # A timestamp keeps generated class names distinct
    ts = str(time.time()).replace('.', '')
    return type('schema_%s' % ts, (Schema,), fields)

def _handle_nested(spec):
    fields = []
    for group in re.findall(r'([\w,]+):([\w\[\]]+)(\??)', spec):
        for field in group[0].split(','):
            if group[1].endswith('[]'):
                value = _gen_schema._handle_list(group[1], not len(group[2]))
                fields.append([field, value])
            else:
                value = _gen_schema._type_hash[group[1]](required=not len(group[2]))
                fields.append([field, value])
    return fields

def _handle_list(ltype, required):
    return List(_gen_schema._type_hash[ltype.split('[]')[0]], required=required)

_gen_schema._type_hash = {
    'str': Str,
    'bool': Bool,
    'int': Int,
    'dt': DateTime,
    'url': URL
}
_gen_schema._make_schema = _make_schema
_gen_schema._handle_nested = _handle_nested
_gen_schema._handle_list = _handle_list

Implementing such a parser took me one and a half pomodoros, but thankfully now making any tweaks or additions to the schemas becomes a trivial task. Finally we get to tests.py, which will import schemas() and use it to validate schemas of all responses saved in Redis:

import pytest
import redis
from validation import schemas
from concurrent.futures import ThreadPoolExecutor
from os.path import abspath, dirname

def test_schemas():
    validate = schemas()
    def _validate_schema(uri):
        response = REDIS.hget('responses', uri)
        return validate(uri, response), uri
    with ThreadPoolExecutor(max_workers=64) as executor:
        for result in executor.map(_validate_schema, REDIS.hgetall('responses')):
            if result[0] is not None:
                assert not len(result[0].keys()), result[1]

BASE = abspath(dirname(__file__))
REDIS = redis.Redis()

Deployment and Scaling with Kubernetes

I first experimented with Kubernetes in 2015 while migrating an application from AWS to GCP. Back then it was still in its early stages and I ran into problems so mysterious to debug that I opted for regular Compute Engine instances.

But time passed and Kubernetes evolved. Interest in the platform keeps trending up as it nears its 1.3 release.

Johan Haleby makes a compelling case for Kubernetes, listing shortcomings of other container orchestration offerings: AWS ECS (security group hassle, lack of service discovery and port management), Tutum (unable to reschedule containers from a failed node), Docker Swarm (not a managed service, insufficient API for a cluster), Mesosphere DCOS (not a managed service, multi-cloud capabilities only available in paid version).

Despite initially considering Docker Swarm for its alignment to the Docker API, I decided to revisit Kubernetes and stuck with it.

As far as development and operations go, I believe Kubernetes is going to be a bigger standard than Docker. There already is support for Rocket containers and there are very good reasons to try it.

Kubernetes has rather elaborate jargon, but in short, it lets you specify groups of containers (pods), replicated groups of containers (replication controllers) and load balancers (services), much like you can write a Dockerfile specifying a single container. With replication controllers, Kubernetes allows you to scale resources without downtime when needed, while also providing reliable failure resilience by automatically watching over running pods and ensuring they match the desired count, i.e., new pods are automatically spawned if any of them go down for any reason. Replication controllers are generally better than bare pods because they ensure capacity. Even when running a single pod, a replication controller ensures it is replaced if it fails for any reason.
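The idea of matching a desired count can be pictured with a toy model. The Python 3 sketch below is purely illustrative and has nothing to do with the real Kubernetes API. The class `ToyReplicationController` and its pod naming are hypothetical.

```python
import itertools

class ToyReplicationController:
    """Toy model of a replication controller; not the Kubernetes API."""
    def __init__(self, name, replicas):
        self.name = name
        self.replicas = replicas          # desired pod count
        self.pods = set()                 # names of running pods
        self._ids = itertools.count(1)

    def reconcile(self):
        """Start or stop pods until the running count matches the desired one."""
        while len(self.pods) < self.replicas:
            self.pods.add('%s-%d' % (self.name, next(self._ids)))
        while len(self.pods) > self.replicas:
            self.pods.pop()
        return sorted(self.pods)


rc = ToyReplicationController('testrunner-slaves', replicas=3)
rc.reconcile()
rc.pods.discard('testrunner-slaves-2')   # simulate a pod going down
after_failure = rc.reconcile()           # a replacement is spawned
rc.replicas = 5                          # scale up; running pods are kept
after_scaling = rc.reconcile()
```

Both failure recovery and scaling come down to the same operation: compare the running pods with the desired count and close the gap.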

I used three replication controllers, one for Redis and the others for the Locust master node and Locust slave nodes respectively, and two services to allow Locust to connect to Redis, and Locust slaves to connect to the Locust master.

Kubernetes requires you to upload your container images to a registry such as Docker Hub or, in the case of Container Engine, its own private container registry.

It's recommended to have behaviour set through environment variables (see The Twelve-Factor App for best practices in deployment) and data made available via volumes, also carefully specified in the Kubernetes API.

The master replication controller is defined as follows:

kind: ReplicationController
apiVersion: v1
metadata:
  name: testrunner-master
  labels:
    name: testrunner
    role: master
spec:
  replicas: 1
  selector:
    name: testrunner
    role: master
  template:
    metadata:
      labels:
        name: testrunner
        role: master
    spec:
      containers:
        - name: testrunner
          image: gcr.io/<id>/testrunner:latest
          env:
            - name: TESTRUNNER_ROLE
              key: TESTRUNNER_ROLE
              value: master
          ports:
            - name: tr-port-8089
              containerPort: 8089
              protocol: TCP
            - name: tr-port-5557
              containerPort: 5557
              protocol: TCP
            - name: tr-port-5558
              containerPort: 5558
              protocol: TCP

Below is the slave replication controller:

kind: ReplicationController
apiVersion: v1
metadata:
  name: testrunner-slaves
  labels:
    name: testrunner
    role: slave
spec:
  replicas: 10
  selector:
    name: testrunner
    role: slave
  template:
    metadata:
      labels:
        name: testrunner
        role: slave
    spec:
      containers:
        - name: testrunner
          image: gcr.io/<id>/testrunner:latest
          env:
            - name: TESTRUNNER_ROLE
              key: TESTRUNNER_ROLE
              value: slave
            - name: TESTRUNNER_MASTER
              key: TESTRUNNER_MASTER
              value: testrunner-master

And finally, the master service definition:

kind: Service
apiVersion: v1
metadata:
  name: testrunner-master
  labels:
    name: testrunner
    role: master
spec:
  ports:
    - port: 8089
      targetPort: tr-port-8089
      protocol: TCP
      name: tr-port-8089
    - port: 5557
      targetPort: tr-port-5557
      protocol: TCP
      name: tr-port-5557
    - port: 5558
      targetPort: tr-port-5558
      protocol: TCP
      name: tr-port-5558
  # The selector must match the labels on the master replication controller
  selector:
    name: testrunner
    role: master
  type: LoadBalancer

Notice how in the slave replication controller, the TESTRUNNER_MASTER environment variable which is passed to the container is set to testrunner-master, a hostname made available by the service definition above. Also, all service targetPort values match the ones defined in the targeted replication controller. Redis is made available to the Python processes the very same way, through a service and replication controller in Kubernetes.

In the testrunner's Dockerfile, ENTRYPOINT is set to run scripts/main.sh, which picks up the environment variables and starts the locust process in the appropriate mode.

#!/bin/bash
if [[ "$TESTRUNNER_ROLE" = "master" ]]; then
  locust --master
elif [[ "$TESTRUNNER_ROLE" = "slave" ]]; then
  locust --slave --master-host="$TESTRUNNER_MASTER"
fi

Once you end a Locust run and have your API responses saved, you can SSH into the master node and run the py.test suite from the command line, or run it locally or elsewhere by enabling remote access to the Redis service.

This article was mostly illustrative, but you can find supporting source code and more implementation details in GCP's sample Locust project. It doesn't include any of the log harvesting and schema validation code from this article, but it can get you a cluster running in no time.