Skip to content

Subscription

Each Subscription tracks delivery of events to a particular application, and allows FireFly to ensure that messages are delivered reliably to that application.

FireFly Event Subscription Model

Creating a subscription

Before you can connect to a subscription, you must create it via the REST API.

One special case where you do not need to do this, is Ephemeral WebSocket connections (described below). For these you can just connect and immediately start receiving events.

When creating a new subscription, you give it a name which is how you will refer to it when you connect.

You are also able to specify server-side filtering that should be performed against the event stream, to limit the set of events that are sent to your application.

All subscriptions are created within a namespace, and automatically filter events to only those emitted within that namespace.

You can create multiple subscriptions for your application, to request different sets of server-side filtering for events. You can then request FireFly to deliver events for both subscriptions over the same WebSocket (if you are using the WebSocket transport). However, delivery order is not assured between two subscriptions.

Subscriptions and workload balancing

You can have multiple scaled runtime instances of a single application, all running in parallel. These instances of the application all share a single subscription.

Each event is only delivered once to the subscription, regardless of how many instances of your application connect to FireFly.

With multiple WebSocket connections active on a single subscription, each event might be delivered to different instance of your application. This means workload is balanced across your instances. However, each event still needs to be acknowledged, so delivery processing order can still be maintained within your application database state.

If you have multiple different applications all needing their own copy of the same event, then you need to configure a separate subscription for each application.

Pluggable Transports

Hyperledger FireFly has two built-in transports for delivery of events to applications - WebSockets and Webhooks.

The event interface is fully pluggable, so you can extend connectivity over an external event bus - such as NATS, Apache Kafka, Rabbit MQ, Redis etc.

WebSockets

If your application has a back-end server runtime, then WebSockets are the most popular option for listening to events. WebSockets are well supported by all popular application development frameworks, and are very firewall friendly for connecting applications into your FireFly server.

Check out the @hyperledger/firefly-sdk SDK for Node.js applications, and the hyperledger/firefly-common module for Golang applications. These both contain reliable WebSocket clients for your event listeners.

A Java SDK is a roadmap item for the community.

WebSocket protocol

FireFly has a simple protocol on top of WebSockets:

  1. Each time you connect/reconnect you need to tell FireFly to start sending you events on a particular subscription. You can do this in two ways (described in detail below):
  2. Send a WSStart JSON payload
  3. Include a namespace and name query parameter in the URL when you connect, along with query params for other fields of WSStart
  4. One you have started your subscription, each event flows from the server, to your application as a JSON Event payload
  5. For each event you receive, you need to send a WSAck payload.
  6. Unless you specified autoack in step (1)

The SDK libraries for FireFly help you ensure you send the start payload each time your WebSocket reconnects.

Using start and ack explicitly

Here's an example websocat command showing an explicit start and ack.

$ websocat ws://localhost:5000/ws
{"type":"start","namespace":"default","name":"docexample"}
# ... for each event that arrives here, you send an ack ...
{"type":"ack","id":"70ed4411-57cf-4ba1-bedb-fe3b4b5fd6b6"}

When creating your subscription, you can set readahead in order to ask FireFly to stream a number of messages to your application, ahead of receiving the acknowledgements.

readahead can be a powerful tool to increase performance, but does require your application to ensure it processes events in the correct order and sends exactly one ack for each event.

Auto-starting via URL query and autoack

Here's an example websocat where we use URL query parameters to avoid the need to send a start JSON payload.

We also use autoack so that events just keep flowing from the server.

$ websocat "ws://localhost:5000/ws?namespace=default&name=docexample&autoack"
# ... events just keep arriving here, as the server-side auto-acknowledges
#     the events as it delivers them to you.

Note using autoack means you can miss events in the case of a disconnection, so should not be used for production applications that require at-least-once delivery.

Ephemeral WebSocket subscriptions

FireFly WebSockets provide a special option to create a subscription dynamically, that only lasts for as long as you are connected to the server.

We call these ephemeral subscriptions.

Here's an example websocat command showing an an ephemeral subscription - notice we don't specify a name for the subscription, and there is no need to have already created the subscription beforehand.

