I'm Jonas Galvez, a JavaScript and Go engineer at STORED e-commerce. I started my carreer as an ActionScript developer 18 years ago. Since then, I've had a 10-year long affair with Python and have now returned to ECMAScript land.

I'm an eternal student of distributed systems and practitioner of minimalism.

I build and deploy my stuff with Vue, Nuxt, Go and Kubernetes nowadays.

Check out my dedicated page to my professional influences.

I am driven by purpose and cultivate a stoic attitude towards life.

A Million Schema Validations on Kubernetes

I was tasked with the job of running a schema validation test on roughly a million API requests. The API I was testing was already live and actively used by thousands of customers which employed nearly all of its capacities. I had access to 20+ log files ranging between 200MB to 300MB each, compressed.

Harvesting logs to infer common signatures

My first attempts at parsing them made it clear they needed some serious sanitization. After experimenting with a variety of URL sanitization approaches, I set out to find a concise way to describe the endpoints I needed to test, and use such descriptions to parse valid API call signatures from the logs. Using an hypothetical API to list foods as an example, this is what it came to look like:

fruits
  /api/fruits
  apikey! sort items page
  /api/fruits/<search>
  apikey! sort items page

vegetables
  /api/vegetables
  apikey! sort items page
  /api/vegetables/<search>
  apikey! sort items page

This is of course only illustrative as the actual API had dozens endpoints and potential parameter combinations that needed to be tested. I suggestively named this file ENDPOINTS and placed it at the root of my test suite. I used two-space indentation for structuring each endpoint description. The first line specifies the filename where sanitized URLs will be collected. The second line specifies the URI and URI parameters and the third line, query string parameters. If a parameter is required, it's suffixed with an exclamation mark. With this simple specification file, I then used werkzeug's routing module to parse 1: http://werkzeug.pocoo.org/docs/0.11/routing/

import re
from werkzeug.routing import Map, Rule

def endpoints(host):
    map = Map()
    current_endpoint = None
    with open('ENDPOINTS') as endpoints:
        for line in endpoints:
            line = line.rstrip()
            if re.search('^[^\s]+', line):
                endpoint_group = line.strip()
                index = 1
            elif re.search('^\s+/', line):
                current_endpoint = Rule(
                    line.strip(), 
                    endpoint = '%s-%s' % (endpoint_group, index)
                )
                index += 1
            elif re.search('^\s+', line):
                current_endpoint.params = re.split('\s+', line)
                map.add(current_endpoint)
    return map.bind(host)

Parsing of URI parameters is already handled by werkzeug, and since they're part of the endpoint itself they're implicitly required. I store query string parameters for verification later as params in each Rule object (a custom addition). Moving on to the actual log parser, my main goals were: a) discarding malformed calls and removing junk from otherwise valid requests (sanitization) and b) avoiding repetitive API calls in the test suite.

The parsing relies heavily on MapAdapter, which raises HTTP error exceptions and returns parsed URI parameters. Using the previously stored params list from each Rule object, I then ensure required parameters are present and remove all invaliad parameters (missing from each definition in ENDPOINTS) at the end. Also worth noting tqdm provides a progress bar while reading the files.

with open(logfile, 'rb') as logfile_handler:
    size = os.path.getsize(logfile)
    with tqdm(total=size) as pbar:
        newpos = None
        oldpos = logfile_handler.tell()
        for line in gzip.GzipFile(fileobj=logfile_handler):
            newpos = logfile_handler.tell() 
            if newpos != oldpos:
                pbar.update(newpos if oldpos == 0 else newpos-oldpos)
                oldpos = newpos
            request_data = re.findall('"([A-Z]+)\s+(.*?)\s+HTTP/1.1', line)
            if len(request_data):
                uri = request_data[0][1]
                try:
                    parsed_uri = urlparse.urlparse(uri)
                    rule, args = endpoints.match(parsed_uri.path, return_rule=True)
                except NotFound, RequestRedirect:
                    continue
                if random() > 0.0009:
                    continue
                parsed_query = cgi.parse_qs(parsed_uri.query)
                if getattr(rule, 'params', False):
                    required_params = [
                        param[:-1] 
                        for param in rule.params 
                        if param.endswith('!')
                    ]
                    optional_params = [
                      param 
                      for param in rule.params 
                      if not param.endswith('!')
                    ]
                else:
                    required_params, optional_params = [], []
                all_parameters = set(required_params+optional_params)
                for param in set(parsed_query.keys()).difference(all_parameters):
                    del parsed_query[param]

