Subscription
Each Subscription tracks delivery of events to a particular application, and allows FireFly to ensure that messages are delivered reliably to that application.
Creating a subscription¶
Before you can connect to a subscription, you must create it via the REST API.
The one exception is Ephemeral WebSocket connections (described below). For these you can just connect and immediately start receiving events.
When creating a new subscription, you give it a name, which is how you will refer to it when you connect.
You are also able to specify server-side filtering that should be performed against the event stream, to limit the set of events that are sent to your application.
All subscriptions are created within a namespace, and automatically filter events to only those emitted within that namespace.
You can create multiple subscriptions for your application, to request different sets of server-side filtering for events. You can then request FireFly to deliver events for both subscriptions over the same WebSocket (if you are using the WebSocket transport). However, delivery order is not assured between two subscriptions.
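As a minimal sketch of the creation step, the Python below builds (but does not send) a request that creates a named subscription with a server-side filter. The base URL, the /api/v1/namespaces/{namespace}/subscriptions path and the helper name build_create_request are assumptions made for illustration, so check the API reference for your FireFly version. The body fields follow the field descriptions later on this page.

```python
import json
import urllib.request


def build_create_request(base_url, namespace, name, events_regex):
    """Build a POST request that creates a subscription.

    The endpoint path is an assumption; the body fields mirror the
    Subscription fields documented on this page.
    """
    body = {
        "name": name,
        "transport": "websockets",
        "filter": {"events": events_regex},
        "options": {"firstEvent": "newest", "readAhead": 50},
    }
    url = f"{base_url}/api/v1/namespaces/{namespace}/subscriptions"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_create_request(
    "http://localhost:5000", "default", "docexample", "^message_.*$"
)
# To send it to a running node:
#   with urllib.request.urlopen(req) as resp:
#       print(json.load(resp))
```

Creating a second subscription for the same application is just another call with a different name and filter.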
Subscriptions and workload balancing¶
You can have multiple scaled runtime instances of a single application, all running in parallel. These instances of the application all share a single subscription.
Each event is only delivered once to the subscription, regardless of how many instances of your application connect to FireFly.
With multiple WebSocket connections active on a single subscription, each event might be delivered to a different instance of your application. This means workload is balanced across your instances. However, each event still needs to be acknowledged, so delivery processing order can still be maintained within your application database state.
If you have multiple different applications all needing their own copy of the same event, then you need to configure a separate subscription for each application.
Pluggable Transports¶
Hyperledger FireFly has two built-in transports for delivery of events to applications - WebSockets and Webhooks.
The event interface is fully pluggable, so you can extend connectivity over an external event bus - such as NATS, Apache Kafka, RabbitMQ, Redis etc.
WebSockets¶
If your application has a back-end server runtime, then WebSockets are the most popular option for listening to events. WebSockets are well supported by all popular application development frameworks, and are very firewall friendly for connecting applications into your FireFly server.
Check out the @hyperledger/firefly-sdk SDK for Node.js applications, and the hyperledger/firefly-common module for Golang applications. These both contain reliable WebSocket clients for your event listeners.
A Java SDK is a roadmap item for the community.
WebSocket protocol¶
FireFly has a simple protocol on top of WebSockets:
1. Each time you connect/reconnect you need to tell FireFly to start sending you events on a particular subscription. You can do this in two ways (described in detail below):
   - Send a WSStart JSON payload
   - Include a namespace and name query parameter in the URL when you connect, along with query params for other fields of WSStart
2. Once you have started your subscription, each event flows from the server to your application as a JSON Event payload
3. For each event you receive, you need to send a WSAck payload.
   - Unless you specified autoack in step (1)
The SDK libraries for FireFly help you ensure you send the start payload each time your WebSocket reconnects.
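If you are not using one of the SDKs, the message flow above can be sketched with a few helpers. This is an illustration only: the helper names are hypothetical, and it assumes each event carries an id field that is echoed back in the ack, and that autoack is an optional boolean field of the start payload.

```python
import json


def start_payload(namespace, name, autoack=False):
    """Build the start message to send after every connect or reconnect."""
    payload = {"type": "start", "namespace": namespace, "name": name}
    if autoack:
        payload["autoack"] = True  # assumed field name, see lead-in
    return json.dumps(payload)


def ack_payload(event_id):
    """Build the ack message for one received event."""
    return json.dumps({"type": "ack", "id": event_id})


def handle_message(raw, process):
    """Process one event from the server and return the ack to send back.

    The ack is only built after process() returns, so an event that
    fails processing is never acknowledged.
    """
    event = json.loads(raw)
    process(event)
    return ack_payload(event["id"])
```

A WebSocket client would send start_payload(...) once per connection, then send the return value of handle_message(...) for every text frame it receives.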
Using start and ack explicitly¶
Here's an example websocat command showing an explicit start and ack.
$ websocat ws://localhost:5000/ws
{"type":"start","namespace":"default","name":"docexample"}
# ... for each event that arrives here, you send an ack ...
{"type":"ack","id":"70ed4411-57cf-4ba1-bedb-fe3b4b5fd6b6"}
When creating your subscription, you can set readahead in order to ask FireFly to stream a number of messages to your application, ahead of receiving the acknowledgements.
readahead can be a powerful tool to increase performance, but does require your application to ensure it processes events in the correct order and sends exactly one ack for each event.
Auto-starting via URL query and autoack¶
Here's an example websocat command where we use URL query parameters to avoid the need to send a start JSON payload.
We also use autoack so that events just keep flowing from the server.
$ websocat "ws://localhost:5000/ws?namespace=default&name=docexample&autoack"
# ... events just keep arriving here, as the server-side auto-acknowledges
# the events as it delivers them to you.
Note that using autoack means you can miss events in the case of a disconnection, so it should not be used for production applications that require at-least-once delivery.
Ephemeral WebSocket subscriptions¶
FireFly WebSockets provide a special option to create a subscription dynamically, that only lasts for as long as you are connected to the server.
We call these ephemeral subscriptions.
Here's an example websocat command showing an ephemeral subscription - notice we don't specify a name for the subscription, and there is no need to have already created the subscription beforehand.
Here we also include an extra query parameter to set a server-side filter, to only include message events.
$ websocat "ws://localhost:5000/ws?namespace=default&ephemeral&autoack&filter.events=message_.*"
# ... events matching the filter just keep arriving here. No start or ack
# payload is sent, because the query parameters start the subscription
# and autoack is set.
Ephemeral subscriptions are very convenient for experimentation, debugging and monitoring. However, they do not give reliable delivery because you only receive events that occur while you are connected. If you disconnect and reconnect, you will miss all events that happened while your application was not listening.
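Both query-parameter styles shown above (a named subscription with autoack, and an ephemeral subscription with a filter) can be assembled programmatically. The sketch below uses a hypothetical helper, ws_url, and appends the valueless switches exactly as they appear in the websocat examples. Note that urlencode percent-encodes characters such as `*` in the filter regex, which is the encoded form of the same query.

```python
from urllib.parse import urlencode


def ws_url(base, namespace, name=None, flags=(), filters=None):
    """Build a WebSocket URL that starts a subscription via query parameters.

    flags are valueless switches such as "autoack" or "ephemeral";
    filters maps filter field names (for example "events") to regexes.
    """
    params = {"namespace": namespace}
    if name is not None:
        params["name"] = name
    for field, regex in (filters or {}).items():
        params[f"filter.{field}"] = regex
    query = urlencode(params)
    for flag in flags:
        query += f"&{flag}"
    return f"{base}/ws?{query}"


named = ws_url("ws://localhost:5000", "default", "docexample", flags=["autoack"])
ephemeral = ws_url(
    "ws://localhost:5000",
    "default",
    flags=["ephemeral", "autoack"],
    filters={"events": "message_.*"},
)
print(named)
print(ephemeral)
```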
Webhooks¶
The Webhook transport allows FireFly to make HTTP calls against your application's API when events matching your subscription are emitted.
This means the direction of network connection is from the FireFly server to the application (the reverse of WebSockets). It also means you don't need to add any connection management code to your application - just expose an API that FireFly can call to process the events.
Webhooks are great for serverless functions (AWS Lambda etc.), integrations with SaaS applications, and calling existing APIs.
The FireFly configuration options for a Webhook subscription are very flexible, allowing you to customize your HTTP requests as follows:
- Set the HTTP request details:
  - Method, URL, query, headers and input body
- Wait for an invocation of the back-end service before acknowledging
  - To retry requests to your Webhook on a non-2xx HTTP status code or other error, you should enable and configure options.retry
  - The event is acknowledged once the request (with any retries) is completed - regardless of whether the outcome was a success or failure.
- Use fastack to acknowledge against FireFly immediately and make multiple parallel calls to the HTTP API in a fire-and-forget fashion.
- Set the HTTP request details dynamically from message_confirmed events:
  - Map data out of the first data element in message events
  - Requires withData to be set on the subscription, in addition to the input.* configuration options
- Can automatically generate a "reply" message for message_confirmed events:
  - Maps the response body of the HTTP call to data in the reply message
  - Sets the cid and topic in the reply message to match the request
  - Sets a tag in the reply message, per the configuration, or dynamically based on a field in the input request data.
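To make the options above concrete, here is a sketch of a request body for a Webhook subscription with retry enabled. The option names come from the SubscriptionOptions and WebhookRetryOptions tables on this page. The transport value "webhooks", the duration strings "1s" and "30s", the example URL and the helper name are assumptions for illustration.

```python
import json


def webhook_subscription(name, url, retry_count=0, fastack=False):
    """Build the JSON body for a webhook subscription (illustrative values)."""
    options = {"url": url, "method": "POST", "fastack": fastack}
    if retry_count > 0:
        # Retry is off by default, so it must be enabled explicitly.
        options["retry"] = {
            "enabled": True,
            "count": retry_count,
            "initialDelay": "1s",  # assumed duration format
            "maxDelay": "30s",     # assumed duration format
        }
    return {
        "name": name,
        "transport": "webhooks",  # assumed transport name
        "filter": {"events": "^message_confirmed$"},
        "options": options,
    }


sub = webhook_subscription("hook1", "https://example.com/hook", retry_count=3)
print(json.dumps(sub, indent=2))
```

Leaving retry_count at zero omits the retry block entirely, in which case a failed call is acknowledged without being retried, as described above.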
Batching events¶
Webhooks have the ability to batch events into a single HTTP request instead of sending an event per HTTP request. The request body will be a JSON array of events instead of a top-level JSON object with a single event. The size of the batch is set by the readAhead limit, and an optional timeout can be specified to send the events when the batch hasn't filled.
To enable this, set the following configuration under SubscriptionOptions:
Field Name | Description | Type |
---|---|---|
batch | Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch. Commonly used with Webhooks to allow events to be delivered and acknowledged in batches. | bool |
batchTimeout | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. Defaults to 2 seconds | string |
NOTE: When batch is enabled, withData cannot be used, because the options that depend on it alter the HTTP request based on a single event, which does not currently make sense for a batch.
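Because the body shape differs between the two modes, a receiving endpoint may want to normalize it before processing. A minimal sketch, assuming the default request body and using a hypothetical helper name:

```python
import json


def events_from_body(body):
    """Return the events in a webhook request body as a list.

    With batch enabled the body is a JSON array (even for one event);
    otherwise it is a single JSON object, which is wrapped in a list.
    """
    payload = json.loads(body)
    if isinstance(payload, list):
        return payload
    return [payload]


for event in events_from_body('[{"id": "a"}, {"id": "b"}]'):
    print(event["id"])
```

The rest of the handler can then iterate over the list in order, whichever mode the subscription uses.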
Example¶
{
    "id": "c38d69fd-442e-4d6f-b5a4-bab1411c7fe8",
    "namespace": "ns1",
    "name": "app1",
    "transport": "websockets",
    "filter": {
        "events": "^(message_.*|token_.*)$",
        "message": {
            "tag": "^(red|blue)$"
        },
        "transaction": {},
        "blockchainevent": {}
    },
    "options": {
        "firstEvent": "newest",
        "readAhead": 50
    },
    "created": "2022-05-16T01:23:15Z",
    "updated": null
}
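The two regular expressions in this example are anchored with ^ and $, so they must match the whole value. The sketch below shows which sample values they select. It only exercises the patterns themselves and is not FireFly's matching code; the event type strings are sample inputs chosen for illustration.

```python
import re

# Patterns copied from the example subscription above.
events_filter = re.compile(r"^(message_.*|token_.*)$")
tag_filter = re.compile(r"^(red|blue)$")


def event_type_selected(event_type):
    """True if the event type passes the 'events' pattern."""
    return events_filter.search(event_type) is not None


def tag_selected(tag):
    """True if a message tag passes the 'message.tag' pattern."""
    return tag_filter.search(tag) is not None


for sample in ["message_confirmed", "token_transfer_confirmed", "blockchain_event_received"]:
    print(sample, event_type_selected(sample))
for sample in ["red", "blue", "reddish"]:
    print(sample, tag_selected(sample))
```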
Field Descriptions¶
Field Name | Description | Type |
---|---|---|
id | The UUID of the subscription | UUID |
namespace | The namespace of the subscription. A subscription will only receive events generated in the namespace of the subscription | string |
name | The name of the subscription. The application specifies this name when it connects, in order to attach to the subscription and receive events that arrived while it was disconnected. If multiple apps connect to the same subscription, events are workload balanced across the connected application instances | string |
transport | The transport plugin responsible for event delivery (WebSockets, Webhooks, JMS, NATS etc.) | string |
filter | Server-side filter to apply to events | SubscriptionFilter |
options | Subscription options | SubscriptionOptions |
ephemeral | Ephemeral subscriptions only exist as long as the application is connected, and as such will miss events that occur while the application is disconnected, and cannot be created administratively. You can create one over a connected WebSocket connection | bool |
created | Creation time of the subscription | FFTime |
updated | Last time the subscription was updated | FFTime |
SubscriptionFilter¶
Field Name | Description | Type |
---|---|---|
events | Regular expression to apply to the event type, to subscribe to a subset of event types | string |
message | Filters specific to message events. If an event is not a message event, these filters are ignored | MessageFilter |
transaction | Filters specific to events with a transaction. If an event is not associated with a transaction, this filter is ignored | TransactionFilter |
blockchainevent | Filters specific to blockchain events. If an event is not a blockchain event, these filters are ignored | BlockchainEventFilter |
topic | Regular expression to apply to the topic of the event, to subscribe to a subset of topics. Note for messages sent with multiple topics, a separate event is emitted for each topic | string |
topics | Deprecated: Please use 'topic' instead | string |
tag | Deprecated: Please use 'message.tag' instead | string |
group | Deprecated: Please use 'message.group' instead | string |
author | Deprecated: Please use 'message.author' instead | string |
MessageFilter¶
Field Name | Description | Type |
---|---|---|
tag | Regular expression to apply to the message 'header.tag' field | string |
group | Regular expression to apply to the message 'header.group' field | string |
author | Regular expression to apply to the message 'header.author' field | string |
TransactionFilter¶
Field Name | Description | Type |
---|---|---|
type | Regular expression to apply to the transaction 'type' field | string |
BlockchainEventFilter¶
Field Name | Description | Type |
---|---|---|
name | Regular expression to apply to the blockchain event 'name' field, which is the name of the event in the underlying blockchain smart contract | string |
listener | Regular expression to apply to the blockchain event 'listener' field, which is the UUID of the event listener, so you can restrict your subscription to certain blockchain listeners. Alternatively, to avoid your application needing to know listener UUIDs, you can set the 'topic' field of blockchain event listeners and use a topic filter on your subscriptions | string |
SubscriptionOptions¶
Field Name | Description | Type |
---|---|---|
firstEvent | Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' | SubOptsFirstEvent |
readAhead | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At-least-once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | uint16 |
withData | Whether message events delivered over the subscription should be packaged with the full data of those messages in-line as part of the event JSON payload, or whether the application should make separate REST calls to download that data. May not be supported on some transports. | bool |
batch | Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch, allowing client-side optimizations when processing the events in a group. Available for both Webhooks and WebSockets. | bool |
batchTimeout | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. | string |
fastack | Webhooks only: When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations | bool |
url | Webhooks only: HTTP url to invoke. Can be relative if a base URL is set in the webhook plugin config | string |
method | Webhooks only: HTTP method to invoke. Default=POST | string |
json | Webhooks only: Whether to assume the response body is JSON, regardless of the returned Content-Type | bool |
reply | Webhooks only: Whether to automatically send a reply event, using the body returned by the webhook | bool |
replytag | Webhooks only: The tag to set on the reply message | string |
replytx | Webhooks only: The transaction type to set on the reply message | string |
headers | Webhooks only: Static headers to set on the webhook request | `` |
query | Webhooks only: Static query params to set on the webhook request | `` |
tlsConfigName | The name of an existing TLS configuration associated to the namespace to use | string |
input | Webhooks only: A set of options to extract data from the first JSON input data in the incoming message. Only applies if withData=true | WebhookInputOptions |
retry | Webhooks only: a set of options for retrying the webhook call | WebhookRetryOptions |
httpOptions | Webhooks only: a set of options for HTTP | WebhookHTTPOptions |
WebhookInputOptions¶
Field Name | Description | Type |
---|---|---|
query | A top-level property of the first data input, to use for query parameters | string |
headers | A top-level property of the first data input, to use for headers | string |
body | A top-level property of the first data input, to use for the request body. Default is the whole first body | string |
path | A top-level property of the first data input, to use for a path to append with escaping to the webhook path | string |
replytx | A top-level property of the first data input, to use to dynamically set whether to pin the response (so the requester can choose) | string |
WebhookRetryOptions¶
Field Name | Description | Type |
---|---|---|
enabled | Enables retry on HTTP calls, defaults to false | bool |
count | Number of times to retry the webhook call in case of failure | int |
initialDelay | Initial delay between retries when we retry the webhook call | string |
maxDelay | Max delay between retries when we retry the webhook call | string |
WebhookHTTPOptions¶
Field Name | Description | Type |
---|---|---|
proxyURL | HTTP proxy URL to use for outbound requests to the webhook | string |
tlsHandshakeTimeout | The max duration to hold a TLS handshake alive | string |
requestTimeout | The maximum amount of time that a request is allowed to remain open | string |
maxIdleConns | The max number of idle connections to hold pooled | int |
idleTimeout | The max duration to hold an HTTP keepalive connection between calls | string |
connectionTimeout | The maximum amount of time that a connection is allowed to remain with no data transmitted. | string |
expectContinueTimeout | See ExpectContinueTimeout in the Go docs | string |