# ElasticSearch Writer
The ElasticSearch Writer plugin is used to write data to Elasticsearch. It is implemented on top of Elasticsearch's REST API and writes data in batches.
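Each batch is delivered to Elasticsearch's bulk endpoint, whose request body is newline-delimited JSON: an action line followed by the document source. The index name, ids, and fields below are placeholders, shown only to illustrate the wire format:

```json
{ "index": { "_index": "test-1", "_id": "1" } }
{ "col_keyword": "hello world", "col_long": 19890604 }
{ "index": { "_index": "test-1", "_id": "2" } }
{ "col_keyword": "hello again", "col_long": 19890605 }
```

Because an `index` action overwrites a document with the same `_id`, retrying a failed batch is safe when ids are supplied.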
## Configuration Example
```json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "random": "10,1000",
              "type": "long"
            },
            {
              "value": "1.1.1.1",
              "type": "string"
            },
            {
              "value": 19890604,
              "type": "double"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "hello world",
              "type": "string"
            },
            {
              "value": "long text",
              "type": "string"
            },
            {
              "value": "41.12,-71.34",
              "type": "string"
            },
            {
              "value": "2017-05-25 11:22:33",
              "type": "string"
            }
          ],
          "sliceRecordCount": 100
        }
      },
      "writer": {
        "name": "elasticsearchwriter",
        "parameter": {
          "endpoint": "http://localhost:9200",
          "index": "test-1",
          "type": "default",
          "cleanup": true,
          "settings": {
            "index": {
              "number_of_shards": 1,
              "number_of_replicas": 0
            }
          },
          "discovery": false,
          "batchSize": 1000,
          "splitter": ",",
          "column": [
            {
              "name": "pk",
              "type": "id"
            },
            {
              "name": "col_ip",
              "type": "ip"
            },
            {
              "name": "col_double",
              "type": "double"
            },
            {
              "name": "col_long",
              "type": "long"
            },
            {
              "name": "col_integer",
              "type": "integer"
            },
            {
              "name": "col_keyword",
              "type": "keyword"
            },
            {
              "name": "col_text",
              "type": "text",
              "analyzer": "ik_max_word"
            },
            {
              "name": "col_geo_point",
              "type": "geo_point"
            },
            {
              "name": "col_date",
              "type": "date",
              "format": "yyyy-MM-dd HH:mm:ss"
            },
            {
              "name": "col_nested1",
              "type": "nested"
            },
            {
              "name": "col_nested2",
              "type": "nested"
            },
            {
              "name": "col_object1",
              "type": "object"
            },
            {
              "name": "col_object2",
              "type": "object"
            },
            {
              "name": "col_integer_array",
              "type": "integer",
              "array": true
            },
            {
              "name": "col_geo_shape",
              "type": "geo_shape",
              "tree": "quadtree",
              "precision": "10m"
            }
          ]
        }
      }
    }
  }
}
```

## Parameters
| Configuration | Required | Data Type | Default Value | Description |
|---|---|---|---|---|
| endpoint | Yes | string | None | Elasticsearch connection address; for a cluster, separate multiple addresses with commas (`,`) |
| accessId | No | string | Empty | Username for HTTP Basic Auth; empty by default |
| accessKey | No | string | Empty | Password for HTTP Basic Auth |
| index | Yes | string | None | Index name |
| type | No | string | default | Index type |
| cleanup | No | boolean | false | Whether to delete the existing index before writing |
| batchSize | No | int | 1000 | Number of records in each batch |
| trySize | No | int | 30 | Number of retries after failure |
| timeout | No | int | 600000 | Client timeout in milliseconds (ms) |
| discovery | No | boolean | false | Enable node discovery (polling) and periodically update the server list in the client |
| compression | No | boolean | true | Whether to enable http request compression |
| multiThread | No | boolean | true | Whether to enable multi-threaded http requests |
| ignoreWriteError | No | boolean | false | Whether to retry on write errors: if true, retry indefinitely; otherwise skip the failed record |
| ignoreParseError | No | boolean | true | Whether to continue writing when a record fails to parse |
| alias | No | string | None | Alias to apply to the index after the import completes |
| aliasMode | No | string | append | How the alias is applied after the import completes: `append` (add to existing aliases) or `exclusive` (keep only this alias) |
| settings | No | map | None | Settings when creating index, same as elasticsearch official |
| splitter | No | string | , | If the inserted data is an array, split it using the specified delimiter |
| column | Yes | list&lt;map&gt; | None | Field definitions; the example above covers all supported field types |
| dynamic | No | boolean | false | If true, skip the mappings defined by Addax and rely on Elasticsearch's own dynamic mapping |
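To illustrate how the authentication and alias parameters combine, here is a hedged sketch of a writer block for a secured cluster that atomically switches an alias once the load finishes. The endpoint addresses, credentials, and index names are placeholders, not real values:

```json
"writer": {
  "name": "elasticsearchwriter",
  "parameter": {
    "endpoint": "http://es-node1:9200,http://es-node2:9200",
    "accessId": "elastic",
    "accessKey": "secret",
    "index": "logs-2024",
    "alias": "logs-current",
    "aliasMode": "exclusive",
    "cleanup": true,
    "batchSize": 2048
  }
}
```

With `aliasMode` set to `exclusive`, readers that query through the `logs-current` alias are cut over to the newly loaded index in one step.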
## Constraints
- If an id is imported, failed writes are retried, and re-importing only overwrites existing documents, ensuring data consistency (idempotent writes)
- If no id is imported, the plugin runs in append-only mode: Elasticsearch generates ids automatically and throughput improves by roughly 20%, but failed data cannot be repaired afterwards, so this mode suits log-type data with low precision requirements
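The idempotent mode described above only requires mapping the source primary key to a column of type `id`. A minimal sketch (column names are illustrative):

```json
"column": [
  { "name": "pk", "type": "id" },
  { "name": "col_keyword", "type": "keyword" }
]
```

The `id` column becomes the document `_id` rather than a stored field, which is what makes re-importing the same records an overwrite instead of a duplicate insert.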