Set up Kafka Streaming
Space and Time Streaming service have been temporarily suspended, as we complete network changes in support of our coming Mainnet. Check back soon for the new and improved streaming services!
In this guide, we'll walk through the steps for setting up Kafka streaming into Space and Time. We will follow the following steps:
- Create an infrastructure group
- Create a user
- Create a topic
- Create mapping configuration
- Get cluster metadata
Setup
First, we need to create a biscuit for the Kafka infrastructure group. We can do this easily using the sxtcli
like so:
sxtcli biscuit generate kafka --operations="CREATE,READ,UPDATE,DELETE" --privateKey=$BISCUIT_PRIVATE_KEY --resources="INFRASTRUCTURE_GROUP,USER,TOPIC,ACL,MAPPING_CONFIG"
In the example above, the $BISCUIT_PRIVATE_KEY
should be from your biscuit keypair, not your user private key.
For this guide, I've loaded the biscuit returned here into an environment variable named KB
so we can more easily cURL with it.
1. Create an Infrastructure group
Using the create infra groupAPI endpoint, we create an infrastructure group. Please replace zak_stream_1
with your desired infra group name.
curl --request POST \
--url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1 \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"'' \
--header 'biscuit: '"$KB"'' \
--header 'content-type: application/json' \
--data '
{
"publicKey": "<this-needs-to-be-your-biscuit-public-key-tied-to-step-one>"
}
'
Next, we can confirm the group was created with the Get infrastructure group endpoint like this:
curl --request GET \
--url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1 \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"'' \
--header 'biscuit: '"$KB"''
This will return the biscuit public key associated with the infra group:
{"publicKey":"8613098....."}
2. Create a Kafka User
Next, we will use the Create Kafka user endpoint. Take note of the username in the URL. In this case, you will want to replace axiom
with your username and zak_stream_1
with your infra group.
curl --request POST \
--url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/user/axiom \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"'' \
--header 'biscuit: '"$KB"'' \
--header 'content-type: application/json' \
--data '
{
"password": "<use-a-secure-password>"
}
'
Next, we will use the Get Kakfa users endpoint to confirm it worked. Please replace zak_stream_1
in the URL with your infra group name from step one.
curl --request GET \
--url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/users \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"'' \
--header 'biscuit: '"$KB"''
And we can see it worked:
["axiom"]
3. Create a Topic
Next, to create a topic, replace alajuela
in the URL with your topic name using the Create Kafka topic:
curl --request POST \
--url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"'' \
--header 'biscuit: '"$KB"'' \
--header 'content-type: application/json' \
--data '{"partitionCount":1}'
Then we can confirm it worked with Get Kafka topic
curl --request GET \
--url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"'' \
--header 'biscuit: '"$KB"''
As we can see, it returns the following data:
{"partitionCount":1,"replicationFactor":2}
4. Enable Topic Write Access
Using the Configure Kafka ACLs Endpoint ALLOW write access to your topic:
curl --request POST \
--url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela/acls \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"'' \
--header 'biscuit: '"$KB"' \
--header 'content-type: application/json' \
--data '
{
"principal": "axiom",
"permissionType": "ALLOW"
}
'
Make sure you update the URL with your group and topic name. The principal
should be the user you set up above.
An empty reply means the API accepted your ACL update.
5. Get Cluster Metadata
As a helper, SxT provides an Get cluster metadata endpoint that will return the metadata needed to set up your Kafka stream:
curl --request GET \
--url https://<SxT-API-URL>/v1/streaming/cluster-details \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"''
6. Create Mapping Config
If you haven't already created a table that you will be streaming to, you can create it easily with the SxTCLI like this:
sxtcli sql --accessToken=$AT --url=$API_URL ddl --resourceId="SE_PLAYGROUND.stream_1" --biscuits=$KB --sqlText="CREATE TABLE SE_PLAYGROUND.stream_1 (user_id bigint, user_address VARCHAR PRIMARY KEY) WITH \"public_key=<your-biscuit-public-key>,access_type=public_read\""
Then, we create the actual mapping with the Create mapping configuration endpoint:
curl -i --request POST \
--url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela/mapping/zak_map_1 \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"'' \
--header 'biscuit: '"$KB"'' \
--header 'content-type: application/json' \
--data '
{
"mappingConfig": {
"compressed": false,
"tables": [
{
"operation": "INSERT",
"isArray": true,
"required": false,
"originPath": "records.data",
"targetTable": "se_playground.stream_1",
"columnMapping": [
{
"originKey": "user_id",
"destColumn": "user_id",
"dataType": "bigint"
},
{
"originKey": "user_address",
"destColumn": "user_address",
"dataType": "varchar"
}
]
}
]
},
"biscuits": [
"<your-kafka-biscuit-here>"
]
}
'
Then we confirm it worked with the following:
curl --request GET \
--url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela/mapping/zak_map_1 \
--header 'accept: application/json' \
--header 'authorization: Bearer '"$AT"'' \
--header 'biscuit: '"$KB"''
And it will return something like:
{
"tables": [
{
"originPath": "records.data",
"targetTable": "se_playground.stream_1",
"isArray": true,
"columnMapping": [
{
"originKey": "user_id",
"destColumn": "user_id",
"dataType": "bigint"
},
{
"originKey": "user_address",
"destColumn": "user_address",
"dataType": "varchar"
}
]
}
]
}
Now, you should be ready to start streaming into Space and Time!
Space and Time Streaming service have been temporarily suspended, as we complete network changes in support of our coming Mainnet. Check back soon for the new and improved streaming services!
Updated 5 months ago