Hyperledger Sawtooth events in Go

This blog post shows how to use the Sawtooth Go SDK for event handling. We can create events in a transaction processor and write a client to subscribe to events via ZMQ. There’s also a web socket based event subscription feature not in scope of the topic discussed here. The article has information useful for developers, technical readers, architects, Sawtooth enthusiasts and anybody interested in blockchain.

Please refer to Sawtooth Go SDK reference for setting up the development environment. It includes information on how to write a client in Go.

Events and Event Subscription In Sawtooth

Thanks to the many open source contributors. The Hyperledger Sawtooth documentation describes in detail about event and event subscription. This article supplements the document. Sawtooth has SDK support in many languages, giving flexibility to the application developer. We will discuss the Go SDK usage in detail.

Sawtooth defines two core events sawtooth/block-commit and sawtooth/state-delta. Both occur when a block is committed. The former carries information about the block. The latter carries information about all state changes at the given address. In addition to the predefined events, Sawtooth allows application developers to define new events. A combination of sophisticated SDK and protobuf definitions abstract the complex network communications. The SDK exposes APIs to add new application-specific events. It also provides the mechanism for clients to subscribe to available events.

Figure 1: Representation of the Validator connection with Transaction Processor and Client via ZMQ.

Event: A savior for real world use cases

  1. Notification events such as state change and block commit help the design of interactive clients. It saves clients from having to poll the validator for the status of submitted batches.
  2. Events can be used  as a debugger for transaction processors. Transaction processors can register application-specific events allowing clients to debug the flow remotely..
  3. In addition to debugging, the asynchronous behavior allows for stateless application designs.
  4. Deferring the processing of events at a later point in time. Sawtooth Supply Chain is one such well known example, which makes use of this feature.
Caution beginners! Do not get carried away with overwhelming application specific use cases. Events are also being used for internal operation of Sawtooth. The horizon is open and our imagination is the only limit.

Adding Application-Specific Event In Go

Creating a custom event involves defining three parts, namely

  1. Define attributes: A list of key-value pairs, which later is used to filter during event subscriptions. There can be many values per attribute key.
import

("github.com/hyperledger/sawtooth-sdk-go/protobuf/events_pb2")

// Create a key-value map to define attribute

attributes := []events_pb2.Event_Attribute{

Key: "MyOwnKey", Value:"ValueIWantToPass",

})

2. Define data payload: Byte information for a specific event type.

payload := []byte{'f', 'u', 'n', '-', 'l', 'e', 'a', 'r', 'n', 'i', 'n', 'g'}

3. Define event type: Used as an identifier when subscribing for the events.

After defining these parameters, we can use the available API in the SDK to create an event and add to the context. Listing 1 shows and example of adding an event. Please note that any change to the context’s events will affect the way subscribed clients work.

import("github.com/hyperledger/sawtooth-sdk-go/processor")

func (self *OneOfMyTPHandler) Apply(

request *processor_pb2.TpProcessRequest,

context *processor.Context)

error {    // -- snip --

context.AddEvent(

"MyEventIdentifier",

attributes,

payload)

// -- snip --
}

Listing 1: Adding an event to the context.

Event-Subscription In Go

Subscribing to an event involves establishing a ZMQ connection with the validator. The event types specify which events are subscribed to. An optional filter attribute can be passed for each event when establishing the subscription. The Sawtooth Go SDK uses protobuf definitions for serializing messages exchanged between the client and the validator. The following four steps show the sample code snippets.

  1. Establish a ZMQ connection: To establish a ZMQ connection from a client as a DEALER. Detailed description can be found in ROUTER-DEALER mechanism in ZMQ. The Sawtooth SDK provides an API for establishing client connection with validator.
import (
"github.com/hyperledger/sawtooth-sdk-go/messaging"
"github.com/pebbe/zmq4"
)
zmq_context, err := zmq4.NewContext()
// Error creating a ZMQ context
if err != nil {
   return err
}

// Remember to replace <VALIDATOR-IP> with hostname
// or the IP where validator is listening on port 4004
zmq_connection, err := messaging.NewConnection(
   zmq_context,
   zmq4.DEALER,
   "tcp://<VALIDATOR-IP>:4004",
   //This specifies a server connection which needs
   //binding or client connection that needs to
   // establish a request to server
   false,
)
// ZMQ connection couldn't be established
if err != nil {
   return err
}
// Remember to close the connection when either done
// processing events processing or an error occursdefer zmq_connection.Close()
// -- snip ––

2. Construct EventFilter, EventSubscription and ClientEventsSubscribeRequest: The Event is said to be subscribed when both event_type and all the EventFilters in EventSubscription field match. EventFilters can be applied on attributes defined earlier. FilterType determines rules for  comparing the match string.

import("github.com/hyperledger/sawtooth-sdk-go/protobuf/events_pb2") 

