Skip to content

ElasticSearch Writer

ElasticSearch Writer 插件用于向 ElasticSearch 写入数据。 其实现是通过 elasticsearch 的 rest api 接口, 批量把据写入 elasticsearch

配置样例

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "10,1000",
              "type": "long"
            },
            {
              "value": "1.1.1.1",
              "type": "string"
            },
            {
              "value": 19890604,
              "type": "double"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "hello world",
              "type": "string"
            },
            {
              "value": "long text",
              "type": "string"
            },
            {
              "value": "41.12,-71.34",
              "type": "string"
            },
            {
              "value": "2017-05-25 11:22:33",
              "type": "string"
            }
          ],
          "sliceRecordCount": 100
        }
      },
      "writer": {
        "name": "elasticsearchwriter",
        "parameter": {
          "endpoint": "http://localhost:9200",
          "index": "test-1",
          "type": "default",
          "cleanup": true,
          "settings": {
            "index": {
              "number_of_shards": 1,
              "number_of_replicas": 0
            }
          },
          "discovery": false,
          "batchSize": 1000,
          "splitter": ",",
          "column": [
            {
              "name": "pk",
              "type": "id"
            },
            {
              "name": "col_ip",
              "type": "ip"
            },
            {
              "name": "col_double",
              "type": "double"
            },
            {
              "name": "col_long",
              "type": "long"
            },
            {
              "name": "col_integer",
              "type": "integer"
            },
            {
              "name": "col_keyword",
              "type": "keyword"
            },
            {
              "name": "col_text",
              "type": "text",
              "analyzer": "ik_max_word"
            },
            {
              "name": "col_geo_point",
              "type": "geo_point"
            },
            {
              "name": "col_date",
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss"
            },
            {
              "name": "col_nested1",
              "type": "nested"
            },
            {
              "name": "col_nested2",
              "type": "nested"
            },
            {
              "name": "col_object1",
              "type": "object"
            },
            {
              "name": "col_object2",
              "type": "object"
            },
            {
              "name": "col_integer_array",
              "type": "integer",
              "array": true
            },
            {
              "name": "col_geo_shape",
              "type": "geo_shape",
              "tree": "quadtree",
              "precision": "10m"
            }
          ]
        }
      }
    }
  }
}

参数说明

配置项是否必须数据类型默认值描述
endpointstringElasticSearch 的连接地址,如果是集群,则多个地址用逗号(,)分割
accessIdstringhttp auth 中的 user, 默认为空
accessKeystringhttp auth 中的 password
indexstringindex 名
typestringdefaultindex 类型
cleanupbooleanfalse是否删除原表
batchSizeint1000每次批量数据的条数
trySizeint30失败后重试的次数
timeoutint600000客户端超时时间,单位为毫秒(ms)
discoverybooleanfalse启用节点发现将(轮询)并定期更新客户机中的服务器列表
compressionbooleantrue否是开启 http 请求压缩
multiThreadbooleantrue是否开启多线程 http 请求
ignoreWriteErrorbooleanfalse写入错误时,是否重试,如果是 true 则表示一直重试,否则忽略该条数据
ignoreParseErrorbooleantrue解析数据格式错误时,是否继续写入
aliasstring数据导入完成后写入别名
aliasModestringappend数据导入完成后增加别名的模式,append(增加模式), exclusive(只留这一个)
settingsmap创建 index 时候的 settings, 与 elasticsearch 官方相同
splitterstring,如果插入数据是 array,就使用指定分隔符
columnlist<map>字段类型,文档中给出的样例中包含了全部支持的字段类型
dynamicbooleanfalse不使用 addax 的 mappings,使用 es 自己的自动 mappings

约束限制

  • 如果导入 id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
  • 如果不导入 id,就是 append_only 模式,elasticsearch 自动生成 id,速度会提升 20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)