Set up Kafka Streaming

In this guide, we'll walk through the steps for setting up Kafka streaming into Space and Time:

  1. Create an infrastructure group
  2. Create a Kafka user
  3. Create a topic
  4. Enable topic write access
  5. Get cluster metadata
  6. Create a mapping configuration

Setup

First, we need to create a biscuit for the Kafka infrastructure group. We can do this easily using the sxtcli like so:

sxtcli biscuit generate kafka --operations="CREATE,READ,UPDATE,DELETE" --privateKey=$BISCUIT_PRIVATE_KEY --resources="INFRASTRUCTURE_GROUP,USER,TOPIC,ACL,MAPPING_CONFIG"

In the example above, $BISCUIT_PRIVATE_KEY should be the private key from your biscuit keypair, not your user private key.

For this guide, we've loaded the biscuit returned by this command into an environment variable named KB so we can more easily cURL with it.
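
For reference, here is a minimal sketch of the environment variables used throughout this guide. The values are placeholders for illustration only; the actual values come from your own keypair, your normal SxT authentication flow, and the sxtcli output above.

# Hypothetical values for illustration only -- substitute your own
export BISCUIT_PRIVATE_KEY="<your-biscuit-private-key>"   # private key from your biscuit keypair
export KB="<kafka-biscuit-from-sxtcli-output>"            # biscuit returned by the command above
export AT="<your-sxt-access-token>"                       # access token from your normal SxT auth flow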

1. Create an Infrastructure Group

Using the Create infra group API endpoint, we create an infrastructure group. Please replace zak_stream_1 with your desired infra group name.

curl --request POST \
     --url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1 \
     --header 'accept: application/json' \
     --header 'authorization: Bearer '"$AT"'' \
     --header 'biscuit: '"$KB"'' \
     --header 'content-type: application/json' \
     --data '
{
  "publicKey": "<this-needs-to-be-your-biscuit-public-key-tied-to-step-one>"
}
'

Next, we can confirm the group was created with the Get infrastructure group endpoint like this:

curl --request GET \
    --url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1 \
    --header 'accept: application/json' \
    --header 'authorization: Bearer '"$AT"'' \
    --header 'biscuit: '"$KB"''

This will return the biscuit public key associated with the infra group:

{"publicKey":"8613098....."}

2. Create a Kafka User

Next, we will use the Create Kafka user endpoint. Take note of the username in the URL. In this case, you will want to replace axiom with your username and zak_stream_1 with your infra group.

curl --request POST \
     --url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/user/axiom \
     --header 'accept: application/json' \
     --header 'authorization: Bearer '"$AT"'' \
     --header 'biscuit: '"$KB"'' \
     --header 'content-type: application/json' \
     --data '
{
  "password": "<use-a-secure-password>"
}
'
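
The password placeholder above is up to you; if you want a randomly generated one, any password generator is fine. For example, one option is:

openssl rand -base64 24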

Next, we will use the Get Kafka users endpoint to confirm it worked. Please replace zak_stream_1 in the URL with your infra group name from step one.

curl --request GET \
     --url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/users \
     --header 'accept: application/json' \
     --header 'authorization: Bearer '"$AT"'' \
     --header 'biscuit: '"$KB"''

And we can see it worked:

["axiom"]

3. Create a Topic

Next, create a topic using the Create Kafka topic endpoint, replacing alajuela in the URL with your topic name:

curl --request POST \
     --url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela \
     --header 'accept: application/json' \
     --header 'authorization: Bearer '"$AT"'' \
     --header 'biscuit: '"$KB"'' \
     --header 'content-type: application/json' \
     --data '{"partitionCount":1}'

Then we can confirm it worked with the Get Kafka topic endpoint:

curl --request GET \
     --url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela \
     --header 'accept: application/json' \
     --header 'authorization: Bearer '"$AT"'' \
     --header 'biscuit: '"$KB"''

As we can see, it returns the following data:

{"partitionCount":1,"replicationFactor":2}

4. Enable Topic Write Access

Using the Configure Kafka ACLs endpoint, ALLOW write access to your topic:

curl --request POST \
     --url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela/acls \
     --header 'accept: application/json' \
     --header 'authorization: Bearer '"$AT"'' \
     --header 'biscuit: '"$KB"' \
     --header 'content-type: application/json' \
     --data '
{
  "principal": "axiom",
  "permissionType": "ALLOW"
}
'

Make sure you update the URL with your group and topic name. The principal should be the user you set up above.

An empty reply means the API accepted your ACL update.

5. Get Cluster Metadata

As a helper, SxT provides a Get cluster metadata endpoint that will return the metadata needed to set up your Kafka stream:

curl --request GET \
     --url https://<SxT-API-URL>/v1/streaming/cluster-details \
     --header 'accept: application/json' \
     --header 'authorization: Bearer '"$AT"''

6. Create Mapping Config

If you haven't already created a table that you will be streaming to, you can create it easily with the SxTCLI like this:

sxtcli sql --accessToken=$AT --url=$API_URL ddl --resourceId="SE_PLAYGROUND.stream_1" --biscuits=$KB --sqlText="CREATE TABLE SE_PLAYGROUND.stream_1 (user_id bigint, user_address VARCHAR PRIMARY KEY) WITH \"public_key=<your-biscuit-public-key>,access_type=public_read\"" 

Then, we create the actual mapping with the Create mapping configuration endpoint:

curl -i --request POST \
     --url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela/mapping/zak_map_1 \
     --header 'accept: application/json' \
     --header 'authorization: Bearer '"$AT"'' \
     --header 'biscuit: '"$KB"'' \
     --header 'content-type: application/json' \
     --data '
{
  "mappingConfig": {
    "compressed": false,
    "tables": [
      {
        "operation": "INSERT",
        "isArray": true,
        "required": false,
        "originPath": "records.data",
        "targetTable": "se_playground.stream_1",
        "columnMapping": [
          {
            "originKey": "user_id",
            "destColumn": "user_id",
            "dataType": "bigint"
          },
          {
            "originKey": "user_address",
            "destColumn": "user_address",
            "dataType": "varchar"
          }
        ]
      }
    ]
  },
  "biscuits": [
    "<your-kafka-biscuit-here>"
  ]
}
'

Then we confirm it worked with the following:

curl --request GET \
     --url https://<SxT-API-URL>/v1/streaming/group/zak_stream_1/topic/alajuela/mapping/zak_map_1 \
     --header 'accept: application/json' \
     --header 'authorization: Bearer '"$AT"'' \
     --header 'biscuit: '"$KB"''

And it will return something like:

{
  "tables": [
    {
      "originPath": "records.data",
      "targetTable": "se_playground.stream_1",
      "isArray": true,
      "columnMapping": [
        {
          "originKey": "user_id",
          "destColumn": "user_id",
          "dataType": "bigint"
        },
        {
          "originKey": "user_address",
          "destColumn": "user_address",
          "dataType": "varchar"
        }
      ]
    }
  ]
}
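
Given the originPath of records.data (with isArray set to true) and the column mapping above, a message published to the topic would be expected to look something like the following. The values shown are hypothetical:

{
  "records": {
    "data": [
      {
        "user_id": 1,
        "user_address": "0xabc123"
      }
    ]
  }
}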

Now, you should be ready to start streaming into Space and Time!
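
As a quick sanity check, here's a rough sketch of producing a test message with the standard Kafka console producer. The bootstrap server and security settings below are assumptions, not values from this guide: use the broker address returned by the Get cluster metadata endpoint in step 5, the Kafka user and password from step 2, and confirm the actual security protocol and SASL mechanism for your environment.

# Write a client.properties file -- the security settings below are assumptions;
# confirm the actual protocol and mechanism against the Get cluster metadata response
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="axiom" \
  password="<use-a-secure-password>";
EOF

# Produce a single JSON message shaped to match the mapping config above
echo '{"records":{"data":[{"user_id":1,"user_address":"0xabc123"}]}}' | \
  kafka-console-producer.sh \
    --bootstrap-server <bootstrap-server-from-cluster-metadata> \
    --topic alajuela \
    --producer.config client.properties

Once the message lands on the topic, the mapping configuration from step 6 should insert the corresponding row into se_playground.stream_1.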