Kafka Writer
The Kafka Writer plugin writes data to Kafka in JSON format.
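As a rough illustration of what writing "in JSON format" can look like, the sketch below pairs a list of column names with one record's values by position and serializes the result as a JSON object. It is not the plugin's actual code: the helper name `record_to_message` and the positional pairing are assumptions made for illustration.

```python
import json


def record_to_message(column_names, values):
    """Pair names with values by position and return a JSON string.

    Hypothetical helper: it only illustrates the shape of the message,
    not how the plugin builds it internally.
    """
    if len(column_names) != len(values):
        raise ValueError(
            f"expected {len(column_names)} values, got {len(values)}"
        )
    return json.dumps(dict(zip(column_names, values)))


# One record with three columns, using illustrative values.
message = record_to_message(["col1", "col2", "col3"], [916, "1.1.1.1", 19890604.0])
print(message)
```

Each record becomes one self-describing message, so a consumer can look fields up by name instead of relying on their position.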
Example
The following configuration reads data from memory and writes it to a specified Kafka topic.
Create Task File
First, create a task file named stream2kafka.json with the following content:
json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              {"random": "10,1000", "type": "long"},
              {"value": "1.1.1.1", "type": "string"},
              {"value": 19890604.0, "type": "double"},
              {"value": 19890604, "type": "long"},
              {"value": 19890604, "type": "long"},
              {"value": "hello world", "type": "string"},
              {"value": "long text", "type": "string"},
              {"value": "41.12,-71.34", "type": "string"},
              {"value": "2017-05-25 11:22:33", "type": "string"}
            ],
            "sliceRecordCount": 100
          }
        },
        "writer": {
          "name": "kafkawriter",
          "parameter": {
            "brokerList": "localhost:9092",
            "topic": "test-1",
            "partitions": 0,
            "batchSize": 1000,
            "column": ["col1", "col2", "col3", "col4", "col5", "col6", "col7", "col8", "col9"]
          }
        }
      }
    ]
  }
}
Run
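Before launching the job, it may be worth checking that the writer's column list has as many names as the reader has columns. The sketch below does that for a parsed task file. The helper name `check_columns` is hypothetical, and the idea that the two counts should match is an assumption rather than a documented rule of the plugin.

```python
import json


def check_columns(task):
    """Return one (reader_count, writer_count) pair per content entry.

    Raises ValueError when the counts differ. Hypothetical helper that
    assumes the job/content/reader/writer layout of the task file.
    """
    counts = []
    for entry in task["job"]["content"]:
        reader_cols = entry["reader"]["parameter"]["column"]
        writer_cols = entry["writer"]["parameter"]["column"]
        if len(reader_cols) != len(writer_cols):
            raise ValueError(
                f"reader has {len(reader_cols)} columns, "
                f"writer names {len(writer_cols)}"
            )
        counts.append((len(reader_cols), len(writer_cols)))
    return counts


# A trimmed-down task with two columns, used here only to exercise the check.
sample = json.loads("""
{
  "job": {
    "content": [
      {
        "reader": {"name": "streamreader", "parameter": {"column": [
          {"random": "10,1000", "type": "long"},
          {"value": "1.1.1.1", "type": "string"}
        ]}},
        "writer": {"name": "kafkawriter", "parameter": {
          "column": ["col1", "col2"]
        }}
      }
    ]
  }
}
""")
print(check_columns(sample))
```

To check the task file itself, load stream2kafka.json with `json.load` and pass the result to `check_columns`.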
Run the command bin/addax.sh stream2kafka.json. The output should be similar to the following:
txt
2022-02-26 21:59:22.975 [ main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2022-02-26 21:59:22.985 [ main] INFO Engine -
{
"content":{
"reader":{
"parameter":{
"column":[
{
"random":"10,1000",
"type":"long"
},
{
"type":"string",
"value":"1.1.1.1"
},
{
"type":"double",
"value":19890604.0
},
{
"type":"long",
"value":19890604
},
{
"type":"long",
"value":19890604
},
{
"type":"string",
"value":"hello world"
},
{
"type":"string",
"value":"long text"
},
{
"type":"string",
"value":"41.12,-71.34"
},
{
"type":"string",
"value":"2017-05-25 11:22:33"
}
],
"sliceRecordCount":100
},
"name":"streamreader"
},
"writer":{
"parameter":{
"partitions":0,
"column":[
"col1",
"col2",
"col3",
"col4",
"col5",
"col6",
"col7",
"col8",
"col9"
],
"topic":"test-1",
"batchSize":1000,
"brokerList":"localhost:9092"
},
"name":"kafkawriter"
}
},
"setting":{
"speed":{
"channel":1
}
}
}
2022-02-26 21:59:23.002 [ main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2022-02-26 21:59:23.003 [ main] INFO JobContainer - Addax jobContainer starts job.
2022-02-26 21:59:23.004 [ main] INFO JobContainer - Set jobId = 0
2022-02-26 21:59:23.017 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] do prepare work .
2022-02-26 21:59:23.017 [ job-0] INFO JobContainer - Addax Writer.Job [kafkawriter] do prepare work .
2022-02-26 21:59:23.017 [ job-0] INFO JobContainer - Job set Channel-Number to 1 channel(s).
2022-02-26 21:59:23.018 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] splits to [1] tasks.
2022-02-26 21:59:23.019 [ job-0] INFO JobContainer - Addax Writer.Job [kafkawriter] splits to [1] tasks.
2022-02-26 21:59:23.039 [ job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2022-02-26 21:59:23.047 [ taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2022-02-26 21:59:23.050 [ taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2022-02-26 21:59:23.082 [0-0-0-writer] INFO ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 1000
bootstrap.servers = [localhost:9092]
buffer.memory = 33554432
client.id = addax-kafka-writer
compression.type = none
connections.max.idle.ms = 540000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-02-26 21:59:23.412 [0-0-0-writer] INFO AppInfoParser - Kafka version : 2.0.0
2022-02-26 21:59:23.413 [0-0-0-writer] INFO AppInfoParser - Kafka commitId : 3402a8361b734732
2022-02-26 21:59:23.534 [kafka-producer-network-thread | addax-kafka-writer] INFO Metadata - Cluster ID: xPAQZFNDTp6y63nZO4LACA
2022-02-26 21:59:26.061 [ job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2022-02-26 21:59:26.062 [ job-0] INFO JobContainer - Addax Writer.Job [kafkawriter] do post work.
2022-02-26 21:59:26.062 [ job-0] INFO JobContainer - Addax Reader.Job [streamreader] do post work.
2022-02-26 21:59:26.063 [ job-0] INFO JobContainer - PerfTrace not enable!
2022-02-26 21:59:26.064 [ job-0] INFO StandAloneJobContainerCommunicator - Total 100 records, 9200 bytes | Speed 2.99KB/s, 33 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2022-02-26 21:59:26.065 [ job-0] INFO JobContainer -
Job start time : 2022-02-26 21:59:23
Job end time : 2022-02-26 21:59:26
Total elapsed time : 3s
Average throughput : 2.99KB/s
Record write speed : 33rec/s
Total records read : 100
Total read/write failures : 0
Reading the topic back with Kafka's built-in kafka-console-consumer.sh produces the following output:
bash
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-1 --from-beginning
{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33","col6":"hello world","col7":"long text","col4":19890604,"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":916}
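The consumed line is ordinary JSON, so any JSON parser can read it back. The sketch below parses the message shown above with Python's standard `json` module and restores the configured column order, since the keys in the message do not appear in col1 to col9 order. The variable names are illustrative only.

```python
import json

# The message printed by kafka-console-consumer.sh above.
line = (
    '{"col8":"41.12,-71.34","col9":"2017-05-25 11:22:33",'
    '"col6":"hello world","col7":"long text","col4":19890604,'
    '"col5":19890604,"col2":"1.1.1.1","col3":1.9890604E7,"col1":916}'
)

record = json.loads(line)

# Look fields up by name; the key order in the message is not the column order.
ordered = [record[f"col{i}"] for i in range(1, 10)]

# The double column is written in exponent notation (1.9890604E7) and
# parses back to the float 19890604.0; the long columns stay integers.
print(ordered)
print(type(record["col3"]).__name__, type(record["col4"]).__name__)
```

Reading fields by name keeps a consumer independent of whatever key order the serializer happens to produce.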