
# Kafka Writer

The Kafka Writer plugin writes data to a Kafka topic as JSON-formatted messages.

## Example

The following configuration demonstrates how to read data from memory and write it to a specified Kafka topic.

### Create Task File

First, create a task file `stream2kafka.json` with the following content:

```json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              { "random": "10,1000", "type": "long" },
              { "value": "1.1.1.1", "type": "string" },
              { "value": 19890604.0, "type": "double" },
              { "value": 19890604, "type": "long" },
              { "value": 19890604, "type": "long" },
              { "value": "hello world", "type": "string" },
              { "value": "long text", "type": "string" },
              { "value": "41.12,-71.34", "type": "string" },
              { "value": "2017-05-25 11:22:33", "type": "string" }
            ],
            "sliceRecordCount": 100
          }
        },
        "writer": {
          "name": "kafkawriter",
          "parameter": {
            "brokerList": "localhost:9092",
            "topic": "test-1",
            "partitions": 0,
            "batchSize": 1000,
            "column": ["col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9"]
          }
        }
      }
    ]
  }
}
```
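
Each record emitted by the reader is serialized to one JSON object: the names in the writer's `column` list become the JSON keys, paired positionally with the reader's columns. The sketch below illustrates that mapping with the plain Kafka Java producer API. It is a simplified illustration, not the plugin's actual source; the class name, the column subset, and the hard-coded record values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JsonToKafkaSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // the "brokerList" setting
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Names from the writer's "column" parameter; they become the JSON keys.
        List<String> columns = List.of("col1", "col2", "col3");
        // One record's values as produced by the reader (hard-coded here for illustration).
        List<Object> values = List.of(916L, "1.1.1.1", 1.9890604E7);

        // Pair each column name with the value at the same position.
        Map<String, Object> json = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
            json.put(columns.get(i), values.get(i));
        }

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Naive JSON rendering to avoid extra dependencies; a real writer
            // would use a JSON library instead.
            StringBuilder sb = new StringBuilder("{");
            json.forEach((k, v) -> sb.append('"').append(k).append("\":")
                    .append(v instanceof String ? "\"" + v + "\"" : v).append(','));
            sb.setCharAt(sb.length() - 1, '}');
            producer.send(new ProducerRecord<>("test-1", sb.toString()));
        }
    }
}
```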

### Run

Execute the command `bin/addax.sh stream2kafka.json`; you should see output similar to the following:

```txt
2022-02-26 21:59:22.975 [        main] INFO  VMInfo               - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-02-26 21:59:22.985 [        main] INFO  Engine               - 
{
	"content":{
		"reader":{
			"parameter":{
				"column":[
					{
						"random":"10,1000",
						"type":"long"
					},
					{
						"type":"string",
						"value":"1.1.1.1"
					},
					{
						"type":"double",
						"value":19890604.0
					},
					{
						"type":"long",
						"value":19890604
					},
					{
						"type":"long",
						"value":19890604
					},
					{
						"type":"string",
						"value":"hello world"
					},
					{
						"type":"string",
						"value":"long text"
					},
					{
						"type":"string",
						"value":"41.12,-71.34"
					},
					{
						"type":"string",
						"value":"2017-05-25 11:22:33"
					}
				],
				"sliceRecordCount":100
			},
			"name":"streamreader"
		},
		"writer":{
			"parameter":{
				"partitions":0,
				"column":[
					"col1",
					"col2",
					"col3",
					"col4",
					"col5",
					"col6",
					"col7",
					"col8",
					"col9"
				],
				"topic":"test-1",
				"batchSize":1000,
				"brokerList":"localhost:9092"
			},
			"name":"kafkawriter"
		}
	},
	"setting":{
		"speed":{
			"channel":1
		}
	}
}

2022-02-26 21:59:23.002 [        main] INFO  PerfTrace            - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-02-26 21:59:23.003 [        main] INFO  JobContainer         - Addax jobContainer starts job.
2022-02-26 21:59:23.004 [        main] INFO  JobContainer         - Set jobId = 0
2022-02-26 21:59:23.017 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] do prepare work .
2022-02-26 21:59:23.017 [       job-0] INFO  JobContainer         - Addax Writer.Job [kafkawriter] do prepare work .
2022-02-26 21:59:23.017 [       job-0] INFO  JobContainer         - Job set Channel-Number to 1 channel(s).
2022-02-26 21:59:23.018 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] splits to [1] tasks.
2022-02-26 21:59:23.019 [       job-0] INFO  JobContainer         - Addax Writer.Job [kafkawriter] splits to [1] tasks.
2022-02-26 21:59:23.039 [       job-0] INFO  JobContainer         - Scheduler starts [1] taskGroups.
2022-02-26 21:59:23.047 [ taskGroup-0] INFO  TaskGroupContainer   - taskGroupId=[0] start [1] channels for [1] tasks.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO  Channel              - Channel set byte_speed_limit to -1, No bps activated.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO  Channel              - Channel set record_speed_limit to -1, No tps activated.
2022-02-26 21:59:23.082 [0-0-0-writer] INFO  ProducerConfig       - ProducerConfig values: 
	acks = 1
	batch.size = 1000
	bootstrap.servers = [localhost:9092]
	buffer.memory = 33554432
	client.id = addax-kafka-writer
	compression.type = none
	connections.max.idle.ms = 540000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 0
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2022-02-26 21:59:23.412 [0-0-0-writer] INFO  AppInfoParser        - Kafka version : 2.0.0
2022-02-26 21:59:23.413 [0-0-0-writer] INFO  AppInfoParser        - Kafka commitId : 3402a8361b734732
2022-02-26 21:59:23.534 [kafka-producer-network-thread | addax-kafka-writer] INFO  Metadata             - Cluster ID: xPAQZFNDTp6y63nZO4LACA
2022-02-26 21:59:26.061 [       job-0] INFO  AbstractScheduler    - Scheduler accomplished all tasks.
2022-02-26 21:59:26.062 [       job-0] INFO  JobContainer         - Addax Writer.Job [kafkawriter] do post work.
2022-02-26 21:59:26.062 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] do post work.
2022-02-26 21:59:26.063 [       job-0] INFO  JobContainer         - PerfTrace not enable!
2022-02-26 21:59:26.064 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 100 records, 9200 bytes | Speed 2.99KB/s, 33 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-02-26 21:59:26.065 [       job-0] INFO  JobContainer         - 
Job start time                  : 2022-02-26 21:59:23
Job end time                    : 2022-02-26 21:59:26
Total elapsed time              :                  3s
Average traffic                 :            2.99KB/s
Record write speed              :             33rec/s
Total records read              :                 100
Total read/write failures       :                   0
```

Use Kafka's built-in `kafka-console-consumer.sh` to read the data back; the output looks like the following:

```bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-1 --from-beginning

{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":916}