# ElasticSearchReader
The ElasticSearchReader plugin reads index data from Elasticsearch. It uses the REST API provided by Elasticsearch (default port 9200) to execute the specified query statements and retrieve data in batches.
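Conceptually, the reader flattens each hit's `_source` object into a record, ordered by the configured `column` list. A minimal sketch of that extraction step (the function name and sample data are illustrative, not the plugin's actual code):

```python
import json

def extract_records(response_body: str, columns: list) -> list:
    """Flatten each hit's _source into a row, ordered by the column list."""
    hits = json.loads(response_body)["hits"]["hits"]
    return [[hit["_source"].get(col) for col in columns] for hit in hits]

# Sample response in the shape returned by GET /<index>/_search
sample = json.dumps({
    "hits": {
        "total": 1,
        "hits": [
            {"_id": "38", "_source": {"col_ip": "1.1.1.1", "col_long": 19890604}}
        ]
    }
})

rows = extract_records(sample, ["col_ip", "col_long"])
print(rows)  # [['1.1.1.1', 19890604]]
```

Fields missing from a hit's `_source` come back as `None` here; the real plugin's null handling may differ.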
## Example
Assume the index content to be retrieved is as follows:
```json
{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "38",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      },
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "103",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      }
    ]
  }
}
```

Configure a task to read data from Elasticsearch and print it to the terminal:
```json
{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": {
      "reader": {
        "name": "elasticsearchreader",
        "parameter": {
          "endpoint": "http://127.0.0.1:9200",
          "accessId": "",
          "accesskey": "",
          "index": "test-1",
          "type": "default",
          "searchType": "dfs_query_then_fetch",
          "headers": {},
          "scroll": "3m",
          "search": [
            {
              "query": {
                "match": {
                  "col_ip": "1.1.1.1"
                }
              },
              "aggregations": {
                "top_10_states": {
                  "terms": {
                    "field": "col_date",
                    "size": 10
                  }
                }
              }
            }
          ],
          "column": [
            "col_ip",
            "col_double",
            "col_long",
            "col_integer",
            "col_keyword",
            "col_text",
            "col_geo_point",
            "col_date"
          ]
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true,
          "encoding": "UTF-8"
        }
      }
    }
  }
}
```

Save the above content as `job/es2stream.json`.
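A quick sanity check of the required reader parameters can catch typos in the job file before submitting it. This validation helper is illustrative only and is not part of Addax:

```python
# Required elasticsearchreader parameters per the table below (assumption
# based on this document, not an official schema).
REQUIRED = ("endpoint", "index", "search", "column")

def check_reader_params(job: dict) -> list:
    """Return the names of required reader parameters that are missing or empty."""
    params = job["job"]["content"]["reader"]["parameter"]
    return [key for key in REQUIRED if not params.get(key)]

# Trimmed-down version of the job config above
job = {
    "job": {
        "content": {
            "reader": {
                "name": "elasticsearchreader",
                "parameter": {
                    "endpoint": "http://127.0.0.1:9200",
                    "index": "test-1",
                    "search": [{"query": {"match": {"col_ip": "1.1.1.1"}}}],
                    "column": ["col_ip"],
                },
            }
        }
    }
}

print(check_reader_params(job))  # [] -> all required parameters present
```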
Execute the following command to start the collection:
```bash
bin/addax.sh job/es2stream.json
```

The output is similar to the following (some output records are omitted):
```txt
2021-02-19 13:38:15.860 [main] INFO VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-02-19 13:38:15.895 [main] INFO Engine -
{
    "content":
    {
        "reader":{
            "parameter":{
                "accessId":"",
                "headers":{},
                "endpoint":"http://127.0.0.1:9200",
                "search":[
                    {
                        "query": {
                            "match": {
                                "col_ip": "1.1.1.1"
                            }
                        },
                        "aggregations": {
                            "top_10_states": {
                                "terms": {
                                    "field": "col_date",
                                    "size": 10
                                }
                            }
                        }
                    }
                ],
                "accesskey":"*****",
                "searchType":"dfs_query_then_fetch",
                "scroll":"3m",
                "column":[
                    "col_ip",
                    "col_double",
                    "col_long",
                    "col_integer",
                    "col_keyword",
                    "col_text",
                    "col_geo_point",
                    "col_date"
                ],
                "index":"test-1",
                "type":"default"
            },
            "name":"elasticsearchreader"
        },
        "writer":{
            "parameter":{
                "print":true,
                "encoding":"UTF-8"
            },
            "name":"streamwriter"
        }
    },
    "setting":{
        "errorLimit":{
            "record":0,
            "percentage":0.02
        },
        "speed":{
            "byte":-1,
            "channel":1
        }
    }
}
2021-02-19 13:38:15.934 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-19 13:38:15.934 [main] INFO JobContainer - Addax jobContainer starts job.
2021-02-19 13:38:15.937 [main] INFO JobContainer - Set jobId = 0
2017-05-25T11:22:33.000+08:00 19890604 hello world 1.1.1.1 long text 19890604 19890604 41.12,-71.34
2017-05-25T11:22:33.000+08:00 19890604 hello world 1.1.1.1 long text 19890604 19890604 41.12,-71.34
2021-02-19 13:38:19.845 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-02-19 13:38:19.848 [job-0] INFO JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-02-19 13:38:19.849 [job-0] INFO JobContainer - Addax Reader.Job [elasticsearchreader] do post work.
2021-02-19 13:38:19.855 [job-0] INFO JobContainer - PerfTrace not enable!
2021-02-19 13:38:19.858 [job-0] INFO StandAloneJobContainerCommunicator - Total 95 records, 8740 bytes | Speed 2.84KB/s, 31 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.103s | Percentage 100.00%
2021-02-19 13:38:19.861 [job-0] INFO JobContainer -
Job start time                  : 2021-02-19 13:38:15
Job end time                    : 2021-02-19 13:38:19
Total elapsed time              : 3s
Average traffic                 : 2.84KB/s
Record write speed              : 31rec/s
Total records read              : 2
Total read/write failures       : 0
```

## Parameters
| Configuration | Required | Type | Default | Description |
|---|---|---|---|---|
| endpoint | Yes | string | none | Elasticsearch connection address |
| accessId | No | string | `""` | Username for HTTP basic auth |
| accessKey | No | string | `""` | Password for HTTP basic auth |
| index | Yes | string | none | Index name in Elasticsearch |
| type | No | string | index name | Type name of the index in Elasticsearch |
| search | Yes | list | `[]` | Query body in the JSON format of the Elasticsearch search API |
| column | Yes | list | none | Fields to read |
| timeout | No | int | 60 | Client timeout in seconds |
| discovery | No | boolean | false | Enable node discovery (polling) to periodically refresh the server list in the client |
| compression | No | boolean | true | Enable compression for HTTP requests |
| multiThread | No | boolean | true | Use multi-threaded HTTP requests |
| searchType | No | string | dfs_query_then_fetch | Search type |
| headers | No | map | `{}` | HTTP request headers |
| scroll | No | string | `""` | Scroll pagination configuration, e.g. `3m` |
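The `scroll` parameter controls how long Elasticsearch keeps the scroll context alive between batches (for example, `3m` keeps it alive for three minutes). The batching loop can be pictured roughly as follows, with `fetch_page` standing in for the actual HTTP calls (a sketch, not the plugin's real implementation):

```python
def scroll_all(fetch_page, scroll="3m"):
    """Drain a scrolled search: the first call opens the scroll context,
    subsequent calls pass the scroll_id back until a page comes up empty."""
    records = []
    page = fetch_page(scroll=scroll, scroll_id=None)  # initial search request
    while page["hits"]:
        records.extend(page["hits"])
        page = fetch_page(scroll=scroll, scroll_id=page["scroll_id"])
    return records

# Fake transport returning two pages of hits, then an empty page to end the loop.
pages = [
    {"scroll_id": "abc", "hits": [{"col_ip": "1.1.1.1"}, {"col_ip": "2.2.2.2"}]},
    {"scroll_id": "abc", "hits": [{"col_ip": "3.3.3.3"}]},
    {"scroll_id": "abc", "hits": []},
]

def fake_fetch(scroll, scroll_id):
    return pages.pop(0)

total = scroll_all(fake_fetch)
print(len(total))  # 3
```

If `scroll` is too short for the time a batch takes to process, the context expires mid-job and the next fetch fails, so the value should comfortably exceed the per-batch processing time.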
### search

The `search` configuration item accepts any query body that conforms to the Elasticsearch search API, for example:
```json
{
  "query": {
    "match": {
      "message": "myProduct"
    }
  },
  "aggregations": {
    "top_10_states": {
      "terms": {
        "field": "state",
        "size": 10
      }
    }
  }
}
```

### searchType
searchType currently supports the following types:
- dfs_query_then_fetch
- query_then_fetch
- count
- scan
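In the REST API, the chosen search type is passed as the `search_type` query parameter of the `_search` request. A sketch of the URL construction (the helper function is hypothetical, not part of the plugin):

```python
from urllib.parse import urlencode

# Search types listed in this document; support varies by Elasticsearch version.
ALLOWED = {"dfs_query_then_fetch", "query_then_fetch", "count", "scan"}

def search_url(endpoint: str, index: str,
               search_type: str = "dfs_query_then_fetch") -> str:
    """Build the _search URL with the search_type query parameter."""
    if search_type not in ALLOWED:
        raise ValueError(f"unsupported searchType: {search_type}")
    return f"{endpoint}/{index}/_search?{urlencode({'search_type': search_type})}"

print(search_url("http://127.0.0.1:9200", "test-1"))
# http://127.0.0.1:9200/test-1/_search?search_type=dfs_query_then_fetch
```

Note that `count` and `scan` were deprecated in Elasticsearch 2.x and removed in later versions, so whether they work depends on the version of the target cluster.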