About Streaming
Space and Time Streaming services have been temporarily suspended while we complete network changes in support of our upcoming Mainnet. Check back soon for the new and improved streaming services!
Background
Streaming refers to a continuous, ordered transmission of data. Space and Time provides an easy way for users to set up their own streams and a flexible means to configure how the data should be ingested.
The Space and Time Streaming solution is composed of three primary components:
- Apache Kafka
- Streaming API
- Data Ingestion
Apache Kafka
The core engine of Space and Time streaming is Apache Kafka, an open-source solution well known and widely used for its high throughput, scalability, and client libraries in practically every programming language. For more information about Apache Kafka, see the official website.
Space and Time pairs Kafka's out-of-the-box authentication and authorization solutions with its own to enable secure configuration of the underlying resources within Kafka while maintaining compatibility with existing client libraries. For example, at configuration time, an API request to create a new topic must include an access token from prior authentication as well as a biscuit authorizing the instantiation of new infrastructure. At runtime, however, you can authenticate with Kafka and produce new data using existing Kafka libraries, without needing to implement additional logic.
Configure your Kafka client with the following properties:
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
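As a rough illustration, the two properties above could be combined with connection details into a full client configuration. The sketch below builds a configuration dictionary using librdkafka-style keys (the style accepted by clients such as confluent-kafka). The helper names, broker address, username, and password are hypothetical placeholders, not real Space and Time values.

```python
def build_kafka_config(bootstrap_servers: str, username: str, password: str) -> dict:
    """Return a client config that uses the security settings named in this section."""
    return {
        # Placeholder connection details (assumptions, not documented values).
        "bootstrap.servers": bootstrap_servers,
        "sasl.username": username,
        "sasl.password": password,
        # The two properties documented above.
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
    }


def to_properties(config: dict) -> str:
    """Render the config in key=value form, one property per line."""
    return "\n".join(f"{key}={value}" for key, value in config.items())


config = build_kafka_config("broker.example.com:9092", "my-user", "my-password")
print(to_properties(config))
```

Java clients typically supply the credentials through `sasl.jaas.config` rather than `sasl.username` and `sasl.password`, so the credential keys would differ there while the two security properties stay the same.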
Streaming API
Space and Time provides a suite of APIs to facilitate streaming operations. These APIs are primarily intended to enable configuration and management of Kafka infrastructure, but a simple REST API endpoint for producing data is also provided. See the API specs here.
Data Ingestion
Space and Time consumes data from user topics and applies it to the target tables. The data ingestion service is controlled by a flexible data mapping specification, enabling support for data processing from a wide array of streaming pipelines. The mapping configuration is defined via JSON and directs the data ingestion service on how to convert a Kafka record payload into database table rows.
Data ingestion currently supports Kafka record payloads that are either JSON or Snappy-compressed JSON.
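On the producing side, the two supported payload forms might be prepared as sketched below. The function name is hypothetical, and the compressor is passed in as a parameter so the sketch has no third-party dependencies; in practice it could be something like `snappy.compress` from the python-snappy package. Whether the service expects raw or framed Snappy data is not stated here, so treat that choice as an open assumption.

```python
import json
from typing import Callable, Optional


def encode_payload(
    record: dict,
    compress: Optional[Callable[[bytes], bytes]] = None,
) -> bytes:
    """Serialize a record to UTF-8 JSON bytes, optionally compressing the result."""
    raw = json.dumps(record, separators=(",", ":")).encode("utf-8")
    # Without a compressor the payload is sent as plain JSON.
    return compress(raw) if compress is not None else raw


plain = encode_payload({"Block": {"number": 16086234}})
print(plain)
```

If a compressor is used, the mapping configuration's `compressed` property would presumably need to be set to true so that the service decompresses the payload before parsing it.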
Ingestion Control
MappingConfig
The JSON object root (MappingConfig) defines process-level configuration and has the following properties:
- tables [array, required] - Contains the list of table configuration (TableConfig) objects.
- compressed [boolean, optional, default=false] - Defines whether or not the input data is Snappy compressed.
TableConfig
The table configuration object (TableConfig) defines table-level configuration and has the following properties:
- originPath [string, required] - The JSON Path to the payload section providing data to ingest.
- targetTable [string, required] - The target table's unique identifier (i.e., SCHEMA_NAME.TABLE_NAME).
- operation [string, optional, default="INSERT"] - Defines the SQL operation to perform with the payload. Valid values include: { "INSERT", "MERGE" }.
- isArray [boolean, optional, default=false] - Defines whether the payload section is a JSON array (true) or a JSON object (false). If true, the data ingestion service performs the specified operation on every entry in the array.
- required [boolean, optional, default=false] - Defines whether or not the payload section is required for ingestion (i.e., if true, ingestion fails when the specified section does not exist in the record payload).
- columnMapping [array, required] - Defines the list of column configuration objects (ColumnConfig).
- nestedTables [array, optional, default=null] - Defines a list of nested table configuration objects (TableConfig). Note that a nested table's originPath must be relative and cannot be deeper than one level.
ColumnConfig
The column configuration object (ColumnConfig) defines column-level configuration and has the following properties:
- originKey [string, required] - Defines the JSON property key in the current payload section where the column data originates.
- destColumn [string, optional, default=<originKey>] - Defines the destination column name where the column data should go. If not specified, the data ingestion service assumes that originKey and destColumn are equal (case-insensitive).
- dataType [string, required] - Defines the expected value type for the column data.
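To make the required and optional properties concrete, the three configuration objects can be modeled as data classes whose defaults mirror the ones listed above. This is only a minimal sketch: the class layout and the `parse_table` and `parse_mapping` helpers are hypothetical and are not part of any Space and Time library.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ColumnConfig:
    originKey: str
    dataType: str
    destColumn: Optional[str] = None

    def __post_init__(self) -> None:
        # destColumn defaults to originKey when it is not specified.
        if self.destColumn is None:
            self.destColumn = self.originKey


@dataclass
class TableConfig:
    originPath: str
    targetTable: str
    columnMapping: List[ColumnConfig]
    operation: str = "INSERT"
    isArray: bool = False
    required: bool = False
    nestedTables: Optional[List["TableConfig"]] = None


@dataclass
class MappingConfig:
    tables: List[TableConfig]
    compressed: bool = False


def parse_table(raw: dict) -> TableConfig:
    """Build a TableConfig from a dict; a missing required key raises an error."""
    nested = raw.get("nestedTables")
    return TableConfig(
        originPath=raw["originPath"],
        targetTable=raw["targetTable"],
        columnMapping=[ColumnConfig(**col) for col in raw["columnMapping"]],
        operation=raw.get("operation", "INSERT"),
        isArray=raw.get("isArray", False),
        required=raw.get("required", False),
        nestedTables=[parse_table(n) for n in nested] if nested is not None else None,
    )


def parse_mapping(raw: dict) -> MappingConfig:
    """Build a MappingConfig from a dict, applying the documented defaults."""
    return MappingConfig(
        tables=[parse_table(t) for t in raw["tables"]],
        compressed=raw.get("compressed", False),
    )


cfg = parse_mapping({
    "tables": [{
        "originPath": "Block",
        "targetTable": "ETH.BLOCK",
        "columnMapping": [{"originKey": "miner", "dataType": "String"}],
    }]
})
print(cfg.tables[0].operation, cfg.tables[0].columnMapping[0].destColumn)
```

Parsing a configuration this way before submitting it is one option for catching a missing required property early, since the lookups fail immediately instead of at ingestion time.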
Example
Below you'll find an example JSON payload and mapping configuration that demonstrate how the data ingestion service can be controlled. Note that this is a real-world example: it's a snippet of our blockchain indexing data and configuration!
Data Payload (JSON):
{
  "Block": {
    "number": 16086234,
    "hash": "0x9ded22e1c93361edceb5ffa1ca691d12d311cdd4c1a973f4896545a1f32ddcb2",
    "timestamp": "2022-12-01 00:00:11",
    "miner": "0x3b7fAEc3181114A99c243608BC822c5436441FfF",
    "reward": "165026803532390038",
    "size": 72792,
    "gasUsed": 24613176,
    "gasLimit": 30000000,
    "baseFeePerGas": "10140652006",
    "parentHash": "0xdceb222ea939cb659f72d06dd2c4d6e318a77a066395c3ddebdbd0f7209fee06",
    "transactionCount": 170
  },
  "Transaction": [
    {
      "hash": "0x11382b7a70361f477cb8d8ad4a12fb2c3d86cd3fd35b14481ed6f8142470203f",
      "blockNumber": 16086234,
      "timestamp": "2022-12-01 00:00:11",
      "from": "0x2cF07A5670Ca8d2c6404017c8fa98C8f13E17832",
      "to": "0x9e1075aE354D61e6358848b89d5386E5E9B3a16C",
      "value": "0",
      "gas": "16620000",
      "transactionFee": "93769241399309124",
      "cumulativeGasUsed": 8416854,
      "status": 1
    },
    {
      "hash": "0x06c1fa0de637a27b350f3d32fb82c6bb9f5a3ae2d0e2a89d8fc04707d359d12a",
      "blockNumber": 16086234,
      "timestamp": "2022-12-01 00:00:11",
      "from": "0x52Cc75bbCEf59f5E03839A37AEd1AeA9CBC06401",
      "to": "0x8bc646853Ccd72d26F3d51A0677fB732DC971Edb",
      "value": "0",
      "gas": "39000",
      "transactionFee": "29674134253416360",
      "cumulativeGasUsed": 8442914,
      "status": 1
    }
  ]
}
Configuration Payload (JSON):
{
  "tables": [
    {
      "originPath": "Block",
      "targetTable": "ETH.BLOCK",
      "columnMapping": [
        {
          "originKey": "number",
          "destColumn": "block_number",
          "dataType": "bigint"
        },
        {
          "originKey": "hash",
          "destColumn": "block_hash",
          "dataType": "String"
        },
        {
          "originKey": "timestamp",
          "dataType": "String"
        },
        {
          "originKey": "miner",
          "dataType": "String"
        },
        {
          "originKey": "parentHash",
          "destColumn": "parent_hash",
          "dataType": "String"
        },
        {
          "originKey": "reward",
          "dataType": "String"
        },
        {
          "originKey": "size",
          "dataType": "int"
        },
        {
          "originKey": "gasUsed",
          "destColumn": "gas_used",
          "dataType": "int"
        },
        {
          "originKey": "gasLimit",
          "destColumn": "gas_limit",
          "dataType": "int"
        },
        {
          "originKey": "baseFeePerGas",
          "destColumn": "base_fee_per_gas",
          "dataType": "String"
        },
        {
          "originKey": "transactionCount",
          "destColumn": "transaction_count",
          "dataType": "int"
        }
      ]
    },
    {
      "originPath": "Transaction",
      "targetTable": "ETH.TRANSACTION",
      "isArray": true,
      "columnMapping": [
        {
          "originKey": "hash",
          "destColumn": "transaction_hash",
          "dataType": "String"
        },
        {
          "originKey": "blockNumber",
          "destColumn": "block_number",
          "dataType": "bigint"
        },
        {
          "originKey": "from",
          "destColumn": "from_address",
          "dataType": "String"
        },
        {
          "originKey": "to",
          "destColumn": "to_address",
          "dataType": "String"
        },
        {
          "originKey": "value",
          "dataType": "String"
        },
        {
          "originKey": "gas",
          "dataType": "String"
        },
        {
          "originKey": "transactionFee",
          "destColumn": "transaction_fee",
          "dataType": "String"
        },
        {
          "originKey": "cumulativeGasUsed",
          "destColumn": "receipt_cumulative_gas_used",
          "dataType": "int"
        },
        {
          "originKey": "timestamp",
          "dataType": "String"
        },
        {
          "originKey": "status",
          "destColumn": "receipt_status",
          "dataType": "int"
        }
      ]
    }
  ]
}
Feeding the example data payload into the data ingestion service with the configuration payload produces one insert into the ETH.BLOCK table and two inserts into the ETH.TRANSACTION table. For example, the ETH.BLOCK insert would look like this:
INSERT INTO ETH.BLOCK(BLOCK_NUMBER, BLOCK_HASH, TIMESTAMP, MINER, PARENT_HASH, REWARD, SIZE, GAS_USED, GAS_LIMIT, BASE_FEE_PER_GAS, TRANSACTION_COUNT) VALUES(16086234, '0x9ded22e1c93361edceb5ffa1ca691d12d311cdd4c1a973f4896545a1f32ddcb2', '2022-12-01 00:00:11', '0x3b7fAEc3181114A99c243608BC822c5436441FfF', '0xdceb222ea939cb659f72d06dd2c4d6e318a77a066395c3ddebdbd0f7209fee06', '165026803532390038', 72792, 24613176, 30000000, '10140652006', 170)
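The way a mapping configuration turns one record payload into several table rows can be approximated in a few lines. The sketch below is not the actual ingestion service: the `map_rows` function is hypothetical, it treats originPath as a plain top-level key rather than a full JSON Path, it ignores nestedTables and dataType conversion, and it upper-cases column names only to mirror the INSERT statement above. The usage example applies a trimmed version of the example payload and configuration.

```python
from typing import Any, Dict, List, Tuple

Row = Tuple[str, str, Dict[str, Any]]  # (operation, targetTable, column values)


def map_rows(payload: dict, mapping: dict) -> List[Row]:
    """Produce one row per object section and one row per entry of an array section."""
    results: List[Row] = []
    for table in mapping["tables"]:
        section = payload.get(table["originPath"])
        if section is None:
            # A missing section only fails ingestion when it is marked required.
            if table.get("required", False):
                raise ValueError(f"required section {table['originPath']!r} is missing")
            continue
        entries = section if table.get("isArray", False) else [section]
        for entry in entries:
            row = {}
            for col in table["columnMapping"]:
                # destColumn falls back to originKey when it is not specified.
                dest = col.get("destColumn", col["originKey"])
                row[dest.upper()] = entry.get(col["originKey"])
            results.append((table.get("operation", "INSERT"), table["targetTable"], row))
    return results


payload = {
    "Block": {"number": 16086234, "size": 72792},
    "Transaction": [
        {"cumulativeGasUsed": 8416854},
        {"cumulativeGasUsed": 8442914},
    ],
}
mapping = {
    "tables": [
        {
            "originPath": "Block",
            "targetTable": "ETH.BLOCK",
            "columnMapping": [
                {"originKey": "number", "destColumn": "block_number", "dataType": "bigint"},
                {"originKey": "size", "dataType": "int"},
            ],
        },
        {
            "originPath": "Transaction",
            "targetTable": "ETH.TRANSACTION",
            "isArray": True,
            "columnMapping": [
                {
                    "originKey": "cumulativeGasUsed",
                    "destColumn": "receipt_cumulative_gas_used",
                    "dataType": "int",
                },
            ],
        },
    ]
}

rows = map_rows(payload, mapping)
for operation, target, values in rows:
    print(operation, target, values)
```

As in the full example, the object section yields a single ETH.BLOCK row, while the array section yields one ETH.TRANSACTION row per entry.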