Chapter 28. Protobuf and Global Protobuf Services

28.1. Implementing ProtoBuf/JSON Services

This section explains how to implement protocol buffer and JSON services in ServiceBlox. We illustrate this with a simple service that, given the name of a timezone, returns the current time in that timezone.

The first step in the implementation of a service is the definition of the protocol used between the client and the service. This protocol serves as documentation of the service, but can also be used to generate source code artifacts used by the client and the service. The protocol is specified as a protobuf schema (see the protobuf language guide for a detailed reference on protocol specifications).

For the time service, this protocol is:

package time;

message Request
{
  required string timezone = 1;
}

message Response
{
  optional string answer = 1;
  optional string error = 2;
} 

In JSON syntax, a request for the time in UTC is {"timezone" : "UTC"}. At the time of writing, the answer in JSON syntax would have been {"answer": "2012/11/13 00:19 +00:00"}.

Next, it is time to write the LogiQL rules for the actual implementation of the service. To use protobuf messages in LogiQL, a LogiQL schema must be generated from the protocol. Usually this is taken care of by the build system of a project (see Chapter 24, Hierarchical Import/Export). If we ignore some compiler directives, the generated LogiQL schema for the time protocol looks like this:

time:Request(x), time:RequestId(x:id) -> int(id).
time:Request:timezone[x] = y -> time:Request(x), string(y).

time:Response(x), time:ResponseId(x:id) -> int(id).
time:Response:answer[x] = y -> time:Response(x), string(y).
time:Response:error[x] = y -> time:Response(x), string(y).
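To make the naming pattern concrete, here is a toy Python generator that reproduces the schema lines above from a minimal message description. It is only a sketch: the function name `logiql_schema` and the `(field, type)` input format are hypothetical. The real generator also emits compiler directives and handles repeated and nested fields, which this sketch does not.

```python
def logiql_schema(package, message, fields):
    """Emit LogiQL declarations following the pattern of the generated schema.

    fields: list of (field_name, logiql_type) pairs, e.g. [("timezone", "string")].
    Only scalar, non-repeated fields are handled in this sketch.
    """
    entity = "%s:%s" % (package, message)
    # One entity per message, with an int-valued refmode named <Message>Id.
    lines = ["%s(x), %sId(x:id) -> int(id)." % (entity, entity)]
    # One functional predicate per field, keyed by the message entity.
    for name, ltype in fields:
        lines.append("%s:%s[x] = y -> %s(x), %s(y)." % (entity, name, entity, ltype))
    return lines


print("\n".join(logiql_schema("time", "Request", [("timezone", "string")])))
```

Calling it with `("time", "Response", [("answer", "string"), ("error", "string")])` yields the three Response declarations shown above.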

When the ServiceBlox service container receives an HTTP request for a service, the server imports the protobuf message contained in the body of the HTTP request into the workspace that hosts the service. This request message is typically a pulse entity, which means that it does not persist in the workspace after the transaction. The import of the example request for the current time in UTC is equivalent to executing the following logic (we are using the syntax described in Chapter 22, Hierarchical Syntax):

+time:Request(_) {
  +time:Request:timezone[] = "UTC"
}.

The service implementation consists of delta rules that trigger when a request entity element is created. To respond to the request, the delta rules create a message in the response protocol, which is then exported from the workspace by the ServiceBlox service container. This all happens in a single transaction. The server returns the response to the client in the body of an HTTP message. For the UTC example, the delta logic to create the literal response message would be:

+time:Response(_) {
  +time:Response:answer[] = "2012/11/13 00:19 +00:00"
}.

Of course, the actual implementation should trigger from the actual request, and also consider the current time. One common complication in the implementation of the service is that the logic must make sure to always return a response. To guarantee this, it is useful to introduce separate, intermediate predicates for the result of the request. In the following example, we introduced an answer predicate for this purpose. The first rule computes the answer for the given timezone request. The second rule populates a successful response, while the third rule generates an error message if no answer could be computed.

block(`time) {

  clauses(`{

    answer[req] = s -> time:Request(req), string(s).
    lang:pulse(`answer).

    // determine the answer for the requested timezone
    +answer[req] = s
      <-
      +time:Request:timezone[req] = tz,
      datetime:now[] = dt,
      datetime:formatTZ[dt, "%Y/%m/%d %H:%M %Q", tz] = s.

    // use constructor for creating a response message
    lang:constructor(`cons).
    lang:pulse(`cons).
    cons[req] = resp -> time:Request(req), time:Response(resp).

    // create the response message from the answer
    +cons[req] = resp,
    +time:Response(resp),
    +time:Response:answer[resp] = s
      <-
      +answer[req] = s.

    // create the error response message if there is no answer
    +cons[req] = resp,
    +time:Response(resp),
    +time:Response:error[resp] = "not a valid timezone: " + tz
      <-
      +time:Request:timezone[req] = tz,
      !+answer[req] = _.

  })

} <-- . 
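The control flow of these three rules can be mirrored in plain Python, which may help when reasoning about the requirement to always return a response. This is a sketch, not how ServiceBlox evaluates LogiQL: the fixed offset table `OFFSETS` is a hypothetical stand-in for `datetime:formatTZ`, and `format_tz` and `handle` are illustrative names.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical stand-in for datetime:formatTZ: a fixed table of UTC offsets.
# A real service would consult a full timezone database (and handle DST).
OFFSETS = {"UTC": timedelta(0), "EST": timedelta(hours=-5)}


def format_tz(now_utc, tz):
    """Format now_utc in tz like "%Y/%m/%d %H:%M %Q", or return None if tz is unknown."""
    offset = OFFSETS.get(tz)
    if offset is None:
        return None  # no answer can be computed
    local = now_utc.astimezone(timezone(offset))
    sign = "+" if offset >= timedelta(0) else "-"
    minutes = abs(int(offset.total_seconds())) // 60
    return local.strftime("%Y/%m/%d %H:%M ") + "%s%02d:%02d" % (sign, minutes // 60, minutes % 60)


def handle(request, now_utc):
    """Mirror the three rules: compute an answer, then produce exactly one response."""
    tz = request["timezone"]
    answer = format_tz(now_utc, tz)  # rule 1: intermediate "answer"
    if answer is not None:
        return {"answer": answer}  # rule 2: successful response
    return {"error": "not a valid timezone: " + tz}  # rule 3: error response


now = datetime(2012, 11, 13, 0, 19, tzinfo=timezone.utc)
print(handle({"timezone": "UTC"}, now))  # {'answer': '2012/11/13 00:19 +00:00'}
```

With an unknown timezone such as "Mars", `handle` returns the error message instead, so every request still receives a response.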

ServiceBlox finds services to host by scanning workspaces for service configurations. A workspace can host an arbitrary number of services, each of which is defined by a service entity. For the timezone service, the configuration uses the subtype of service for protobuf services, called default_protobuf_service.

Note

The example below does not specify an HTTP method. If no method is specified (with service_by_prefix_and_method), protobuf services default to answering only HTTP POST requests.

block(`service_config) {

  alias_all(`lb:web:config:service),
  alias_all(`lb:web:config:service_abbr),
  alias_all(`lb:web:config:protobuf),
  alias_all(`lb:web:config:protobuf_abbr),

  clauses(`{

    service_by_prefix["/time"] = x,
    default_protobuf_service(x) {
      protobuf_protocol[] = "time",
      protobuf_request_message[] = "Request",
      protobuf_response_message[] = "Response"
    }.

  })

} <-- . 
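The default-method rule from the note above can be pictured as a lookup keyed on both prefix and HTTP method. The sketch below is a hypothetical model, not ServiceBlox's dispatcher: the `services` dictionary and the `route` function are illustrative, and a method of `None` stands for a service declared with service_by_prefix only.

```python
# Assumed default for protobuf services declared without an HTTP method.
DEFAULT_PROTOBUF_METHODS = {"POST"}


def route(services, prefix, method):
    """Return the name of the service handling (prefix, method), or None.

    services maps a prefix to a list of (service_name, declared_method) pairs,
    where declared_method is None when no method was configured.
    """
    for name, declared in services.get(prefix, []):
        allowed = DEFAULT_PROTOBUF_METHODS if declared is None else {declared}
        if method.upper() in allowed:
            return name
    return None


services = {"/time": [("time", None)]}
print(route(services, "/time", "POST"))  # time
print(route(services, "/time", "GET"))   # None
```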

28.1.1. Implementing services in inactive blocks

The implementation of the time service in the previous example is in an active block of logic, which means its rules are triggered whenever a Request message is imported into the workspace, regardless of which service imported the message. This can be a problem if you want to implement several services that share a Request message but interpret it differently.

This can be solved by implementing the logic that interprets the request in an inactive block, which is executed only for the service that specifies it.

As an example, suppose we want a service that returns the time in the EST timezone when the request does not specify a timezone. Its request message, OptionalRequest, is a variant of Request in which the timezone field is optional. We implement the service in this block:

block(`est_time) {
  inactive(),
  clauses(`{

    answer[req] = s -> time:OptionalRequest(req), string(s).
    lang:pulse(`answer).

    // determine the answer for the requested timezone
    +answer[req] = s
      <-
      +time:OptionalRequest_timezone[req] = tz,
      datetime:now[] = dt,
      datetime:formatTZ[dt, "%Y/%m/%d %H:%M %Q", tz] = s.

    +answer[req] = s
      <-
      !+time:OptionalRequest_timezone[req] = _,
      +time:OptionalRequest(req),
      datetime:now[] = dt,
      datetime:formatTZ[dt, "%Y/%m/%d %H:%M %Q", "EST"] = s.

    // use constructor for creating a response message
    lang:constructor(`cons).
    lang:pulse(`cons).
    cons[req] = resp -> time:OptionalRequest(req), time:Response(resp).

    // create the response message from the answer
    +cons[req] = resp,
    +time:Response(resp),
    +time:Response_answer[resp] = s
      <-
      +answer[req] = s.


  })

} <-- . 

and we can configure a service to use this block with:

/**
 * Service that reports the current time in a certain
 * timezone or EST when no timezone is specified.
 */
service_by_prefix["/est-time"] = x,
default_protobuf_service(x) {
  inactive_block_name[] = "sample:est_time",
  protobuf_protocol[] = "time",
  protobuf_request_message[] = "OptionalRequest",
  protobuf_response_message[] = "Response"
}. 

This approach allows several services to share a request format even when their implementations would otherwise conflict. However, it also means that the code in these inactive blocks cannot be reused between services. Deciding whether to implement a service in active or inactive logic is therefore a tradeoff between reusing code and minimizing coupling between services.
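The difference between the two styles can be modeled as follows, assuming that active blocks fire for every imported message of their trigger type while an inactive block runs only for the service that names it in inactive_block_name. The Python below is a toy model under that assumption; the registries, the `/time-alt` service, the `sample:alt_time` block and the function name are all hypothetical.

```python
# Hypothetical registry: message type -> active blocks triggered by that type.
ACTIVE_BLOCKS = {"Request": ["time"]}

# Hypothetical services: prefix -> (request message type, inactive block or None).
SERVICES = {
    "/time": ("Request", None),
    "/est-time": ("OptionalRequest", "sample:est_time"),
    # Hypothetical third service reusing Request with its own inactive block.
    "/time-alt": ("Request", "sample:alt_time"),
}


def blocks_executed(prefix):
    """Return the blocks whose rules run when a request arrives at prefix."""
    message_type, inactive_block = SERVICES[prefix]
    # Active blocks fire on any import of their message type, whatever the service.
    blocks = list(ACTIVE_BLOCKS.get(message_type, []))
    # An inactive block runs only for the service that names it.
    if inactive_block is not None:
        blocks.append(inactive_block)
    return blocks


print(blocks_executed("/time-alt"))  # ['time', 'sample:alt_time']
```

The `/time-alt` case shows the conflict described above: because the `time` block is active, its rules also fire for a service that intended to interpret Request differently.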

Similarly, services can use inactive blocks executed after maintenance has reached a fixpoint (see Section 21.4.1, “Maintenance”), as in this example.

    default_protobuf_service(x) {
      inactive_after_fixpoint_block_name[] = "sample:est_time_afp",
      protobuf_protocol[] = "time",
      protobuf_request_message[] = "OptionalRequest",
      protobuf_response_message[] = "Response"
    }.

28.1.2. Writing Automated Tests using Python

ServiceBlox ProtoBuf/JSON services are standard HTTP services, so in principle any HTTP service testing tool can be used. ServiceBlox comes with a small Python library of convenient abstractions to invoke services, and we recommend writing automated tests using this library.

  • The lb.web.admin.Client class provides access to the admin services of ServiceBlox. This can be convenient for isolating services in test suites.

  • The lb.web.service.Client class allows a ProtoBuf request to be built and sent to the service. It dynamically builds the required Python classes from the descriptor that it fetches from the ServiceBlox admin services.

The Python client sends and receives binary protobufs by default. It can be used to test services with BINARY or AUTO encoding. JSON is supported at a lower level.
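Because the services are plain HTTP, a JSON request can also be issued with nothing but the Python standard library. The sketch below assumes the time service is reachable at http://localhost:8080/time and accepts a JSON body via POST with a `Content-Type: application/json` header; whether JSON is accepted depends on the service's configured encoding, so treat these details as assumptions. The function names are illustrative.

```python
import json
import urllib.request


def build_json_request(url, payload):
    """Build (but do not send) a POST request carrying payload as JSON."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call_json(url, payload, timeout=10):
    """Send the request and decode the JSON response body."""
    with urllib.request.urlopen(build_json_request(url, payload), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Against a running time service, `call_json("http://localhost:8080/time", {"timezone": "UTC"})` would be expected to return a dictionary containing either "answer" or "error".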

A simple Python testsuite needs the following imports:

#! /usr/bin/env python

import sys
import os
import unittest

sys.path.insert(0, '%s/lib/python' % os.environ.get('LOGICBLOX_HOME'))
sys.path.insert(0, '%s/lib/python' % os.environ.get('LB_WEBSERVER_HOME'))

import lb.web.testcase
import lb.web.service
import lb.web.admin

There are two main test case classes: lb.web.testcase.PrototypeWorkspaceTestCase and lb.web.testcase.TestCase. We generally recommend the prototype workspace test case, because it prevents interference between tests. For truly stateless services, the simpler TestCase class can be used and is significantly faster.

A simple testsuite for the time service:

class TestTimeService(lb.web.testcase.PrototypeWorkspaceTestCase):

    prototype = "/workspace-name"

    def setUp(self):
        super(TestTimeService, self).setUp()
        self.client = lb.web.service.Client("localhost", 8080, "/time")

    def test_utc(self):
        req = self.client.dynamic_request()
        req.timezone = "UTC"
        response = self.client.dynamic_call(req)
        self.assertHasField(response, "answer") 

The lb.web.service.Client class also supports testing authenticated services. Note that the cookie jar of the login client must be assigned manually to the client of the service that requires authentication:

import lb.web.credentials

class AuthenticatedTestTimeService(lb.web.testcase.PrototypeWorkspaceTestCase):

    prototype = "/workspace-name"

    def setUp(self):
        super(AuthenticatedTestTimeService, self).setUp()
        self.client = lb.web.service.Client("localhost", 8080, "/atime")
        self.login_client = lb.web.service.Client("localhost", 8080, "/login")
        # share the login session cookies with the authenticated service client
        self.client.jar = self.login_client.jar

    def test_login_works(self):
        credentials_client = lb.web.credentials.Client()
        credentials_client.set_password("user", "password")
        self.login_client.login("user", "password", "time_auth")

        req = self.client.dynamic_request()
        req.timezone = "EST"
        response = self.client.dynamic_call(req)
        self.assertHasField(response, "answer")

if __name__ == '__main__':
    unittest.main()

28.2. Implementing Global ProtoBuf/JSON Services

Global protobuf services support broadcasting a request to a set of other services (usually partitions of a partitioned database) and combining the results of the individual services into a single response of the global service.

As an example, we will use a database of products. Product data in retail planning applications is typically partitioned by product category, which means that it might not be possible to easily find all products that satisfy properties not related to product categories. Yet an application might have to support a search facility that allows us to find, for example, all products whose price is not below a given minimum.

The database schema for products:

block(`schema) {

  export(`{

    product(x), product_id(x:s) -> string(s).
    product_price[x] = v -> product(x), int(v).

  })

} <-- . 

The following protocol of the global service has a minimum price field on the request and returns a list of products (signified by the keyword repeated). Returning a list matters here: by default, the generic global protobuf service concatenates the results of all targets, which works particularly well for search services (and less well for global aggregation services).

message SearchRequest
{
  required uint32 min_price = 1;
}

message SearchResponse
{
  repeated Product product = 1;
}

message Product
{
  required string description = 1;
  required uint32 price = 2;
} 

The services on the individual partitions of the distributed system can use the same protocol. The implementation of the local service is fairly straightforward: the first rule finds all products that match the search criteria and creates a protobuf Product message for each of them, collecting them in results; the second rule creates the response.

block(`search) {

  alias_all(`schema),

  clauses(`{

    lang:pulse(`results).
    results(p, req) -> Product(p), SearchRequest(req).

    lang:constructor(`cons).
    lang:pulse(`cons).
    cons[req] = resp -> SearchRequest(req), SearchResponse(resp).

    +results(x, req),
    +Product(x),
    +Product:description[x] = s,
    +Product:price[x] = actual
      <-
      +SearchRequest:min_price[req] = v,
      product_price[p] = actual,
      actual >= v,
      product_id[p] = s.

    +cons[req] = resp,
    +SearchResponse(resp),
    +SearchResponse:product[resp, i] = p
      <-
      +SearchRequest:min_price[req] = v,
      +results(p, req),
      +ProductId[p] = i.
  })

} <-- . 
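Per partition, the two rules amount to a filter followed by the construction of one response. The Python sketch below mirrors that on an in-memory dictionary; the `products` mapping from product id to price and the `local_search` name are hypothetical, and the dictionary shapes follow the SearchRequest and SearchResponse fields above. Unlike the second LogiQL rule, which only fires when there is at least one result, this sketch also returns an empty response when nothing matches.

```python
def local_search(products, request):
    """Search one partition.

    products: hypothetical in-memory partition, product id -> price.
    request:  dict with the SearchRequest field "min_price".
    Returns a dict shaped like SearchResponse.
    """
    min_price = request["min_price"]
    # Rule 1: one Product message per product whose price is not below min_price;
    # as in the LogiQL, the product id doubles as the description.
    results = [
        {"description": product_id, "price": price}
        for product_id, price in products.items()
        if price >= min_price
    ]
    # Rule 2: wrap all results in a single SearchResponse (possibly empty).
    return {"product": results}
```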

The configuration of the global service is a bit more involved, because it must specify which services to target. Each target is given by a URL in a global_protobuf_target_uri fact.

block(`service_config) {

  alias_all(`lb:web:config:service),
  alias_all(`lb:web:config:service_abbr),
  alias_all(`lb:web:config:protobuf),
  alias_all(`lb:web:config:protobuf_abbr),
  alias_all(`lb:web:config:global_protobuf),
  alias_all(`lb:web:config:global_protobuf_abbr),

  clauses(`{

    service_by_prefix["/protobuf-global-search/search"] = x,
    global_protobuf_service(x) {
      protobuf_protocol[] = "search",
      protobuf_request_message[] = "SearchRequest",
      protobuf_response_message[] = "SearchResponse",

      global_protobuf_target_uri("http://localhost:8080/protobuf-global-search/partition/1"),
      global_protobuf_target_uri("http://localhost:8080/protobuf-global-search/partition/2")
    }.

  })
} <--. 

As an example, the following log illustrates a search across two partitions on a database of highly rated products on Amazon.

$ echo '{"min_price" : 30}' | lb web-client call 'http://localhost:8080/search'
-----------------  request (/search) -----------------
min_price: 30

----------------- response (/partition/1) -----------------
product { description: "Food Thermometer"         price: 97 }
product { description: "Gluten-free Pancake Mix"  price: 41 }
product { description: "Forehead Flashlight"      price: 32 }

----------------- response (/partition/2) -----------------
product { description: "Three Wolf Moon T-Shirt"  price: 35 }
product { description: "Portable Gas Grill"       price: 134 }

------------------- response (/search) --------------------
product { description: "Food Thermometer"         price: 97 }
product { description: "Gluten-free Pancake Mix"  price: 41 }
product { description: "Forehead Flashlight"      price: 32 }
product { description: "Three Wolf Moon T-Shirt"  price: 35 }
product { description: "Portable Gas Grill"       price: 134 }
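The combination step visible in this log, where the global response is the partition responses concatenated in order, can be sketched as follows. This assumes responses are processed in the order of the targets; `concatenate_responses` is a hypothetical name, not part of ServiceBlox.

```python
def concatenate_responses(responses, field="product"):
    """Merge partition responses by concatenating one repeated field, in order."""
    combined = []
    for response in responses:
        # A partition that found nothing may omit the repeated field entirely.
        combined.extend(response.get(field, []))
    return {field: combined}


partition_1 = {"product": [{"description": "Food Thermometer", "price": 97},
                           {"description": "Gluten-free Pancake Mix", "price": 41},
                           {"description": "Forehead Flashlight", "price": 32}]}
partition_2 = {"product": [{"description": "Three Wolf Moon T-Shirt", "price": 35},
                           {"description": "Portable Gas Grill", "price": 134}]}
combined = concatenate_responses([partition_1, partition_2])
print(len(combined["product"]))  # 5
```

Applied to the two partition responses above, this yields the five products in the order shown for /search. A global aggregation, such as a total count, would need a different combination function, which is why concatenation suits search services best.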

Complete executable examples of global protobuf services are available in the lb-web-samples package (see protobuf-global-*).