Since each file had millions of lines, in order to generate a manageably sized data set I gradually lowered the exclusion rate until it got to about 200 results from each file. But there were still a lot of duplicates. I figured I didn't need to test the same API call signature (combination of parameters used) more than a few times. So I used Python's native hash function to generate a unique signature built from the Rule definition and a hash of all parameters passed to each call. Picking up from the previous snippet:

                keyset = frozenset(parsed_query.keys())
                signature = '%s%s' % (rule.rule, hash(keyset))
                if signatures.get(signature):
                    if len(keyset) == 0 and signatures[signature] > 0:
                        continue
                    if signatures[signature] > 5:
                        continue
                    signatures[signature] += 1
                else:
                    signatures[signature] = 1

If the parser sees an API call and the number of times it's been collected doesn't exceed the limit, it's then reassembled with the sanitized parameters and written to disk under a directory named after the log file, and the filename spcified in the ENDPOINTS file.

                cleaned_query = {
                    key: parsed_query[key][0] 
                    for key in parsed_query.keys()
                }
                sample_filename = os.path.join(samplesdir, '%s.sample' % rule.endpoint)
                cleaned_uri = '%s?%s' % (
                    parsed_uri.path, 
                    urllib.urlencode(cleaned_query)
                )
                with open(sample_filename, 'a+') as sample:
                    sample.write('%s\n' % cleaned_uri)

To run multiple instances of it in parallel I just used screen. Starting too many instances at once seemed to cause a few processes to crash (this was a virtualised development server), so I divided it in batches (in ${A[@]:x:y}, x is the starting index and y is the number of items to slice):

