# ElasticSearch Writer

The ElasticSearch Writer plugin writes data to ElasticSearch. It is implemented on top of Elasticsearch's REST API and writes records in batches.
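For orientation, each batch is sent as an Elasticsearch `_bulk` request, whose body is NDJSON: an action line followed by a source line per record. The sketch below uses the index and field names from the configuration example in this document; the `_id` value is illustrative and is only set when an `id` column is configured:

```
{ "index": { "_index": "test-1", "_id": "1024" } }
{ "col_ip": "1.1.1.1", "col_long": 19890604, "col_keyword": "hello world" }
```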

## Configuration Example

```json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "10,1000",
              "type": "long"
            },
            {
              "value": "1.1.1.1",
              "type": "string"
            },
            {
              "value": 19890604,
              "type": "double"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "hello world",
              "type": "string"
            },
            {
              "value": "long text",
              "type": "string"
            },
            {
              "value": "41.12,-71.34",
              "type": "string"
            },
            {
              "value": "2017-05-25 11:22:33",
              "type": "string"
            }
          ],
          "sliceRecordCount": 100
        }
      },
      "writer": {
        "name": "elasticsearchwriter",
        "parameter": {
          "endpoint": "http://localhost:9200",
          "index": "test-1",
          "type": "default",
          "cleanup": true,
          "settings": {
            "index": {
              "number_of_shards": 1,
              "number_of_replicas": 0
            }
          },
          "discovery": false,
          "batchSize": 1000,
          "splitter": ",",
          "column": [
            {
              "name": "pk",
              "type": "id"
            },
            {
              "name": "col_ip",
              "type": "ip"
            },
            {
              "name": "col_double",
              "type": "double"
            },
            {
              "name": "col_long",
              "type": "long"
            },
            {
              "name": "col_integer",
              "type": "integer"
            },
            {
              "name": "col_keyword",
              "type": "keyword"
            },
            {
              "name": "col_text",
              "type": "text",
              "analyzer": "ik_max_word"
            },
            {
              "name": "col_geo_point",
              "type": "geo_point"
            },
            {
              "name": "col_date",
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss"
            },
            {
              "name": "col_nested1",
              "type": "nested"
            },
            {
              "name": "col_nested2",
              "type": "nested"
            },
            {
              "name": "col_object1",
              "type": "object"
            },
            {
              "name": "col_object2",
              "type": "object"
            },
            {
              "name": "col_integer_array",
              "type": "integer",
              "array": true
            },
            {
              "name": "col_geo_shape",
              "type": "geo_shape",
              "tree": "quadtree",
              "precision": "10m"
            }
          ]
        }
      }
    }
  }
}
```

## Parameters

| Configuration | Required | Data Type | Default | Description |
| --- | --- | --- | --- | --- |
| endpoint | Yes | string | None | ElasticSearch connection address; for a cluster, separate multiple addresses with commas (`,`) |
| accessId | No | string | Empty | Username for HTTP Basic auth; empty by default |
| accessKey | No | string | Empty | Password for HTTP Basic auth |
| index | Yes | string | None | Index name |
| type | No | string | `default` | Index type |
| cleanup | No | boolean | false | Whether to delete the existing index before writing |
| batchSize | No | int | 1000 | Number of records per batch |
| trySize | No | int | 30 | Number of retries after a failure |
| timeout | No | int | 600000 | Client timeout in milliseconds (ms) |
| discovery | No | boolean | false | Enable node discovery (polling) to periodically refresh the server list in the client |
| compression | No | boolean | true | Whether to enable HTTP request compression |
| multiThread | No | boolean | true | Whether to enable multi-threaded HTTP requests |
| ignoreWriteError | No | boolean | false | Whether to retry on write errors; if true, always retry, otherwise skip the failing record |
| ignoreParseError | No | boolean | true | Whether to continue writing when a record fails to parse |
| alias | No | string | None | Alias to attach to the index after the import completes |
| aliasMode | No | string | append | How the alias is added after import: `append` (add to existing aliases) or `exclusive` (keep only this alias) |
| settings | No | map | None | Index settings applied when creating the index, same as the official Elasticsearch settings |
| splitter | No | string | `,` | Delimiter used to split values when the inserted data is an array |
| column | Yes | list\<map\> | None | Field definitions; the example in this document covers all supported field types |
| dynamic | No | boolean | false | If true, skip Addax mappings and rely on Elasticsearch's own dynamic mapping |
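As an illustration only, the fragment below sketches a writer block that combines HTTP Basic auth with alias switching; the endpoints, credentials, index, and alias names are placeholders, not values from this document:

```json
{
  "name": "elasticsearchwriter",
  "parameter": {
    "endpoint": "http://es-node1:9200,http://es-node2:9200",
    "accessId": "elastic",
    "accessKey": "changeme",
    "index": "logs-v2",
    "cleanup": false,
    "alias": "logs",
    "aliasMode": "exclusive",
    "column": [
      { "name": "message", "type": "text" }
    ]
  }
}
```

With `aliasMode` set to `exclusive`, the alias `logs` points only at `logs-v2` after the import, which makes this pattern useful for switching readers over to a freshly loaded index.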

## Constraints

- If an `id` column is configured, failed batches are retried; re-importing the same records only overwrites existing documents, so data consistency is guaranteed.
- Without an `id` column, the writer runs in append-only mode and Elasticsearch generates document ids automatically. Write speed improves by roughly 20%, but the data cannot be repaired by re-running the job, so this mode suits log-type data with lower consistency requirements.
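Which mode applies is determined by the `column` list. A minimal sketch (field names are illustrative): including an `id` entry maps that source field to the Elasticsearch document `_id`, so retried or repeated imports overwrite rather than duplicate:

```json
"column": [
  { "name": "pk", "type": "id" },
  { "name": "col_keyword", "type": "keyword" }
]
```

Removing the `{ "name": "pk", "type": "id" }` entry switches the job to append-only mode, where Elasticsearch assigns ids automatically.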