Metis Machine's Skafos

Machine Learning Delivered. A Machine Learning deployment platform built to unite Data Science, DevOps, and Engineering.


Skafos Queue

Many problems in machine learning require large amounts of data and processing that, if handled sequentially, will take a long time to complete. Taking advantage of a queue can accelerate this process, enabling simultaneous processing of Jobs in your Project.

The Skafos Queue gives you the ability to do three things:

  • Ensure fault-tolerant processing of valuable production data.
  • Quickly move data between Jobs within a Project.
  • Parallelize consuming Jobs to speed up your pipeline.

Although a number of open source queueing libraries are available today, configuring and setting them up can be tedious and time-consuming. The Skafos Queue handles this infrastructure for you, so accessing and using the queue takes only a few function calls from the SDK. All that is required is the inclusion of the queue AddOn in your configuration file.

What is a Queue?

A queue is a service that stores pieces of data as “messages” in a list that preserves the order in which they arrive. A typical queuing system has a few important components:

  • Publisher - a job that sends messages to a queue, or multiple queues, for processing further down the pipeline.

  • Queue - a buffer that holds messages as they come in, maintaining order.

  • Consumer - one or more jobs that subscribe to a queue, listen for new messages as they come in, and process them in order.

A queue can be leveraged in many ways to store, process, and parallelize computation within the context of a series of Jobs.
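The three components above can be sketched with Python's standard library alone. This is only an in-memory illustration: `message_queue`, `publisher`, and `consumer` are hypothetical names, and the stdlib `queue.Queue` stands in for the Skafos Queue rather than reproducing it.

```python
import queue
import threading

# In-memory stand-in for a message queue (not the Skafos Queue itself).
message_queue = queue.Queue()
processed = []

def publisher(messages):
    """Place each message on the queue in order."""
    for msg in messages:
        message_queue.put(msg)

def consumer():
    """Take messages off the queue in arrival order until none are left."""
    while True:
        try:
            msg = message_queue.get(timeout=0.1)
        except queue.Empty:
            break  # nothing arrived within the timeout
        processed.append(msg.upper())  # placeholder "processing" step
        message_queue.task_done()      # rough analogue of acknowledging

publisher(["hello", "howdy", "hi"])
worker = threading.Thread(target=consumer)
worker.start()
worker.join()
print(processed)  # ['HELLO', 'HOWDY', 'HI']
```

Because a single consumer reads from a first-in, first-out buffer, the output order matches the publishing order.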

Workflow Examples

Below are two high-level examples demonstrating how a queue works and how you might use one within your projects.

The "Publish" arrow refers to placing messages onto the Skafos Queue. The "Consume" arrow refers to removing a message from the Skafos Queue.


Example #1: Single Queue & Parallel Consumers

As a part of your deployed ML pipeline, you need to perform some work (a series of transformations) on data coming from a rather large source. A single publishing Job places a message, containing data relevant to the task at hand, on the Skafos Queue. What the message contains depends on the problem. Here are some examples:

  • the name of a table to fetch data from to process
  • the name of a single column to process
  • the location of a pre-trained model to score some records with
  • a piece of metadata that guides an ingestor to retrieve and process data from an external API
  • a timestamp of when the message entered the queue
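Since a message body is published as a string, one way to carry several of the items above in a single message is to serialize them as JSON. The sketch below is an assumption about how you might structure a message, not a format the Skafos Queue requires; the field names and the `build_message` and `parse_message` helpers are hypothetical.

```python
import json
from datetime import datetime, timezone

def build_message(table, column, model_path):
    """Serialize task details into a string suitable for a message body."""
    payload = {
        "table": table,            # table to fetch data from
        "column": column,          # single column to process
        "model_path": model_path,  # location of a pre-trained model
        "published_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(payload)

def parse_message(body):
    """Turn a message body back into a dictionary on the consuming side."""
    return json.loads(body)

body = build_message("daily_sales", "revenue", "models/scorer.pkl")
task = parse_message(body)
print(task["table"], task["column"])  # daily_sales revenue
```

The publisher would pass `body` as the message, and the consumer would call `parse_message` on whatever it receives.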

In its simplest form, Job #1 creates a new queue and publishes messages one by one. Once on the queue, messages maintain their order. Job #2 contains a series of transformations that are executed each time it receives a new message. Because Job #2 consumes from a Skafos Queue, it can be parallelized (replicated) within the system, enabling much faster processing. After Job #2 has finished its work, it acknowledges the message, which safely removes it from the queue.

Another benefit of parallelization: if one replica of Job #2 crashes before acknowledging a message, the message stays on the queue, in order, and can then be consumed by another replica of Job #2.

Skafos Queue Example #1.
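The acknowledge-or-redeliver behavior in this example can be imitated in memory. The `AckQueue` class and `replica` function below are hypothetical stand-ins written for this sketch, not part of the Skafos SDK. The "crash" is simulated by explicitly returning the unacknowledged message to the queue, and the crashing replica runs before the healthy ones so the result is reproducible.

```python
import threading
from collections import deque

class AckQueue:
    """Hypothetical in-memory queue with acknowledgements (not the Skafos SDK)."""

    def __init__(self, messages):
        self._ready = deque(enumerate(messages))  # (delivery_tag, body) pairs
        self._unacked = {}
        self._lock = threading.Lock()

    def consume(self):
        """Hand out the next message, or None if nothing is ready."""
        with self._lock:
            if not self._ready:
                return None
            tag, body = self._ready.popleft()
            self._unacked[tag] = body  # held until acknowledged
            return tag, body

    def ack(self, tag):
        """Safely remove a processed message."""
        with self._lock:
            del self._unacked[tag]

    def requeue(self, tag):
        """Return an unacknowledged message to the front, preserving order."""
        with self._lock:
            self._ready.appendleft((tag, self._unacked.pop(tag)))

def replica(q, results, crash_on=None):
    """Consume until the queue is empty; optionally 'crash' before acking."""
    while (item := q.consume()) is not None:
        tag, body = item
        if body == crash_on:
            q.requeue(tag)  # simulated crash: message was never acknowledged
            return
        results.append(body * 2)  # placeholder transformation
        q.ack(tag)

q = AckQueue([1, 2, 3, 4, 5])
results = []
replica(q, results, crash_on=2)  # processes 1, then crashes on 2
workers = [threading.Thread(target=replica, args=(q, results)) for _ in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(sorted(results))  # [2, 4, 6, 8, 10]
```

Message 2 is not lost when the first replica stops: it goes back to the front of the queue and one of the two healthy replicas picks it up.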


Example #2: Multiple Queues

This example is very similar to the first, but now you need to divide the work among different consumers that perform unique tasks. As a part of your deployed ML pipeline, you need to perform some work (a series of transformations) that is specific to a particular group or type of data. A single publishing Job sends messages to different queues. The algorithm in each consuming Job is different, so each one requires its own queue. For example, each queue might hold a set of HTTP requests to be made to a different REST API, where resulting data is cleaned and stored in a specialized way.

In its simplest form, Job #1 creates 3 new queues, one for each specific group of messages. One by one, Job #1 publishes messages. Once on their respective queues, messages maintain their order. Jobs #2-4 each contain an algorithm that is executed as they consume messages. Once it has finished processing a single record, each consumer acknowledges the message, safely removing it from its queue.

Skafos Queue Example #2.
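The routing idea in this example can be sketched with one in-memory queue per routing key. Everything here is illustrative: the keys (`weather`, `traffic`, `events`), the handlers, and the `publish` and `consume_all` helpers are hypothetical and only mimic how messages are split between task-specific consumers.

```python
import queue

# Hypothetical in-memory router: one stdlib queue per routing key.
queues = {key: queue.Queue() for key in ("weather", "traffic", "events")}

def publish(routing_key, body):
    """Route a message to the queue bound to routing_key."""
    target = queues.get(routing_key)
    if target is None:
        return False  # no queue was set up for this key
    target.put(body)
    return True

# Each consumer applies its own task-specific algorithm.
handlers = {
    "weather": lambda body: f"cleaned weather record: {body}",
    "traffic": lambda body: f"aggregated traffic record: {body}",
    "events": lambda body: f"deduplicated event record: {body}",
}

def consume_all(routing_key):
    """Drain one queue in order, applying that queue's handler."""
    results = []
    source = queues[routing_key]
    while not source.empty():
        results.append(handlers[routing_key](source.get()))
    return results

# Job #1: publish each message to the queue for its group.
for key, body in [("weather", "w1"), ("traffic", "t1"),
                  ("weather", "w2"), ("events", "e1")]:
    publish(key, body)

# Jobs #2-4: each consumer reads only from its own queue.
outputs = {key: consume_all(key) for key in queues}
print(outputs["weather"])
# ['cleaned weather record: w1', 'cleaned weather record: w2']
```

Messages published with the same key keep their relative order, while messages for different keys never mix.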


Usage

Skafos Queues are accessible from your jobs using the Skafos SDK. Before you can start using this tool, you need to define the queue AddOn in your project's metis.config.yml file.

project_token: <project_token>
name: my_new_project
jobs: 
  - job_id: <job_id>
    language: python
    name: Main
    entrypoint: "main.py"
add-ons:
  - name: queue

The Skafos Queue object within the SDK exposes 5 methods for you to create and utilize queues within your workflow.

Accessing the Queue through the Skafos SDK

The SDK class skafossdk.Skafos.queue allows you to build new queues with unique routing keys in order to:

  • properly route data
  • publish messages
  • consume messages
  • acknowledge messages after processing
  • close down connections to the queue

All operations are thread-safe and performed on a blocking connection to the queue instance.

Methods

The following are the methods exposed by the queue class:

setup

Description: Build one or more queue instances by providing new names and routing keys to bind on an exchange. Once created, they are available for publishing and consuming, both within and between jobs.

Parameters:
  • queue_names : list or str. Names of the individual queues to create. Can be a single queue or a list.
  • routing_keys : list or str. Keys to use for each queue created above. Can be a single key or a list.
  • delete : boolean. Whether or not to blow away queues with the same name prior to creating. Defaults to False.
  • exchange_name : str. Name of the exchange to publish to. Defaults to "default".

Returns:
  • status : boolean. Whether or not the setup operation was successful.

publish

Description: Send a single message to an existing queue by routing_key in blocking fashion.

Parameters:
  • routing_key : str. Key used to route messages to the proper queue. Must already be set up.
  • body : str. Message to publish to the queue.
  • exchange_name : str. Name of the exchange to publish to. Defaults to None and inherits the name used in the setup method.

Returns:
  • status : boolean. Whether or not the publish operation was successful.

consume

Description: Listen for messages, one by one, from a queue by queue_name in blocking fashion.

Parameters:
  • queue_name : str. Name of the individual queue to consume from. Must already be set up. Defaults to None.
  • wait_timeout : int or float. Number of seconds to wait for items to come off the queue. Will throw an Empty error if the timeout is exceeded. Defaults to None.
  • auto_ack : boolean. Whether or not to automatically acknowledge messages from the queue while consuming. Defaults to False.

Returns:
  • method : str
  • properties : str
  • body : str. Message retrieved from the queue.

ack

Description: When sent by the client, this method acknowledges one message from the queue (safe removal).

Parameters:
  • delivery_tag : int. Server-assigned delivery tag. Use method.delivery_tag.

Returns: None

stop

Description: Close all publishing and consuming connections to the active queues.

Parameters: None

Returns: None
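Because setup and publish report success through a boolean status, a caller can decide what to do when a call returns False. The helper below is a minimal sketch of one option, retrying a publish a few times. `publish_with_retry` and `flaky_publish` are hypothetical names, and `flaky_publish` is only a stand-in for ska.queue.publish so the example runs without the SDK. Whether a failed publish returns False or raises an exception in every case is an assumption here.

```python
import time

def publish_with_retry(publish_fn, routing_key, body, attempts=3, delay=0.0):
    """Call publish_fn until it returns True or the attempts run out.

    Returns the number of attempts used, or None if every attempt failed.
    """
    for attempt in range(1, attempts + 1):
        if publish_fn(routing_key=routing_key, body=body):
            return attempt
        time.sleep(delay)  # wait before the next attempt
    return None

# Stand-in for ska.queue.publish that fails twice, then succeeds.
calls = []

def flaky_publish(routing_key, body):
    calls.append((routing_key, body))
    return len(calls) >= 3

attempts_used = publish_with_retry(flaky_publish, "queue_key", "hello")
print(attempts_used)  # 3
```

With the SDK available, the same helper could be called with ska.queue.publish as `publish_fn`, since it only relies on the routing_key and body keyword arguments listed above.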

Code Examples

Setting up a new Skafos Queue

from skafossdk import *

ska = Skafos()

# Build a new queue
exchange = "test_exchange"
name = "my_first_queue"
key = "queue_key"

## Should print True if operation was successful
res = ska.queue.setup(exchange_name=exchange,
                      queue_names=name,
                      routing_keys=key,
                      delete=True)
ska.log(f"Skafos Queue Setup: {res}")

## Close queue connection
ska.queue.stop()

Publishing messages to a Skafos Queue

from skafossdk import *

ska = Skafos()

# Prepare to send messages
exchange = "test_exchange"
key = "queue_key"
messages = ["hello", "howdy", "hi"]

## Send 3 messages
## Should print True if operation was successful
for msg in messages:
    res = ska.queue.publish(exchange_name=exchange,
                            routing_key=key,
                            body=msg)
    ska.log(f"Publish Success: {res}")

## Close queue connection
ska.queue.stop()

Consuming & Ack'ing messages from a Skafos Queue

from skafossdk import *

ska = Skafos()
name = "my_first_queue"

## Consume messages in a loop, waiting up to 60 seconds for each one
## Catch the exception raised when the Skafos Queue is empty
try:
    while True:
        method, properties, body = ska.queue.consume(queue_name=name, wait_timeout=60)
        ska.log(f"Consume Body: {body}")
        ### Ack the message to safely remove it from the Skafos Queue
        ska.queue.ack(method.delivery_tag)
except Exception as e:
    ska.log(f'Error {e}')
    
## Close queue connection
ska.queue.stop()
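Each example above ends by calling stop() to close the queue connection. One way to make sure that call happens even when an error interrupts the work is a small context manager. This is a sketch under assumptions: `open_queue` is a hypothetical helper, and `FakeQueue` is a stand-in for ska.queue that exists only so the example runs without the SDK.

```python
from contextlib import contextmanager

@contextmanager
def open_queue(queue_client):
    """Yield queue_client and always call its stop() method afterwards."""
    try:
        yield queue_client
    finally:
        queue_client.stop()

class FakeQueue:
    """Stand-in for ska.queue, used only to make this sketch runnable."""

    def __init__(self):
        self.stopped = False

    def consume(self, queue_name, wait_timeout=None):
        # Simulate the error raised when no message arrives in time.
        raise RuntimeError("queue is empty")

    def stop(self):
        self.stopped = True

fake = FakeQueue()
try:
    with open_queue(fake) as q:
        q.consume(queue_name="my_first_queue", wait_timeout=1)
except RuntimeError as e:
    print(f"Error {e}")  # Error queue is empty
print(fake.stopped)      # True
```

With the real SDK, the same pattern would wrap ska.queue, assuming stop() is safe to call after a failed consume.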