Chapter 39. Batch Execution

Note

Batch execution as described in this chapter is no longer actively supported. It has been superseded by lb-workflow.

The LogicBlox distribution contains a purely client-side batch specification and execution tool. A batch job is described using the batch protocol, which is defined in $LOGICBLOX_HOME/lb-web/lib/protobuf/batch.proto. A batch specification message can then be executed using lb web-client batch.

The basic element of a batch specification is a statement. A statement can be simple or composite. A simple statement represents a single, indivisible operation. A composite statement combines one or more statements, defining control flow or specific semantics for their execution. The following tables provide an overview of simple and composite statements; detailed explanations can be found in Section 39.3, “Simple Statements” and Section 39.4, “Composite Statements”.

Simple Statements
ImportDelim

Import a delimited file to the tabular data exchange service.

ExportDelim

Export a delimited file from the tabular data exchange service.

S3Upload

Upload a file to an S3 file system.

S3Download

Download a file from an S3 file system.

CallProto

Call a custom ServiceBlox service.

ClearQueue

Clear SQS queues.

ListQueue

List SQS queues.

Exec

Execute command line statements at the local machine.

Echo

Write strings to standard output.

Fail

Return ERROR status.

Noop

Return SUCCESS status.

Composite Statements
StmSeq

Execute a sequence of statements in order, synchronously.

StmPar

Execute a set of statements asynchronously, in parallel.

StmTryCatch

Emulate a try/catch control-flow.

StmTxn

Execute a set of statements asynchronously in a single transaction, aborting the transaction on any ERROR.

39.1. Executing a Batch Specification

A batch specification must follow the batch protocol. At its root, a specification Spec contains an optional BatchConfig message that can be used to configure various aspects of the execution (see Section 39.2, “Configuring Batch Execution”).

message Spec
{
  optional BatchConfig config = 1;
  required Stm stm = 2;
}

The Spec message also contains a required root Stm message. The Stm message specifies the statements to be executed. Every simple statement or composite statement is wrapped within a Stm message, which is defined by the following protocol:

message Stm
{
  optional StmSeq seq = 1;
  optional StmTxn transaction = 2;
  optional StmPar parallel = 3;
  optional StmSimple simple = 4;
  optional StmTryCatch try_catch = 5;

  // Result (exception and end_time are only set if status is set)
  optional Status status = 6;
  optional string exception = 7;
  optional string end_time = 8;

  // Optional field to describe the purpose of a Stm
  optional string description = 9;

  // Result
  optional string start_time = 10;
}

Batch specifications can be manually written, but it is often more convenient to use a generator. The following example shows how to generate a Spec message using the ServiceBlox batch Python module.

from google.protobuf import text_format
import lb.web.batch_pb2

# create a batch specification
batch_spec = lb.web.batch_pb2.Spec()

# create the root statement as a composite StmSeq
root_stm = batch_spec.stm.seq

# add a simple echo statement to the sequence
echo1_stm = root_stm.stm.add()
echo1_stm.simple.echo.text = 'hello world!'

# add a second echo statement after the first
echo2_stm = root_stm.stm.add()
echo2_stm.simple.echo.text = 'after hello world!'

# store the specification in a file
spec_str = text_format.MessageToString(batch_spec)
with open('my-batch.batch', 'w') as f:
    f.write(spec_str)

When executed, this Python script will generate a my-batch.batch file with the batch specification in textual format. The following listing shows the contents of the batch.

stm {
  seq {
    stm {
      simple {
        echo {
          text: "hello world!"
        }
      }
    }
    stm {
      simple {
        echo {
          text: "after hello world!"
        }
      }
    }
  }
} 

The batch can then be executed with the web-client command as follows:

$ lb web-client batch -i my-batch.batch
hello world!
after hello world!
stm {
  seq {
    stm {
      simple {
        echo {
          text: "hello world!"
        }
      }
      status: SUCCESS
      end_time: "2013-11-11T12:02:32.204"
      start_time: "2013-11-11T12:02:32.192"
    }
    stm {
      simple {
        echo {
          text: "after hello world!"
        }
      }
      status: SUCCESS
      end_time: "2013-11-11T12:02:32.211"
      start_time: "2013-11-11T12:02:32.211"
    }
  }
  status: SUCCESS
  end_time: "2013-11-11T12:02:32.212"
  start_time: "2013-11-11T12:02:32.192"
}

The first two lines are the output of the two echo statements in the sequence. After execution, the batch command also returns the original Spec message annotated with the result of the execution. Each child Stm of a composite statement is likewise annotated with the result of its execution.

A simple statement will have the status of SUCCESS, ERROR, or DNS (if it has not been executed at all, e.g., because of errors in statements that precede it). The status of a composite statement will be one of the following:

DNS

The statement was not executed.

SUCCESS

All child statements return SUCCESS.

ERROR

All child statements return ERROR or DNS.

PARTIAL_ERROR

Some child statements return SUCCESS and others return ERROR (while yet others may return DNS).
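The status rules for composite statements can be read as a small decision function. The sketch below is not part of the batch tool: the name composite_status is hypothetical, and the handling of cases the list leaves open (an empty child list, or SUCCESS mixed only with DNS) is an assumption made for illustration.

```python
def composite_status(child_statuses):
    """Derive the status of an executed composite statement from its children.

    Hypothetical helper that mirrors the list above; it is not the batch
    tool's own implementation.
    """
    children = list(child_statuses)
    if all(s == "SUCCESS" for s in children):
        # Every child succeeded (assumption: an empty list also counts).
        return "SUCCESS"
    if all(s in ("ERROR", "DNS") for s in children):
        # No child succeeded.
        return "ERROR"
    # At least one SUCCESS alongside ERROR and/or DNS children.
    return "PARTIAL_ERROR"
```

For example, composite_status(["SUCCESS", "ERROR", "DNS"]) yields "PARTIAL_ERROR", which is the situation of a StmSeq whose second child failed.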

On failure, the Stm message will also return an exception field initialized with the exception that caused the failure.

Every executed statement has a start_time as well as an end_time, which allows the calculation of the statement execution time.
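As a minimal sketch of that calculation, the following uses only the Python standard library. It assumes that timestamps always have the layout seen in the sample output above (including the fractional seconds); the function name elapsed_ms is hypothetical.

```python
from datetime import datetime, timedelta

# Timestamp layout taken from the sample output above,
# e.g. "2013-11-11T12:02:32.204" (assumed to be the only layout in use).
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"


def elapsed_ms(start_time, end_time):
    """Return the whole milliseconds between two Stm timestamps."""
    start = datetime.strptime(start_time, TIME_FORMAT)
    end = datetime.strptime(end_time, TIME_FORMAT)
    return (end - start) // timedelta(milliseconds=1)


# Root statement of the sample run shown earlier.
print(elapsed_ms("2013-11-11T12:02:32.192", "2013-11-11T12:02:32.212"))  # 20
```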

39.2. Configuring Batch Execution

Certain aspects of batch execution can be configured from within the batch specification itself. Global options and defaults can be specified in the BatchConfig message contained in Spec. Furthermore, some simple statements, such as ImportDelim and CallProto, can contain a TransportConfig message that configures the underlying transport mechanism for the execution of the statement.

The following listing shows the BatchConfig protocol. The first three options configure the execution of statements that communicate directly with AWS S3, such as S3Download and S3Upload, and also indirectly, such as ImportDelim where the input is an S3 URL. The next two options configure concurrency aspects of the batch execution. Finally, the last two options configure the execution of statements that use TCP as the underlying protocol. Note that options described as default can be overridden by individual statements.

message BatchConfig
{
  // Directory containing S3 encryption keys.
  // Mandatory for statements that use S3.
  optional string keydir = 1;
  // Default name of the encryption key to use with S3.
  optional string keyname = 2;
  // Size of chunks of data when using S3.
  // Defaults to s3_chunk_size in lb-web-client.config.
  optional int64 chunksize = 3;

  // Max number of concurrent statements to execute.
  // Defaults to batch_max_concurrent in lb-web-client.config.
  optional int64 max_concurrent = 4;
  // Max number of concurrent S3 uploads/downloads to execute.
  // Defaults to s3_max_concurrent in lb-web-client.config.
  optional int64 max_concurrent_s3 = 5;

  // Max number of threads to use for TCP transports.
  // This limits the number of TCP connections that can be open concurrently.
  optional int64 tcp_client_threads = 6;
  // Default max number of connections to a certain host.
  // This limits the number of TCP connections targeting a certain host.
  optional int64 tcp_max_connections_per_address = 7 [default = 100];
}

Some individual statements can also be configured with the following TransportConfig message. Only one of the optional fields should be used. They define the transport to be used in the communication with the server. The config field can be used to reference a section of the default lb-web-client.config file or of a custom configuration file passed as a command line argument. The additional options define whether the HTTP messages should be sent via a TCP connection, an Amazon SQS queue or a RabbitMQ queue.

message TransportConfig
{
  optional string config = 1;
  optional SQSTransportConfig sqs = 2;
  optional TCPTransportConfig tcp = 3;
}

The following protocol defines the options available for TCP transport. Of interest is the max_connections_per_address option, which defaults to the value specified in the BatchConfig message, but which can be refined at the level of individual statements. Please refer to the protobuf specification for details about SSLConfig, SQSTransportConfig, and RabbitTransportConfig.

message TCPTransportConfig
{
  optional SSLConfig ssl = 1;

  // Max time the whole exchange can take (in ms).
  optional int64 timeout = 2 [default = 600000];

  // Max time to wait when establishing a connection to the server (in ms).
  optional int64 connect_timeout = 3 [default = 75000];

  // Max time in a connection without processing anything (in ms).
  // Processing is defined to be "parsing or generating".
  optional int64 idle_timeout = 4 [default = 20000];

  // Default = BatchConfig.tcp_max_connections_per_address.
  optional int64 max_connections_per_address = 5;
}
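The precedence between the statement-level and batch-level connection limits can be sketched as follows. This is an illustration of the rule described above, not the tool's code; the function name and the use of None for "field not set" are assumptions.

```python
# Default of BatchConfig.tcp_max_connections_per_address in the listing above.
BATCH_DEFAULT_MAX_CONNECTIONS = 100


def effective_max_connections(batch_value=None, statement_value=None):
    """Pick the per-host connection limit that applies to one statement.

    Hypothetical helper; None stands for "field not set".
    """
    if statement_value is not None:
        # TCPTransportConfig.max_connections_per_address refines the default.
        return statement_value
    if batch_value is not None:
        # BatchConfig.tcp_max_connections_per_address set explicitly.
        return batch_value
    return BATCH_DEFAULT_MAX_CONNECTIONS
```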

39.3. Simple Statements

The StmSimple message specifies the concrete operation to be executed. Its protocol is shown below. It takes one, and only one, of the concrete statements outlined above (Echo, Fail, ImportDelim, ExportDelim, etc.).

message StmSimple
{
  optional Echo echo = 1;
  optional Fail fail = 2;

  optional ImportDelim import_delim = 3;
  optional ExportDelim export_delim = 4;

  optional S3Upload s3_upload = 5;
  optional S3Download s3_download = 6;

  optional CallProto call_proto = 7;

  optional ClearQueue clear_queue = 8;
  optional ListQueue list_queue = 9;

  optional Noop noop = 10;

  optional Exec exec_stm = 11;

  // number of times to retry this statement if it fails
  optional int64 retry = 100;
}
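Because every field of StmSimple is optional at the protobuf level, the "one, and only one" rule is not enforced by the message definition itself. A generator might check it before writing a specification, along the lines of the sketch below. It uses a plain dict as a stand-in for a StmSimple message, and the helper name chosen_statement is hypothetical.

```python
# Statement fields of StmSimple, as listed in the protocol above.
SIMPLE_FIELDS = (
    "echo", "fail", "import_delim", "export_delim", "s3_upload",
    "s3_download", "call_proto", "clear_queue", "list_queue",
    "noop", "exec_stm",
)


def chosen_statement(simple):
    """Return the single statement field set in a StmSimple-like dict.

    Raises ValueError unless exactly one statement field is present.
    Other keys, such as "retry", are ignored.
    """
    present = [name for name in SIMPLE_FIELDS if name in simple]
    if len(present) != 1:
        raise ValueError(
            "expected exactly one statement, found: %s" % (present or "none")
        )
    return present[0]
```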

39.3.1. Echo

The Echo statement prints a string to the standard error stream. This is useful for debugging purposes.

message Echo
{
  required string text = 1;
}

The following example shows how to create a Spec message with a simple Echo statement:

import lb.web.batch_pb2

batch = lb.web.batch_pb2.Spec()
stm = batch.stm
stm.simple.echo.text = 'Print this to stderr!'

39.3.2. CallProto

Below is the protocol defining the CallProto message. The CallProto statement performs a POST operation to a ServiceBlox service, specified in the service parameter, with the contents of the input parameter.

message CallProto
{
  required string service = 1;

  // config is deprecated, use config in Transport
  optional string config = 2;
  optional TransportConfig transport = 7;

  required ProtoBufEncoding encoding = 3;
  optional bool gzip = 4;
  optional Input input = 5;
  optional string output_file = 6;

  // do a best attempt to format the output for human consumption
  optional bool format = 8;

  // the HTTP method to use in the call
  optional HttpMethod method = 9 [default = POST];

  // configure signature-based authentication
  optional SignatureAuth signature = 10;
}

The Input message is defined as below. It can be a sequence of bytes, a string, the contents of a file, or the contents of a resource at a certain URL.

message Input
{
  optional bytes binary = 1;
  optional string text = 2;
  optional string file = 3;
  optional string url = 4;
}

The content of the input resource should be a protobuf message, in either binary or JSON format. This is specified in the encoding parameter.

The response, if any, will be written to the output_file. If the response is a JSON message, the boolean format option will cause an attempt to format the response in a human-readable way.

The following sample Python code creates a CallProto statement for inclusion in a batch specification:

call_proto_stm = stm.simple.call_proto
call_proto_stm.service = 'multipart-json'
call_proto_stm.encoding = lb.web.batch_pb2.ProtoBufEncoding.JSON
call_proto_stm.input.file = 'input_file.json'
call_proto_stm.output_file = 'save-to-this-file.txt'

39.3.3. ImportDelim

The ImportDelim statement uploads a file to a ServiceBlox service using a POST or a PUT HTTP request. Similar to CallProto, the service to connect to is specified via the service parameter, and the input file via the input parameter. In general, this statement should be used to import data through the tabular data exchange services. If the full parameter is true, a PUT request will be issued to the tabular data exchange service, which will fully replace the existing data with the data provided as input. If the full parameter is set to false, a POST request will be issued to the delimited file service, which will only update the workspace by inserting the content in the input file.

message ImportDelim
{
  required string service = 1;

  // Either data_file or data_url is required
  optional string data_file = 2;
  optional string data_url = 3;
  optional Input input = 8;

  // Config is deprecated, use config in Transport
  optional string config = 4;
  optional TransportConfig transport = 6;

  // Compress the file with gzip before sending, decompress in the server
  optional bool gzip = 5;

  // Default value for full is false, which means POST. Setting full
  // to true will use the PUT method.
  optional bool full = 7;

  // How to return the error records, if desired
  optional string out_file = 9;
  optional string out_url = 10;

  // encryption key to use for out file
  optional string key = 11;

  // whether or not a bad record should abort the transaction
  optional bool abort_on_error = 12;

  // configure signature-based authentication
  optional SignatureAuth signature = 13;

  // whether the tdx import errors flag should be printed to stderr when there are errors
  optional bool print_tdx_import_errors = 14;

  // only actually write to the error file if there's an error (defaults to false for
  // backwards compatibility)
  optional bool error_file_only_on_error = 15 [default = false];
}

The following example shows how to create an ImportDelim statement, posting data from an S3 file to a service hosted at /delim-basic/sales.

import_stm = stm.simple.import_delim
import_stm.service = 'http://host:8080/delim-basic/sales'
import_stm.input.url = 's3://testserver.request/delim-basic/more-sales.csv'
import_stm.full = False

39.3.4. ExportDelim

Similar to ImportDelim, ExportDelim performs a GET request to a ServiceBlox service. The data_file and data_url parameters specify where to save the exported data. The data_file parameter should specify a local file path to save to; the data_url parameter should specify an S3 URL. Finally, the key parameter specifies the key to use when encrypting the exported file.

Just as in the case of ImportDelim, the service used with ExportDelim will, in general, be a tabular data exchange service.

message ExportDelim
{
  required string service = 1;
  optional string data_file = 2;
  optional string data_url = 3;

  // encryption key to use for exported file
  optional string key = 4;

  // Config is deprecated, use config in Transport
  optional string config = 5;
  optional TransportConfig transport = 7;

  // Compress the file with gzip in the server, decompress in the client
  optional bool gzip = 6;

  // configure signature-based authentication
  optional SignatureAuth signature = 8;
}

The following is an example of creating an ExportDelim statement requesting data from a service hosted at /delim-basic/sales and saving the output to an S3 file.

export_stm = stm.simple.export_delim
export_stm.service = '/delim-basic/sales'
export_stm.data_url = 's3://testserver.request/delim-basic/more-sales.csv'

39.3.5. S3Upload

The S3Upload statement can be used to upload a file to S3. The file parameter should specify a file on the local disk, while the url parameter should specify a URL for the destination S3 file. If a key is specified, it will be used to encrypt the file.

message S3Upload
{
  required string file = 1;
  required string url = 2;
  optional string key = 3;
  optional string config = 4;
}

The following example creates an S3Upload statement in Python:

up_stm = stm.simple.s3_upload
up_stm.file = 'path-to-local-file'
up_stm.url = 's3://amazon.com/path'

39.3.6. S3Download

The S3Download statement can be used to download a file from S3 to local disk. The url parameter should specify an S3 URL for the file to be downloaded, and the file parameter should specify the path on the local disk to save to.

message S3Download
{
  required string file = 1;
  required string url = 2;
  optional string config = 3;
}

The following example creates an S3Download statement in Python:

down_stm = stm.simple.s3_download
down_stm.file = 'path-to-local-file'
down_stm.url = 's3://amazon.com/path'

39.3.7. Exec

The Exec statement executes an external process. The command to be executed should be specified as a sequence of strings, just as with the Java Runtime.exec(String[] cmdarray) method.

The timeout parameter specifies, in seconds, how long the batch should wait for the command to terminate. A value of -1 will disable any timeout. If the destroy_on_timeout parameter is set to true, the command will be interrupted upon timeout; if false, the batch will not wait for the command to finish: it will return ERROR, but will not interrupt the process.

The parameters pipe_stdout_to and pipe_stderr_to specify how to redirect standard output and standard error. Each parameter accepts one of the following values:

  • "-", redirecting output to standard output;

  • a file name, redirecting output to the file;

  • "logger", redirecting output to the log file. In this case, errors will be logged at WARN level, and normal output at INFO level.

message Exec
{
  repeated string command = 1; // command and its arguments
  required int64 timeout = 2; // use -1 to ignore timeout
  optional bool destroy_on_timeout = 3;

  // by default, stdout and stderr will be ignored
  // the following pipe parameters accept a file name,
  // '-' for stdout, and 'logger' to use default logger
  optional string pipe_stdout_to = 4;
  optional string pipe_stderr_to = 5;
}

The following example executes the command cat test and writes the output to a file called file.txt:

exec_stm = stm.simple.exec_stm
exec_stm.command.append('cat')
exec_stm.command.append('test')
exec_stm.pipe_stdout_to = 'file.txt'
exec_stm.timeout = -1
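The interaction between timeout and destroy_on_timeout can be modelled with the Python standard library. The sketch below only illustrates the rules described in this section and is not how the batch tool is implemented. The function name run_with_timeout is hypothetical, and treating a non-zero exit code as ERROR is an assumption.

```python
import subprocess
import sys


def run_with_timeout(command, timeout, destroy_on_timeout=False):
    """Run command and map the outcome to a batch-style status string.

    timeout is in seconds; -1 disables it, as for the Exec statement.
    """
    proc = subprocess.Popen(command)
    try:
        exit_code = proc.wait(timeout=None if timeout == -1 else timeout)
    except subprocess.TimeoutExpired:
        if destroy_on_timeout:
            proc.kill()  # interrupt the command
            proc.wait()  # reap the killed process
        # Without destroy_on_timeout the process is left running.
        return "ERROR"
    # Assumption: a non-zero exit code is reported as ERROR.
    return "SUCCESS" if exit_code == 0 else "ERROR"


# A command that finishes well within its timeout.
print(run_with_timeout([sys.executable, "-c", "pass"], 30))  # SUCCESS
```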

39.3.8. ClearQueue

The ClearQueue statement clears all messages from a queue. The sqs parameter specifies the information about the endpoints, with the names or URLs for the request and response queues. To clear the request queue, simply set the request parameter to true; to clear the response queue, set the response parameter to true. To not only clear the queues, but also delete them, set the delete parameter to true.

message ClearQueue
{
  // sqs is deprecated
  optional SQSTransportConfig sqs = 1;
  optional bool request = 2;
  optional bool response = 3;
  optional bool delete = 4;
  optional TransportConfig transport = 5;
}

The following example clears the request queue, identifying the request and response queues by name:

clear_stm = stm.simple.clear_queue
clear_stm.sqs.request_queue_name = 'request-queue'
clear_stm.sqs.response_queue_name = 'response-queue'
clear_stm.request = True

39.3.9. ListQueue

The ListQueue statement lists the messages on a request or response queue. Just as with the ClearQueue statement, the queue endpoints are specified with the sqs parameter. The boolean request and response parameters specify which queues should be listed. Finally, the max_messages parameter sets a limit on the number of messages that will be listed. The messages are printed to standard output.

message ListQueue
{
  // sqs is deprecated
  optional SQSTransportConfig sqs = 1;
  optional bool request = 2;
  optional bool response = 3;
  optional int64 max_messages = 4;
  optional TransportConfig transport = 5;
}

39.3.10. Noop

The Noop statement does not execute anything. It takes no parameters. This is mostly useful when generating batch scripts from code.

39.3.11. Fail

The Fail statement does not execute anything and returns an ERROR status with an error message. The error message is set as the text parameter.

message Fail
{
  required string text = 1;
}

Here is an example of a Fail statement:

fail_stm = stm.simple.fail
fail_stm.text = 'failure message'

This is useful in combination with the StmTryCatch composite statement.

39.4. Composite Statements

39.4.1. StmSeq

The StmSeq message takes a list of statements, as shown below. The next statement is started only after the current statement has finished with SUCCESS. Otherwise, execution is interrupted and statements not executed are marked as such (DNS status).

message StmSeq
{
  repeated Stm stm = 1;
}

Using the lb-web-batch Python module, we can create a StmSeq like this:

seq_stm = stm.seq
stm1 = seq_stm.stm.add()
# ... configure stm1
stm2 = seq_stm.stm.add()
# ... configure stm2
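The execution rule for StmSeq (stop at the first failure and mark the remaining statements DNS) can be illustrated with a small model. In this sketch each statement is represented by a Python callable that returns a status string; the function name run_sequence and that representation are assumptions made for illustration, not the batch tool's implementation.

```python
def run_sequence(steps):
    """Run callables in order and return one status per step.

    Each step returns "SUCCESS" or "ERROR". Steps after the first
    failure are not started and are reported as "DNS".
    """
    statuses = []
    failed = False
    for step in steps:
        if failed:
            statuses.append("DNS")  # did not start
            continue
        status = step()
        statuses.append(status)
        failed = status != "SUCCESS"
    return statuses


print(run_sequence([lambda: "SUCCESS", lambda: "ERROR", lambda: "SUCCESS"]))
# ['SUCCESS', 'ERROR', 'DNS']
```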

39.4.2. StmPar

Executes a set of statements asynchronously, in parallel. There are no guarantees about which statement will execute first. Statements that fail will be marked with ERROR or PARTIAL_ERROR, but will not interrupt the execution of other statements.

message StmPar
{
  optional int64 max = 1;
  repeated Stm stm = 2;
}

Creating a StmPar with the Python module is simple:

par_stm = stm.parallel
stm1 = par_stm.stm.add()
# ... configure stm1
stm2 = par_stm.stm.add()
# ... configure stm2
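The behavior described for StmPar, where a failing statement does not interrupt its siblings, can be modelled with a thread pool. The sketch below is only an illustration: the function name run_parallel is hypothetical, and its max_workers argument is a parameter of the sketch, since this section does not state how the max field of StmPar is interpreted.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(steps, max_workers=4):
    """Run callables concurrently and return one status per step."""

    def guarded(step):
        try:
            return step()
        except Exception:
            # A failing step is recorded, but its siblings keep running.
            return "ERROR"

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in submission order, not completion order.
        return list(pool.map(guarded, steps))


print(run_parallel([lambda: "SUCCESS", lambda: 1 / 0, lambda: "SUCCESS"]))
# ['SUCCESS', 'ERROR', 'SUCCESS']
```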

39.4.3. StmTryCatch

Emulates a try/catch control-flow. An attempt is made to execute a try statement. If that statement succeeds, execution is finished with SUCCESS. Otherwise a catch statement is executed, most often to either clean up or work around the first failure. The overall result is then that of the catch statement.

message StmTryCatch
{
  required Stm try = 1;
  required Stm catch = 2;
}

Here is an example of how to create a StmTryCatch:

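try_catch_stm = stm.try_catch
# field names follow the StmTryCatch listing above; 'try' is a reserved
# word in Python, so that field has to be accessed with getattr
try_stm = getattr(try_catch_stm, 'try')   # ... configure try_stm
catch_stm = try_catch_stm.catch           # ... configure catch_stm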

To replace the exception raised by the try statement with a more useful error message, the catch statement can be a Fail statement:

catch_stm = stm.try_catch.catch
catch_stm.simple.fail.text = 'a more pertinent error message'
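The control flow described at the start of this section can be modelled in a few lines. In this sketch the try and catch statements are represented by Python callables returning a status string; the function name run_try_catch is hypothetical, and the sketch illustrates the described semantics rather than the tool's implementation.

```python
def run_try_catch(try_step, catch_step):
    """Model StmTryCatch: each step is a callable returning a status."""
    if try_step() == "SUCCESS":
        return "SUCCESS"  # the catch statement is never started
    return catch_step()   # overall result is that of the catch statement


# A failing try whose catch is a Fail statement: the result stays ERROR.
print(run_try_catch(lambda: "ERROR", lambda: "ERROR"))    # ERROR
# A failing try whose catch recovers.
print(run_try_catch(lambda: "ERROR", lambda: "SUCCESS"))  # SUCCESS
```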

39.4.4. StmTxn

The StmTxn statement executes a set of statements as a single transaction, aborting the transaction on ERROR. This requires that all simple statements within the StmTxn be “abortable”.

Tip

Currently, only the ImportDelim and ExportDelim statements are abortable, so only those statements should be used with StmTxn. Furthermore, all ImportDelim, ExportDelim and transaction services must be within the same workspace.

message StmTxn
{
  required string service = 1;

  // config is deprecated, use config in the transport message
  optional string config = 2;
  optional TransportConfig transport = 5;

  repeated Stm stm = 3;

  // Result
  optional string transaction_id = 4;

  // configure signature-based authentication
  optional SignatureAuth signature = 6;
}

The most important parameter to configure in a StmTxn is the transaction service. This service must be a transaction service created by the delim_txn_service predicate:

service_by_prefix["/multipart/txn/*"] = x,
delim_txn_service(x).

The stm parameter in StmTxn is used to specify the statements within the transaction.

After execution, the transaction_id field will contain the id of this transaction. This can be used to query the ServiceBlox log (lb-web-server.log) for more details on this transaction.

The following example creates a StmTxn with two import statements:

txn_stm = stm.transaction
txn_stm.service = '/multipart/txn'

import_delim1 = txn_stm.stm.add().simple.import_delim
import_delim1.service = 'http://host:8080/multipart/sales'
### continue configuring import_delim1

import_delim2 = txn_stm.stm.add().simple.import_delim
import_delim2.service = 'http://host:8080/multipart/returns'
### continue configuring import_delim2

39.5. Writing Protobuf Messages by Hand

While Protobuf messages can be created using the Python or Java API, the Protobuf text format is also easy to write by hand.

These are the rules for writing protobuf messages by hand:

  • each message is encoded as { sequence-of-fields-and-values };

  • if the field is a primitive type, it is encoded as field_name: value;

  • if the field is a message type, it is encoded as field_name { message_contents };

  • the message types are inferred from the definition of the message protocol in the .proto file.

The example below shows a message with a batch specification for executing an Exec statement and an Echo statement in sequence:

stm {
    seq {
        stm {
            simple {
                exec_stm {
                    command: "cat"
                    command: "some-file"
                    timeout: 10
                }
            }
        }
        stm {
            simple {
                echo {
                    text: "echo this!"
                }
            }
        }
    }
}
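The rules above are simple enough to apply mechanically. The sketch below renders a nested Python dict in this textual format without depending on the protobuf library. It is a hypothetical helper for illustration only: it assumes dicts stand for message fields, lists for repeated fields, and everything else for primitive values, and it does not escape special characters in strings or handle enum values such as SUCCESS, which are written without quotes.

```python
def to_text_format(message, indent=0):
    """Render a nested dict as a list of protobuf text-format lines."""
    pad = " " * indent
    lines = []
    for name, value in message.items():
        # A list stands for a repeated field: one entry per element.
        values = value if isinstance(value, list) else [value]
        for item in values:
            if isinstance(item, dict):
                # Message-typed field: field_name { message_contents }
                lines.append(f"{pad}{name} {{")
                lines.extend(to_text_format(item, indent + 2))
                lines.append(f"{pad}}}")
            elif isinstance(item, bool):
                lines.append(f"{pad}{name}: {'true' if item else 'false'}")
            elif isinstance(item, str):
                lines.append(f'{pad}{name}: "{item}"')
            else:
                # Numeric primitive: field_name: value
                lines.append(f"{pad}{name}: {item}")
    return lines


# The Exec-then-Echo sequence from the example above.
spec = {"stm": {"seq": {"stm": [
    {"simple": {"exec_stm": {"command": ["cat", "some-file"], "timeout": 10}}},
    {"simple": {"echo": {"text": "echo this!"}}},
]}}}
print("\n".join(to_text_format(spec)))
```

The printed text has the same structure as the hand-written example, with two-space indentation.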