// Define filters over attributes to be triggered when
// a string matches a particular filter type
filters := []*events_pb2.EventFilter{&events_pb2.EventFilter{   
   Key: "MyOwnKey", 
   MatchString: "MyUniqueString", 
   FilterType:  events_pb2.EventFilter_REGEX_ANY,
}} 
my_identifier_subscription := events_pb2.EventSubscription{   
   EventType: "MyEventIdentifier", 
   Filters: filters,
} 
// -- snip --
// Construct subscription request for the validator
request := client_event_pb2.ClientEventsSubscribeRequest{
   Subscriptions: []*events_pb2.EventSubscription{
     &my_identifier_subscription, 
     &my_another_identifier_subscription,
   },
}
// -- snip ––

3. Send request over ZMQ connection: The client’s event subscription request can be sent through an established ZMQ connection. Note that a correlation id returned by SendMsg() can be used to know if the validator as a ROUTER has response messages. Many events can be subscribed to at once.

import (
 "errors"
"github.com/golang/protobuf/proto"
 "github.com/hyperledger/sawtooth-sdk-go/protobuf/client_event_pb2"
 "github.com/hyperledger/sawtooth-sdk-go/protobuf/validator_pb2"
)
// Get serialized request using protobuf libraries
serialized_subscribe_request, err :=
    proto.Marshal(&request)
if err != nil {
  return err
}
 // Send the subscription request, get a correlation id
// from the SDK
corrId, err := zmq_connection.SendNewMsg(
    validator_pb2.Message_CLIENT_EVENTS_SUBSCRIBE_REQUEST,
    serialized_subscribe_request,
)
// Error requesting validator, optionally based on
// error type may apply retry mechanism here
if err != nil {   return err} // Wait for subscription status, wait for response of
// message with specific correlation id_, response, err :=zmq_connection.RecvMsgWithId(corrId)
if err != nil {
  return err
}
 // Deserialize received protobuf message as response
// for subscription requestevents_subscribe_response :=
    client_event_pb2.ClientEventsSubscribeResponse{}

err = proto.Unmarshal(
    response.Content,
    &events_subscribe_response)
if err != nil {
  return err
}
 // Client subscription is not successful, optional
// retries can be done later for subscription based on
// response cause
if events_subscribe_response.Status !=
  client_event_pb2.ClientEventsSubscribeResponse_OK {
  return errors.New("Client subscription failed")
}
 // Client event subscription is successful, remember to
// unsubscribe when either not required anymore or
// error occurs. Similar approach as followed for
// subscribing events can be used here.
defer func(){
    // Unsubscribe from events
    events_unsubscribe_request :=
      client_event_pb2.ClientEventsUnsubscribeRequest{}
    serialized_unsubscribe_request, err =
      proto.Marshal(&events_unsubscribe_request)
    if err != nil {
      return err
    }
    corrId, err = zmq_connection.SendNewMsg(

      validator_pb2.Message_CLIENT_EVENTS_UNSUBSCRIBE_REQUEST,
      Serialized_unsubscribe_request,
    )
    if err != nil {
      return err
    }
    // Wait for status
    _, unsubscribe_response, err :=
    zmq_connection.RecvMsgWithId(corrId)
    // Optional retries can be done depending on error
    // status
    if err != nil {
      return err
    }
    events_unsubscribe_response :=
      client_event_pb2.ClientEventsUnsubscribeResponse{}
    err =
      proto.Unmarshal(unsubscribe_response.Content, 
      &events_unsubscribe_response)
    if err != nil {
      return err
    }
    // Optional retries can be done depending on error
    // status
    if events_unsubscribe_response.Status !=
      client_event_pb2.ClientEventsUnsubscribeResponse_OK {
        return errors.New("Client couldn't unsubscribe successfully")
    }
}()
// -- snip ––

4. Event handling: The established ZMQ connection will send protobuf messages corresponding to the subscribed events.

import (
    "errors"
    "fmt"
    "github.com/golang/protobuf/proto"
    "github.com/hyperledger/sawtooth-sdk-go/protobuf/validator_pb2"
) 
// Listen for events in an infinite loop
fmt.Println("Listening to events.")
for {
  // Wait for a message on connection 
  _, message, err := zmq_connection.RecvMsg()
  if err != nil {
    return err
  }
  // Check if received is a client event message
  if message.MessageType !=
    validator_pb2.Message_CLIENT_EVENTS {
   return errors.New("Received a message not 
requested for")
  } 
  event_list := events_pb2.EventList{}
  err = proto.Unmarshal(message.Content, &event_list)
  if err != nil {
    return err
  }
  // Received following events from validator   
  for _, event := range event_list.Events {
    // handle event here
    fmt.Printf("Event received: %v\n", *event)
  }
}
// -- snip ––

Try it out!

References:

  1. Subscribing to Events, Using the Go SDK, from the Hyperledger Sawtooth website.
  2. Commits by arsulegai in sawtooth-cookiejar example.
  3. The Sawtooth Go SDK.

Chapter on Router-Dealer, ZMQ protocol.