Skip to content

Kudu Writer

Kudu Writer 插件实现了将数据写入到 kudu 的能力,当前是通过调用原生RPC接口来实现的。 后期希望通过 impala 接口实现,从而增加更多的功能。

示例

以下示例演示了如何从内存读取样例数据并写入到 kudu 表中的。

表结构

我们用 trino 工具连接到 kudu 服务,然后通过下面的 SQL 语句创建表

CREATE TABLE kudu.default.users ( user_id int WITH (primary_key = true), user_name varchar, salary double ) WITH ( partition_by_hash_columns = ARRAY['user_id'], partition_by_hash_buckets = 2 );

job 配置文件

创建 job/stream2kudu.json 文件,内容如下:

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "1,1000",
              "type": "long"
            },
            {
              "random": "1,10",
              "type": "string"
            },
            {
              "random": "1000,50000",
              "type": "double"
            }
          ],
          "sliceRecordCount": 1000
        }
      },
      "writer": {
        "name": "kuduwriter",
        "parameter": {
          "masterAddress": "127.0.0.1:7051,127.0.0.1:7151,127.0.0.1:7251",
          "timeout": 60,
          "table": "users",
          "writeMode": "upsert",
          "column": [ "user_id", "user_name", "salary"],
          "batchSize": 1024,
          "bufferSize": 2048,
          "skipFail": false,
          "encoding": "UTF-8"
        }
      }
    }
  }
}

运行

执行下下面的命令进行数据采集

bash
bin/addax.sh job/stream2kudu.json

参数说明

配置项是否必须类型默认值描述
masterAddressstringKudu Master集群RPC地址,多个地址用逗号(,)分隔
tablestringkudu 表名
writeModestringupsert表数据写入模式,支持 upsert, insert 两者
timeoutint100写入数据超时时间(秒), 0 表示不受限制
columnlist要写入的表字段,配置方式见上示例
skipFailbooleanfalse是否跳过插入失败的记录,如果设置为true,则插件不会把插入失败的当作异常
haveKerberosbooleanfalse是否启用 Kerberos 认证,如果启用,则需要同时配置以下两项
kerberosKeytabFilePathstring用于 Kerberos 认证的凭证文件路径, 比如 /your/path/addax.service.keytab
kerberosPrincipalstring用于 Kerberos 认证的凭证主体, 比如 addax/node1@WGZHAO.COM

column

column 可以直接指定要写入的列,如同上述例子,也可以设置 ["*"] 来表示写入所有列。