tech.figure.eventstream:es-kafka

A collection of libraries to connect and stream blocks from a node


License
Apache-2.0

Documentation

Event stream client for Provenance blockchain

This is a flow based project to create an event listener on the Provenance blockchain and receive block information.

Status

Latest Release Maven Central Apache 2.0 License Code Coverage LOC

Installation

Maven

<dependencies>
    <dependency>
        <groupId>tech.figure.eventstream</groupId>
        <artifactId>es-core</artifactId>
        <version>${version}</version>
    </dependency>
    <dependency>
        <groupId>tech.figure.eventstream</groupId>
        <artifactId>es-api</artifactId>
        <version>${version}</version>
    </dependency>
    <dependency>
        <groupId>tech.figure.eventstream</groupId>
        <artifactId>es-api-model</artifactId>
        <version>${version}</version>
    </dependency>
</dependencies>

Gradle

Groovy

In build.gradle:

implementation 'tech.figure.eventstream:es-core:${version}'
implementation 'tech.figure.eventstream:es-api:${version}'
implementation 'tech.figure.eventstream:es-api-model:${version}'

Kotlin

In build.gradle.kts:

implementation("tech.figure.eventstream:es-core:${version}")
implementation("tech.figure.eventstream:es-api:${version}")
implementation("tech.figure.eventstream:es-api-model:${version}")

Setup

To get started using the provenance event stream library you need to create an httpAdapter that will create both the rpc client and the websocket client to your query node of choice.

The protocol is required on the host value and can be one of http | https | tcp | tcp+tls.

val host = "https://rpc.test.provenance.io"
val netAdapter = okHttpNetAdapter(host)

With this adapter we can create streams for live data, historical data, metadata, or any combinations.

Usage

Historical Flows

Historical flows require a fromHeight parameter where you want your stream to start.

Optionally, you can add toHeight as an optional parameter. If not supplied the stream will go to current block height.

Get block header flows:

val log = KotlinLogging.logger {}

historicalBlockHeaderFlow(netAdapter, 1, 100)
  .onEach { log.info { "oldHeader: ${it.height}" } }
  .collect()

Get block data flows:

val log = KotlinLogging.logger {}

historicalBlockDataFlow(netAdapter, 1, 100)
  .onEach { log.info { "oldBlock: ${it.height}" } }
  .collect()

Live Flows:

Live flows require an adapter to decode the JSON responses from the chain.

The project includes a moshi adapter configured to decode the RPC responses

Get live block headers:

val log = KotlinLogging.logger {}
val decoderAdapter = moshiDecoderAdapter()

liveBlockHeaderFlow(netAdapter, decoderAdapter)
  .onEach { log.info { "liveHeader: ${it.height}" } }
  .collect()

Get live block datas:

val log = KotlinLogging.logger {}
val decoderAdapter = moshiDecoderAdapter()

liveBlockDataFlow(netAdapter, decoderAdapter)
    .onEach { log.info { "liveBlock: $it" } }
    .collect()

Combinations

These flows can also be combined to create historical + live flows

Get block headers:

val log = KotlinLogging.logger {}

// get the current block height from the node
val current = netAdapter.rpcAdapter.getCurrentHeight()!!
val decoderAdapter = moshiDecoderAdapter()

blockHeaderFlow(netAdapter, decoderAdapter, from = current - 1000, to = current)
    .onEach { log.info { "received: ${it.height}" } }
    .collect()

Get block datas:

val log = KotlinLogging.logger {}

// get the current block height from the node
val current = netAdapter.rpcAdapter.getCurrentHeight()!!
val decoderAdapter = moshiDecoderAdapter()

blockDataFlow(netAdapter, decoderAdapter, from = current - 1000, to = current)
    .onEach { log.info { "received: ${it.height}" } }
    .collect()

Node Subscriptions

We can additionally subscribe to certain events on the node.

Currently, only MessageType.NewBlock and MessageType.NewBlockHeader are supported.

val log = KotlinLogging.logger {}
val decoderAdapter = moshiDecoderAdapter()

nodeEventStream<MessageType.NewBlock>(netAdapter, decoderAdapter)
    .onEach { log.info {"liveBlock: $it" } }
    .collect()