logfiles=(~/logs/*.gz)
for file in ${logfiles[@]:0:5}; do
    screen -dmS $(basename $file) fab harvest:$file
done

Running screen with -dMS sets a custom title for each session (in this case, the log filename) and detaches it from the terminal before running. To monitor I would attach any of the running sessions (e.g., screen -r 24124), which would show me the progress bar and an exit message if done:

100%|█████████████████████████| 309156334/309156334 [48:12<00:00, 106864.79it/s]
Press any key to exit.

The next and final step was to map and reduce the results. First, concatenate results from all directories into mapped/, then sort -u all of them into reduced/. Doesn't make this step of the process scalable, but definitely good enough for harvesting a couple dozen log files.

mkdir -p mapped reduced
cat */fruits.sample > mapped/fruits.sample
cat */vegetables.sample > mapped/vegetables.sample
sort -u mapped/fruits.sample > reduced/fruits.sample
sort -u mapped/vegetables.sample > reduced/vegetables.sample

Saving responses from Load Test

Mark Nottingham has a seminal starting point on the subject, bringing attention to the necessity of verifying the test server's bandwidth and making sure you don't get too close to the limit, among many other sensible recommendations. Mark's post led me to autobench and many others tools, such as siege, loads, funkload, blitz (SaaS), gatling and tsung. None of them came close to the elegance and simplicity of Locust though, which aside from being written in modern Python, has a clear, concise API and built-in support for running a distributed cluster.

To use it you need to write a locustfile, which is just a Python file with at least one Locust subclass. Each Locust object specifies a list of tasks to be performed during the test, represented by a TaskSet class.

Although you can use TaskSet objects to build a test suite, I believe it falls short in comparison to the myriad of benefits of pytest. I chose to let Locust only do the load test and save responses from the API, for further consumption by a pytest suite subsequently. The file system would have been fine but since I intended to run Locust distributedly, I used Redis as storage for the responses. TaskSet methods are dynamically generated from the reduced list of URIs.

import os
import redis
import glob
import functools
from locust import HttpLocust, TaskSet

BASE = os.path.abspath(os.path.dirname(__file__))

def _save_response(uri, tset):
    response = tset.client.get(uri)
    tset.redis.hset('responses', uri, response.content)

class ResponseSaver(TaskSet):
    def __init__(self, parent):
        super(ResponseSaver, self).__init__(parent)
        self.tasks = list(self.gen_tasks())
        self.redis = redis.Redis()
    def gen_tasks(self):
        samples = glob.glob('%s/samples/reduced/*.sample' % BASE)
        for sample in samples:
            with open(sample) as handler:
                for line in handler:
                    yield functools.partial(_save_response, line.strip())

class APIUser(HttpLocust):
  task_set = ResponseSaver
  host = "http://api.host"

Locust is currently based on gevent, but I wouldn't be surprised if they upgrade it to aiohttp which makes use of PEP 3156 and PEP 492.

Running test suite against responses

Now that we got past load testing and got our pretty graphs from Locust, we can proceed to writing some actual unit tests. The primary goal was to test data integrity, i.e., validate schemas of all JSON responses. That's where marshmallow comes in. It's a ORM/ODM/framework-agnostic library that allows you to cleanly define a schema and validate a JSON document against it. It's better than manually validating each response, but I still found it too verbose to maintain. I wanted something simple and concise as the ENDPOINTS file. So I came up with another small DSL, a thin layer on top of marshmallow:

success:bool data:{name:str link:url vitamins,minerals:str[]}
 /api/fruits
 /api/fruits/<search>
 /api/vegetables
 /api/vegetables/<search>

I called this SCHEMAS. By now it's clear I have a knack for DSLs. The syntax is fairly straightforward: field:type, where field can be a comma-separated list, and type can be followed by [] to indicate an array and ? to indicate it's optional (since optional parameters were more common in ENDPOINTS, it used ! instead, to indicate which fields were required). To specify a nested schema within each response (for instance, an array of objects under data), you can enclose its fields with {} where a single type would normally be defined.

Like in ENDPOINTS, single space indentation for listing endpoints to validate schemas against. To match URIs, again I used werkzeug.routing.

Following next is validation.py, where schemas() is responsible for parsing the SCHEMAS file and returning a validation function that will know what schema to use according to the URL it gets. The validation function takes uri and response as parameters, both strings.

def schemas(host='api.host'):
    map = Map()
    with open('%s/SCHEMAS' % BASE) as schemas:
        for line in schemas:
            line = line.rstrip()
            if re.search('^[^\s]+', line):
                schema = _gen_schema(line)
            elif re.search('^\s+', line):
                endpoint = re.sub('\W+', '-', line)
                rule = Rule(line.strip(), endpoint=endpoint)
                rule.schema = schema
                map.add(rule)
    _validate.matcher = map.bind(host)
    return _validate

def _validate(uri, response):
    try:
        rule, args = _validate.matcher.match(uri, return_rule=True)
        data, errors = rule.schema().loads(response)
        return errors
    except NotFound, RequestRedirect:
        return None

def _gen_schema(spec):
    fields = []
    for nested in re.findall('([\w,]+):{(.+)}(\??)', spec.strip()):
        for field in nested[0].split(','):
            fields.append([field, _gen_schema._handle_nested(nested[1]), not len(nested[2])])
        {% raw %}spec = spec.replace('%s:{%s}' % nested[:2], ''){% endraw %}
        spec = spec.replace('  ', ' ')
    fields += _gen_schema._handle_nested(spec)
    for index, value in enumerate(fields):
        if type(value[1]) is list:
            nested = _gen_schema._make_schema(dict(value[1]))
            fields[index] = [value[0], Nested(nested, required=not value[2], many=True)]
    return _gen_schema._make_schema(fields)

def _make_schema(fields):
    if type(fields) is list:
        fields = dict(fields)
    ts = str(time.time()).replace('.', '')
    return type('schema_%s' % ts, (Schema,), fields)

def _handle_nested(spec):
    fields = []
    for group in re.findall('([\w,]+):([\w\[\]]+)(\??)', spec):
        for field in group[0].split(','):
            ftype = None
            if group[1].endswith('[]'):
                value = _gen_schema._handle_list(group[1], not len(group[2]))
                fields.append([field, value])
            else:
                value = _gen_schema._type_hash[group[1]](required=not len(group[2]))
                fields.append([field, value])
    return fields

def _handle_list(ltype, required):
    return List(_gen_schema._type_hash[ltype.split('[]')[0]], required=required)

_gen_schema._type_hash = {
    'str': Str, 
    'bool': Bool, 
    'int': Int, 
    'dt': DateTime,
    'url': URL
}

_gen_schema._make_schema = _make_schema
_gen_schema._handle_nested = _handle_nested
_gen_schema._handle_list = _handle_list

Finally we get to tests.py, which will import schemas() and use it to validate schemas of all responses saved in Redis:

import pytest 
import redis
from validation import schemas
from concurrent.futures import ThreadPoolExecutor
from os.path import abspath, dirname

def test_schemas():
  validate = schemas()
  def _validate_schema(uri):
    response = REDIS.hget('responses', uri)
    return validate(uri, response), uri
  with ThreadPoolExecutor(max_workers=64) as executor:
    for result in executor.map(_validate_schema, REDIS.hgetall('responses')):
      if result[0] is not None:
        assert not len(result[0].keys()), result[1]

BASE = abspath(dirname(__file__))
REDIS = redis.Redis()

Deployment and Scaling with Kubernetes

I first experimented with Kubernetes in 2015 while migrating an application from AWS to GCP. Back then it was still in its early stages and I ran into problems so mysterious to debug that I opted for regular Compute Engine instances.

But time passed and Kubernetes evolved. Interest in the platform keeps trending up as it nears its 1.3 release.

Johan Haleby has a compelling case in Kubernetes' support, listing shortcomings with other container orchestration offerings: AWS ECS (security group hassle, lack of service discovery and port management), Tutum (unable to reschedule containers from a failed node), Docker Swarm (not a managed service, insufficient API for a cluster), Mesosphere DCOS (not a managed service, multi-cloud capabilitites only available in the paid version).

Despite initially considering Docker Swarm for its alignment to the Docker API, I decided to revisit Kubernetes and stuck with it.

As far as development and operations go, I believe Kubernetes is going to be a bigger standard than Docker. There already is support for Rocket containers and there are very good reasons to try it.

Kubernetes has a rather elaborate jargon, but in short, it lets you specify groups of containers (pods), replicated groups of containers (replication controllers) and load balancers (services), much like you can write a Dockerfile specifying a single container. With replication controllers, Kubernetes allows you to scale resources without downtime when needed, while also providing reliable failure resilience by automatically watching over running pods and ensuring they match the desired count, i.e., new pods are automatically spawn if any of them go down for any reason. Replication controllers are generally better than pods because they ensure capacity. Even when using a single node, it guarantees resilience if it fails for any reason.

I used three replication controllers, one for Redis and the others for the Locust master node and Locust slave nodes respectively, and two services to allow Locust to connect to Redis, and Locust slaves to connect to the master.

Kubernetes requires you to upload your container images to Docker Hub or in the case of Container Engine, its own private container registry.

It's recommended to have behaviour set through environment variables (see The Twelve-Factor App for best practices in deployment) and data made available via volumes, also carefully specified in the Kubernetes API.

The master replication controller is defined as follows:

kind: ReplicationController
apiVersion: v1
metadata:
  name: testrunner-master
  labels:
  name: testrunner
  role: master
spec:
  replicas: 1
  selector:
  name: testrunner
  role: master
  template:
  metadata:
    labels:
    name: testrunner
    role: master
  spec:
    containers:
    - name: testrunner
      image: gcr.io/&lt;id&gt;/testrunner:latest
      env:
      - name: TESTRUNNER_ROLE
        key: TESTRUNNER_ROLE
        value: master
      ports:
      - name: tr-port-8089
        containerPort: 8089
        protocol: TCP
      - name: tr-port-5557
        containerPort: 5557
        protocol: TCP
      - name: tr-port-5558
        containerPort: 5558
        protocol: TCP

Below is the slave replication controller:

kind: ReplicationController
apiVersion: v1
metadata:
  name: testrunner-slaves
  labels:
  name: testrunner
  role: slave
spec:
  <span class="bolder">replicas: 10</span>
  selector:
  name: testrunner
  role: slave
  template:
  metadata:
    labels:
    name: testrunner
    role: slave
  spec:
    containers:
    - name: testrunner
      image: gcr.io/&lt;id&gt;/testrunner:latest
      env:
      - name: TESTRUNNER_ROLE
        key: TESTRUNNER_ROLE
        value: slave
      - name: TESTRUNNER_MASTER
        key: TESTRUNNER_MASTER
        value: testrunner-master

And finally, the master service definition:

kind: Service
apiVersion: v1
metadata:
  name: testrunner-master</span>
  labels:
  name: testrunner
  role: master
spec:
  ports:
  - port: 8089
    targetPort: tr-port-8089</span>
    protocol: TCP
    name: tr-port-8089
  - port: 5557
    targetPort: tr-port-5557</span>
    protocol: TCP
    name: tr-port-5557
  - port: 5558
    targetPort: tr-port-5558</span>
    protocol: TCP
    name: tr-port-5558
  selector:
  name: locust
  role: master
  type: LoadBalancer

Notice how in the slave replication controller, the TESTRUNNER_MASTER environment variable which is passed to the container is set to testrunner-master, a hostname made available by the service definition above. Also, all service targetPort values match the ones defined in the targeted replication controller. Redis is made available to the Python processes the very same way, through a service and replication controller in Kubernetes.

On the testrunner's Dockerfile, ENTRYPOINT is set to run scripts/main.sh which picks up the environment variables and starts the locust process in the appropriate mode.

#!/bin/bash
if [[ "$TESTRUNNER_ROLE" = "master" ]]; then
  locust --master
elif [[ "$TESTRUNNER_ROLE" = "slave" ]]; then
  locust --slave --master-host=$TESTRUNNER_MASTER
fi

Once you end a Locust run and have your API responses saved, you can SSH to the master node and run the py.test suite from the command-line, or, run it locally or elsewhere by simply enabling remote access to the Redis service.

This article was mostly illustrative, but you can find supporting source code and more implementation details in GCP's sample Locust project. It doesn't include any of the log harvesting and schema validation code from this article, but it can get you a cluster running in no time.