Tap Kafka

Messages from Kafka topics are extracted into the following fields:

  • MESSAGE_TIMESTAMP: Timestamp extracted from the Kafka metadata

  • MESSAGE_OFFSET: Offset extracted from the Kafka metadata

  • MESSAGE_PARTITION: Partition extracted from the Kafka metadata

  • MESSAGE: The original and full Kafka message

  • MESSAGE_KEY: (Optional) Key extracted from the Kafka message.

  • Dynamic primary key columns: (Optional) Fields extracted from the Kafka JSON messages by JSONPath selector(s).

Supported message formats: JSON and Protobuf (experimental).
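
For illustration, a record produced for a hypothetical JSON message might look like this (all values are made up; the TRANSFER_ID column appears only when a matching primary key selector is configured, and actual column casing and types depend on the target):

{
  "MESSAGE_TIMESTAMP": 1672567200000,
  "MESSAGE_OFFSET": 42,
  "MESSAGE_PARTITION": 0,
  "MESSAGE": {"transferMetadata": {"transferId": "abc-123"}, "amount": 100},
  "MESSAGE_KEY": "abc-123",
  "TRANSFER_ID": "abc-123"
}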

Configuring what to replicate

PipelineWise configures every tap with a common structured YAML file format. A sample YAML for Kafka replication can be generated into a project directory by following the steps in the Generating Sample Pipelines section.

Example YAML for tap-kafka:

---

# ------------------------------------------------------------------------------
# General Properties
# ------------------------------------------------------------------------------
id: "kafka"                            # Unique identifier of the tap
name: "Kafka Topic with sample data"   # Name of the tap
type: "tap-kafka"                      # !! THIS SHOULD NOT CHANGE !!
owner: "somebody@foo.com"              # Data owner to contact
#send_alert: False                     # Optional: Disable all configured alerts on this tap
#slack_alert_channel: "#tap-channel"   # Optional: Sending a copy of specific tap alerts to this slack channel


# ------------------------------------------------------------------------------
# Source (Tap) - Kafka connection details
# ------------------------------------------------------------------------------
db_conn:
  group_id: "myGroupId"
  bootstrap_servers: "kafka1.foo.com:9092,kafka2.foo.com:9092,kafka3.foo.com:9092"
  topic: "myKafkaTopic"

  # --------------------------------------------------------------------------
  # Optionally you can define primary key(s) from the Kafka JSON messages.
  # If primary keys are defined, extra column(s) will be added to the output
  # singer stream, with values extracted by /slashed/path selectors (à la XPath).
  # --------------------------------------------------------------------------
  primary_keys:
    transfer_id: "/transferMetadata/transferId"
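  # For example, given a hypothetical message
  # {"transferMetadata": {"transferId": "abc-123"}}, the selector above adds
  # a "transfer_id" column with the value "abc-123" to every output record.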

  # --------------------------------------------------------------------------
  # Additionally you can specify whether to use the Kafka message key as a
  # primary key. If custom primary keys were specified above, the message key
  # property is ignored and the custom PKs are used instead.
  # ! The message key should be a valid UTF-8 encoded string.
  # --------------------------------------------------------------------------
  use_message_key: true                     # (Default: true)

  #initial_start_time:                      # (Default: latest) Start time reference for message consumption if
                                            # there is no bookmarked position in state.json. One of: latest,
                                            # earliest or an ISO-8601 formatted timestamp string.
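  # For example, a hypothetical fixed starting point:
  #initial_start_time: "2023-01-01T00:00:00Z"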

  # --------------------------------------------------------------------------
  # Kafka Consumer optional parameters. Commented values are default values.
  # --------------------------------------------------------------------------
  #max_runtime_ms: 300000                   # The maximum time for the tap to collect new messages from the Kafka topic.
  #consumer_timeout_ms: 10000               # KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration
  #session_timeout_ms: 30000                # KafkaConsumer setting. The timeout used to detect failures when using Kafka’s group management facilities.
  #heartbeat_interval_ms: 10000             # KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
  #max_poll_interval_ms: 300000             # KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management.

  #commit_interval_ms: 5000                 # Number of milliseconds between two commits. This is different from the Kafka auto-commit feature: tap-kafka sends commit messages automatically, but only when the data has been consumed successfully and persisted to the local store.

  # --------------------------------------------------------------------------
  # Protobuf support - Experimental
  # --------------------------------------------------------------------------
  #message_format: protobuf                 # (Default: json) Supported message formats are json and protobuf.
  #proto_schema: |                          # Protobuf message definition in .proto syntax. Required if message_format is protobuf.
  #     syntax = "proto3";
  #
  #     message ProtoMessage {
  #       string query = 1;
  #       int32 page_number = 2;
  #       int32 result_per_page = 3;
  #     }
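  #     (Illustration: with this hypothetical schema, consumed messages are
  #      decoded via the runtime compiled proto classes and their fields end
  #      up in the extracted MESSAGE column.)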
  #proto_classes_dir:                       # (Default: current working dir) Directory where to store runtime compiled proto classes
  #debug_contexts: ""                       # Comma-separated list of debug contexts to enable for the consumer. See librdkafka debug contexts: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts

# ------------------------------------------------------------------------------
# Destination (Target) - Target properties
# Connection details should be in the relevant target YAML file
# ------------------------------------------------------------------------------
target: "snowflake"                       # ID of the target connector where the data will be loaded
batch_size_rows: 20000                    # Batch size for the stream to optimise load performance
stream_buffer_size: 0                     # In-memory buffer size (MB) between taps and targets for asynchronous data pipes
default_target_schema: "kafka"            # Target schema where the data will be loaded
default_target_schema_select_permission:  # Optional: Grant SELECT on the schema and tables that are created
  - grp_stats
#batch_wait_limit_seconds: 3600           # Optional: Maximum time to wait for `batch_size_rows`. Available only for the Snowflake target.

# Options only for Snowflake target
#archive_load_files: False                      # Optional: when enabled, the files loaded to Snowflake will also be stored in `archive_load_files_s3_bucket`
#archive_load_files_s3_prefix: "archive"        # Optional: When `archive_load_files` is enabled, the archived files will be placed in the archive S3 bucket under this prefix.
#archive_load_files_s3_bucket: "<BUCKET_NAME>"  # Optional: When `archive_load_files` is enabled, the archived files will be placed in this bucket. (Default: the value of `s3_bucket` in target snowflake YAML)


# ------------------------------------------------------------------------------
# Source to target Schema mapping
# ------------------------------------------------------------------------------
schemas:
  - source_schema: "kafka"             # This is mandatory, but can be anything in this tap type
    target_schema: "kafka"             # Target schema in the destination Data Warehouse

    # Kafka topic to replicate into the destination Data Warehouse.
    # You can load data from only one Kafka topic per YAML file.
    # To load from multiple Kafka topics, create another tap YAML similar to this file.
    tables:
      - table_name: "my_kafka_topic"   # The target table name needs to match the topic name in snake_case format

        # OPTIONAL: Load time transformations
        #transformations:
        #  - column: "last_name"            # Column to transform
        #    type: "SET-NULL"               # Transformation type
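
Once the tap YAML is saved into your project directory, activating it follows the usual PipelineWise flow: import the project, then run the tap. A minimal sketch, assuming the IDs above ("kafka" and "snowflake") and a hypothetical project directory named pipelinewise_samples:

pipelinewise import --dir pipelinewise_samples
pipelinewise run_tap --tap kafka --target snowflake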