About Streaming

Background

Streaming refers to a continuous, ordered transmission of data. Space and Time provides an easy way for users to set up their own streams and a flexible means to configure how the data should be ingested.

The Space and Time Streaming solution is composed of three primary components:

  • Apache Kafka
  • Streaming API
  • Data Ingestion

Apache Kafka

The core engine of Space and Time streaming is Apache Kafka, an open-source solution well known and widely used for its high throughput, scalability, and client libraries in practically every programming language. For more information about Apache Kafka, see the official website.

Space and Time pairs Kafka's out-of-the-box authentication and authorization solutions with its own to enable secure configuration of the underlying resources within Kafka while maintaining compatibility with existing client libraries. For example, at configuration time, you can submit an API request to create a new topic and must provide an access token from prior authentication as well as a biscuit authorizing the instantiation of new infrastructure. At runtime, however, you can authenticate with Kafka and produce new data using existing Kafka libraries, without needing to implement additional logic.

Configure your Kafka client with the following properties:

  • security.protocol=SASL_SSL
  • sasl.mechanism=SCRAM-SHA-512
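
To illustrate, here is a minimal producer sketch using the Java Kafka client. The broker address, topic name, and SCRAM credentials are placeholders rather than actual Space and Time values; substitute the connection details you receive when configuring your stream.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class StreamProducer {
    public static void main(String[] args) {
        // Placeholder connection details -- replace with the values for your stream.
        Properties props = new Properties();
        props.put("bootstrap.servers", "<broker-host>:9092");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"<username>\" password=\"<password>\";");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // The record value is a JSON payload shaped to match your mapping configuration.
        String payload = "{\"Block\": {\"number\": 16086234}}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("<your-topic>", payload));
            producer.flush();
        }
    }
}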

Streaming API

Space and Time provides a suite of APIs to facilitate streaming operations. These APIs are primarily intended to enable configuration and management of Kafka infrastructure, but a simple REST API endpoint for producing data is also provided. See the API specs here.
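
As a rough sketch of producing data over REST, the request below uses Java's built-in HTTP client. The endpoint path, host, and header names are illustrative assumptions, not the actual routes; check the published API specs for the exact endpoint and authentication headers.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestProduce {
    public static void main(String[] args) throws Exception {
        // The URL and headers below are hypothetical placeholders -- see the API specs
        // for the actual produce endpoint and required authentication.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://<api-host>/<streaming-produce-endpoint>"))
                .header("Content-Type", "application/json")
                .header("Authorization", "Bearer <access-token>")
                .POST(HttpRequest.BodyPublishers.ofString("{\"Block\": {\"number\": 16086234}}"))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
    }
}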

Data Ingestion

Space and Time consumes data from user topics and applies it to the target tables. The data ingestion service is controlled by a flexible data mapping specification, enabling support for data processing from a wide array of streaming pipelines. The mapping configuration is defined via JSON and directs the data ingestion service on how to convert a Kafka record payload into database table rows.

💡 Data ingestion currently supports Kafka record payloads that are either JSON or Snappy-compressed JSON.

Ingestion Control

MappingConfig

The JSON object root (MappingConfig) defines process-level configuration and has the following properties:

  • tables [array, required] - Contains the list of table configuration (TableConfig) objects
  • compressed [boolean, optional, default=false] - Defines whether the input data is Snappy-compressed
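
For orientation, a minimal MappingConfig for a Snappy-compressed payload might look like the sketch below. The schema, table, and key names are placeholders; a complete real-world configuration appears later in this section.

{
    "compressed": true,
    "tables": [
        {
            "originPath": "Block",
            "targetTable": "MY_SCHEMA.MY_TABLE",
            "columnMapping": [
                {
                    "originKey": "number",
                    "destColumn": "block_number",
                    "dataType": "bigint"
                }
            ]
        }
    ]
}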

TableConfig

The table configuration object (TableConfig) defines table-level configuration and has the following properties:

  • originPath [string, required] - The JSON Path to the payload section providing data to ingest
  • targetTable [string, required] - The target table unique identifier (i.e., SCHEMA_NAME.TABLE_NAME)
  • operation [string, optional, default="INSERT"] - Defines the SQL operation to perform with the payload. Valid values include: { "INSERT", "MERGE" }
  • isArray [boolean, optional, default=false] - Defines whether the payload section is a JSON array rather than a JSON object. If true, the data ingestion service performs the specified operation on every entry in the array
  • required [boolean, optional, default=false] - Defines whether the payload section is required for ingestion (if true, ingestion fails when the specified section is missing from the record payload)
  • columnMapping [array, required] - Defines the list of column configuration objects (ColumnConfig)
  • nestedTables [array, optional, default=null] - Defines a list of nested table configuration objects (TableConfig). Note that a nested table originPath must be relative and cannot be deeper than one level.
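
The full example later in this section does not use nestedTables, so here is a hedged sketch of a single TableConfig: it assumes a hypothetical payload in which each Block object also carries an embedded withdrawals array, and maps that array to a second (hypothetical) ETH.WITHDRAWAL table through a relative, one-level-deep originPath.

{
    "originPath": "Block",
    "targetTable": "ETH.BLOCK",
    "columnMapping": [
        {
            "originKey": "number",
            "destColumn": "block_number",
            "dataType": "bigint"
        }
    ],
    "nestedTables": [
        {
            "originPath": "withdrawals",
            "targetTable": "ETH.WITHDRAWAL",
            "isArray": true,
            "columnMapping": [
                {
                    "originKey": "index",
                    "destColumn": "withdrawal_index",
                    "dataType": "bigint"
                }
            ]
        }
    ]
}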

ColumnConfig

The column configuration object (ColumnConfig) defines column-level configuration and has the following properties:

  • originKey [string, required] - Defines the JSON property key in the current payload section where the column data originates
  • destColumn [string, optional, default=<originKey>] - Defines the destination column name where the column data should go. If not specified, the data ingestion service will assume that originKey and destColumn are equal (case-insensitive).
  • dataType [string, required] - Defines the expected value type for the column data.

Example

Below you'll find an example JSON payload and mapping configuration to better demonstrate how the data ingestion service can be controlled. Note that this is a real-world example—it's a snippet of our blockchain indexing data and configuration!

Data Payload (JSON):

{
    "Block": {
        "number": 16086234,
        "hash": "0x9ded22e1c93361edceb5ffa1ca691d12d311cdd4c1a973f4896545a1f32ddcb2",
        "timestamp": "2022-12-01 00:00:11",
        "miner": "0x3b7fAEc3181114A99c243608BC822c5436441FfF",
        "reward": "165026803532390038",
        "size": 72792,
        "gasUsed": 24613176,
        "gasLimit": 30000000,
        "baseFeePerGas": "10140652006",
        "parentHash": "0xdceb222ea939cb659f72d06dd2c4d6e318a77a066395c3ddebdbd0f7209fee06",
        "transactionCount": 170
    },
    "Transaction": [
        {
            "hash": "0x11382b7a70361f477cb8d8ad4a12fb2c3d86cd3fd35b14481ed6f8142470203f",
            "blockNumber": 16086234,
            "timestamp": "2022-12-01 00:00:11",
            "from": "0x2cF07A5670Ca8d2c6404017c8fa98C8f13E17832",
            "to": "0x9e1075aE354D61e6358848b89d5386E5E9B3a16C",
            "value": "0",
            "gas": "16620000",
            "transactionFee": "93769241399309124",
            "cumulativeGasUsed": 8416854,
            "status": 1
        },
        {
            "hash": "0x06c1fa0de637a27b350f3d32fb82c6bb9f5a3ae2d0e2a89d8fc04707d359d12a",
            "blockNumber": 16086234,
            "timestamp": "2022-12-01 00:00:11",
            "from": "0x52Cc75bbCEf59f5E03839A37AEd1AeA9CBC06401",
            "to": "0x8bc646853Ccd72d26F3d51A0677fB732DC971Edb",
            "value": "0",
            "gas": "39000",
            "transactionFee": "29674134253416360",
            "cumulativeGasUsed": 8442914,
            "status": 1
        }
    ]
}

Configuration Payload (JSON):

{
    "tables": [
        {
            "originPath": "Block",
            "targetTable": "ETH.BLOCK",
            "columnMapping": [
                {
                    "originKey": "number",
                    "destColumn": "block_number",
                    "dataType": "bigint"
                },
                {
                    "originKey": "hash",
                    "destColumn": "block_hash",
                    "dataType": "String"
                },
                {
                    "originKey": "timestamp",
                    "dataType": "String"
                },
                {
                    "originKey": "miner",
                    "dataType": "String"
                },
                {
                    "originKey": "parentHash",
                    "destColumn": "parent_hash",
                    "dataType": "String"
                },
                {
                    "originKey": "reward",
                    "dataType": "String"
                },
                {
                    "originKey": "size",
                    "dataType": "int"
                },
                {
                    "originKey": "gasUsed",
                    "destColumn": "gas_used",
                    "dataType": "int"
                },
                {
                    "originKey": "gasLimit",
                    "destColumn": "gas_limit",
                    "dataType": "int"
                },
                {
                    "originKey": "baseFeePerGas",
                    "destColumn": "base_fee_per_gas",
                    "dataType": "String"
                },
                {
                    "originKey": "transactionCount",
                    "destColumn": "transaction_count",
                    "dataType": "int"
                }
            ]
        },
        {
            "originPath": "Transaction",
            "targetTable": "ETH.TRANSACTION",
            "isArray": true,
            "columnMapping": [
                {
                    "originKey": "hash",
                    "destColumn": "transaction_hash",
                    "dataType": "String"
                },
                {
                    "originKey": "blockNumber",
                    "destColumn": "block_number",
                    "dataType": "bigint"
                },
                {
                    "originKey": "from",
                    "destColumn": "from_address",
                    "dataType": "String"
                },
                {
                    "originKey": "to",
                    "destColumn": "to_address",
                    "dataType": "String"
                },
                {
                    "originKey": "value",
                    "dataType": "String"
                },
                {
                    "originKey": "gas",
                    "dataType": "String"
                },
                {
                    "originKey": "transactionFee",
                    "destColumn": "transaction_fee",
                    "dataType": "String"
                },
                {
                    "originKey": "cumulativeGasUsed",
                    "destColumn": "receipt_cumulative_gas_used",
                    "dataType": "int"
                },
                {
                    "originKey": "timestamp",
                    "dataType": "String"
                },
                {
                    "originKey": "status",
                    "destColumn": "receipt_status",
                    "dataType": "int"
                }
            ]
        }
    ]
}

Feeding the example data payload into the data ingestion service with the configuration payload above produces one insert into the ETH.BLOCK table and two inserts into the ETH.TRANSACTION table. For example, the ETH.BLOCK insert would look like this:

INSERT INTO ETH.BLOCK(BLOCK_NUMBER, BLOCK_HASH, TIMESTAMP, MINER, PARENT_HASH, REWARD, SIZE, GAS_USED, GAS_LIMIT, BASE_FEE_PER_GAS, TRANSACTION_COUNT) VALUES(16086234, '0x9ded22e1c93361edceb5ffa1ca691d12d311cdd4c1a973f4896545a1f32ddcb2', '2022-12-01 00:00:11', '0x3b7fAEc3181114A99c243608BC822c5436441FfF', '0xdceb222ea939cb659f72d06dd2c4d6e318a77a066395c3ddebdbd0f7209fee06', '165026803532390038', 72792, 24613176, 30000000, '10140652006', 170)
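
Because isArray is true for the Transaction section, the service iterates over both array entries; the first of the two ETH.TRANSACTION inserts, derived the same way from the mapping, would look like this:

INSERT INTO ETH.TRANSACTION(TRANSACTION_HASH, BLOCK_NUMBER, FROM_ADDRESS, TO_ADDRESS, VALUE, GAS, TRANSACTION_FEE, RECEIPT_CUMULATIVE_GAS_USED, TIMESTAMP, RECEIPT_STATUS) VALUES('0x11382b7a70361f477cb8d8ad4a12fb2c3d86cd3fd35b14481ed6f8142470203f', 16086234, '0x2cF07A5670Ca8d2c6404017c8fa98C8f13E17832', '0x9e1075aE354D61e6358848b89d5386E5E9B3a16C', '0', '16620000', '93769241399309124', 8416854, '2022-12-01 00:00:11', 1)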