Here we also include an extra query parameter to set a server-side filter, to only include message events.

$ websocat "ws://localhost:5000/ws?namespace=default&ephemeral&autoack&filter.events=message_.*"
{"type":"start","namespace":"default","name":"docexample"}
# ... for each event that arrives here, you send an ack ...
{"type":"ack","id":"70ed4411-57cf-4ba1-bedb-fe3b4b5fd6b6"}

Ephemeral subscriptions are very convenient for experimentation, debugging and monitoring. However, they do not give reliable delivery because you only receive events that occur while you are connected. If you disconnect and reconnect, you will miss all events that happened while your application was not listening.

Webhooks

The Webhook transport allows FireFly to make HTTP calls against your application's API when events matching your subscription are emitted.

This means the direction of network connection is from the FireFly server, to the application (the reverse of WebSockets). Conversely it means you don't need to add any connection management code to your application - just expose and API that FireFly can call to process the events.

Webhooks are great for serverless functions (AWS Lambda etc.), integrations with SaaS applications, and calling existing APIs.

The FireFly configuration options for a Webhook subscription are very flexible, allowing you to customize your HTTP requests as follows:

  • Set the HTTP request details:
  • Method, URL, query, headers and input body
  • Wait for a invocation of the back-end service, before acknowledging
  • To retry requests to your Webhook on a non-2xx HTTP status code or other error, you should enable and configure options.retry
  • The event is acknowledged once the request (with any retries), is completed - regardless of whether the outcome was a success or failure.
  • Use fastack to acknowledge against FireFly immediately and make multiple parallel calls to the HTTP API in a fire-and-forget fashion.
  • Set the HTTP request details dynamically from message_confirmed events:
  • Map data out of the first data element in message events
  • Requires withData to be set on the subscription, in addition to the input.* configuration options
  • Can automatically generate a "reply" message for message_confirmed events:
  • Maps the response body of the HTTP call to data in the reply message
  • Sets the cid and topic in the reply message to match the request
  • Sets a tag in the reply message, per the configuration, or dynamically based on a field in the input request data.

Batching events

Webhooks have the ability to batch events into a single HTTP request instead of sending an event per HTTP request. The interface will be a JSON array of events instead of a top level JSON object with a single event. The size of the batch will be set by the readAhead limit and an optional timeout can be specified to send the events when the batch hasn't filled.

To enable this set the following configuration under SubscriptionOptions

batch | Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch. Commonly used with Webhooks to allow events to be delivered and acknowledged in batches. | bool |

batchTimeout | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. Defaults to 2 seconds | string

NOTE: When batch is enabled, withData cannot be used as these may alter the HTTP request based on a single event and in batching it does not make sense for now.

Example

{
    "id": "c38d69fd-442e-4d6f-b5a4-bab1411c7fe8",
    "namespace": "ns1",
    "name": "app1",
    "transport": "websockets",
    "filter": {
        "events": "^(message_.*|token_.*)$",
        "message": {
            "tag": "^(red|blue)$"
        },
        "transaction": {},
        "blockchainevent": {}
    },
    "options": {
        "firstEvent": "newest",
        "readAhead": 50
    },
    "created": "2022-05-16T01:23:15Z",
    "updated": null
}

Field Descriptions

Field Name Description Type
id The UUID of the subscription UUID
namespace The namespace of the subscription. A subscription will only receive events generated in the namespace of the subscription string
name The name of the subscription. The application specifies this name when it connects, in order to attach to the subscription and receive events that arrived while it was disconnected. If multiple apps connect to the same subscription, events are workload balanced across the connected application instances string
transport The transport plugin responsible for event delivery (WebSockets, Webhooks, JMS, NATS etc.) string
filter Server-side filter to apply to events SubscriptionFilter
options Subscription options SubscriptionOptions
ephemeral Ephemeral subscriptions only exist as long as the application is connected, and as such will miss events that occur while the application is disconnected, and cannot be created administratively. You can create one over over a connected WebSocket connection bool
created Creation time of the subscription FFTime
updated Last time the subscription was updated FFTime

SubscriptionFilter

Field Name Description Type
events Regular expression to apply to the event type, to subscribe to a subset of event types string
message Filters specific to message events. If an event is not a message event, these filters are ignored MessageFilter
transaction Filters specific to events with a transaction. If an event is not associated with a transaction, this filter is ignored TransactionFilter
blockchainevent Filters specific to blockchain events. If an event is not a blockchain event, these filters are ignored BlockchainEventFilter
topic Regular expression to apply to the topic of the event, to subscribe to a subset of topics. Note for messages sent with multiple topics, a separate event is emitted for each topic string
topics Deprecated: Please use 'topic' instead string
tag Deprecated: Please use 'message.tag' instead string
group Deprecated: Please use 'message.group' instead string
author Deprecated: Please use 'message.author' instead string

MessageFilter

Field Name Description Type
tag Regular expression to apply to the message 'header.tag' field string
group Regular expression to apply to the message 'header.group' field string
author Regular expression to apply to the message 'header.author' field string

TransactionFilter

Field Name Description Type
type Regular expression to apply to the transaction 'type' field string

BlockchainEventFilter

Field Name Description Type
name Regular expression to apply to the blockchain event 'name' field, which is the name of the event in the underlying blockchain smart contract string
listener Regular expression to apply to the blockchain event 'listener' field, which is the UUID of the event listener. So you can restrict your subscription to certain blockchain listeners. Alternatively to avoid your application need to know listener UUIDs you can set the 'topic' field of blockchain event listeners, and use a topic filter on your subscriptions string

SubscriptionOptions

Field Name Description Type
firstEvent Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' SubOptsFirstEvent
readAhead The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts uint16
withData Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports. bool
batch Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch, allowing client-side optimizations when processing the events in a group. Available for both Webhooks and WebSockets. bool
batchTimeout When batching is enabled, the optional timeout to send events even when the batch hasn't filled. string
fastack Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations bool
url Webhooks only: HTTP url to invoke. Can be relative if a base URL is set in the webhook plugin config string
method Webhooks only: HTTP method to invoke. Default=POST string
json Webhooks only: Whether to assume the response body is JSON, regardless of the returned Content-Type bool
reply Webhooks only: Whether to automatically send a reply event, using the body returned by the webhook bool
replytag Webhooks only: The tag to set on the reply message string
replytx Webhooks only: The transaction type to set on the reply message string
headers Webhooks only: Static headers to set on the webhook request ``
query Webhooks only: Static query params to set on the webhook request ``
tlsConfigName The name of an existing TLS configuration associated to the namespace to use string
input Webhooks only: A set of options to extract data from the first JSON input data in the incoming message. Only applies if withData=true WebhookInputOptions
retry Webhooks only: a set of options for retrying the webhook call WebhookRetryOptions
httpOptions Webhooks only: a set of options for HTTP WebhookHTTPOptions

WebhookInputOptions

Field Name Description Type
query A top-level property of the first data input, to use for query parameters string
headers A top-level property of the first data input, to use for headers string
body A top-level property of the first data input, to use for the request body. Default is the whole first body string
path A top-level property of the first data input, to use for a path to append with escaping to the webhook path string
replytx A top-level property of the first data input, to use to dynamically set whether to pin the response (so the requester can choose) string

WebhookRetryOptions

Field Name Description Type
enabled Enables retry on HTTP calls, defaults to false bool
count Number of times to retry the webhook call in case of failure int
initialDelay Initial delay between retries when we retry the webhook call string
maxDelay Max delay between retries when we retry the webhookcall string

WebhookHTTPOptions

Field Name Description Type
proxyURL HTTP proxy URL to use for outbound requests to the webhook string
tlsHandshakeTimeout The max duration to hold a TLS handshake alive string
requestTimeout The max duration to hold a TLS handshake alive string
maxIdleConns The max number of idle connections to hold pooled int
idleTimeout The max duration to hold a HTTP keepalive connection between calls string
connectionTimeout The maximum amount of time that a connection is allowed to remain with no data transmitted. string
expectContinueTimeout See ExpectContinueTimeout in the Go docs string