ElasticSearchReader

The ElasticSearchReader plugin reads data from Elasticsearch indices. It uses the REST API provided by Elasticsearch (default port 9200) to execute the specified query statements and retrieve data in batches.
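As an illustration of the kind of request the plugin issues, the sketch below builds (but does not send) a search request against the endpoint and index used in the example job later in this document. This is an assumption-level sketch, not the plugin's internal HTTP client:

```python
import json
import urllib.request

# Hypothetical values matching the example job configuration below.
endpoint = "http://127.0.0.1:9200"
index = "test-1"
body = {"query": {"match": {"col_ip": "1.1.1.1"}}}

# The reader issues POST {endpoint}/{index}/_search with a JSON body.
req = urllib.request.Request(
    url=f"{endpoint}/{index}/_search",
    data=json.dumps(body).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(req) would send it to a running cluster.
```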

Example

Assume the index content to be retrieved is as follows:

json
{
  "took": 14,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "38",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      },
      {
        "_index": "test-1",
        "_type": "default",
        "_id": "103",
        "_score": 1,
        "_source": {
          "col_date": "2017-05-25T11:22:33.000+08:00",
          "col_integer": 19890604,
          "col_keyword": "hello world",
          "col_ip": "1.1.1.1",
          "col_text": "long text",
          "col_double": 19890604,
          "col_long": 19890604,
          "col_geo_point": "41.12,-71.34"
        }
      }
    ]
  }
}

Configure a task to read data from Elasticsearch and print it to the terminal:

json
{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      }
    },
    "content": {
      "reader": {
        "name": "elasticsearchreader",
        "parameter": {
          "endpoint": "http://127.0.0.1:9200",
          "accessId": "",
          "accesskey": "",
          "index": "test-1",
          "type": "default",
          "searchType": "dfs_query_then_fetch",
          "headers": {},
          "scroll": "3m",
          "search": [
            {
              "query": {
                "match": {
                  "col_ip": "1.1.1.1"
                }
              },
              "aggregations": {
                "top_10_states": {
                  "terms": {
                    "field": "col_date",
                    "size": 10
                  }
                }
              }
            }
          ],
          "column": [
            "col_ip",
            "col_double",
            "col_long",
            "col_integer",
            "col_keyword",
            "col_text",
            "col_geo_point",
            "col_date"
          ]
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true,
          "encoding": "UTF-8"
        }
      }
    }
  }
}

Save the above content as job/es2stream.json

Execute the following command to run the collection job:

bash
bin/addax.sh job/es2stream.json

The output is similar to the following (some records have been omitted):

txt
2021-02-19 13:38:15.860 [main] INFO  VMInfo - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-02-19 13:38:15.895 [main] INFO  Engine -
{
	"content":
		{
			"reader":{
				"parameter":{
					"accessId":"",
					"headers":{},
					"endpoint":"http://127.0.0.1:9200",
					"search":[
                      {
                        "query": {
                          "match": {
                            "col_ip": "1.1.1.1"
                          }
                        },
                        "aggregations": {
                          "top_10_states": {
                            "terms": {
                              "field": "col_date",
                              "size": 10
                            }
                          }
                        }
                      }
					],
					"accesskey":"*****",
					"searchType":"dfs_query_then_fetch",
					"scroll":"3m",
					"column":[
						"col_ip",
						"col_double",
						"col_long",
						"col_integer",
						"col_keyword",
						"col_text",
						"col_geo_point",
						"col_date"
					],
					"index":"test-1",
					"type":"default"
				},
				"name":"elasticsearchreader"
			},
			"writer":{
				"parameter":{
					"print":true,
					"encoding":"UTF-8"
				},
				"name":"streamwriter"
			}
		},
	"setting":{
		"errorLimit":{
			"record":0,
			"percentage":0.02
		},
		"speed":{
			"byte":-1,
			"channel":1
		}
	}
}

2021-02-19 13:38:15.934 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-02-19 13:38:15.934 [main] INFO  JobContainer - Addax jobContainer starts job.
2021-02-19 13:38:15.937 [main] INFO  JobContainer - Set jobId = 0

2017-05-25T11:22:33.000+08:00	19890604	hello world	1.1.1.1	long text	19890604	19890604	41.12,-71.34
2017-05-25T11:22:33.000+08:00	19890604	hello world	1.1.1.1	long text	19890604	19890604	41.12,-71.34

2021-02-19 13:38:19.845 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2021-02-19 13:38:19.848 [job-0] INFO  JobContainer - Addax Writer.Job [streamwriter] do post work.
2021-02-19 13:38:19.849 [job-0] INFO  JobContainer - Addax Reader.Job [elasticsearchreader] do post work.
2021-02-19 13:38:19.855 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-02-19 13:38:19.858 [job-0] INFO  StandAloneJobContainerCommunicator - Total 95 records, 8740 bytes | Speed 2.84KB/s, 31 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.103s | Percentage 100.00%
2021-02-19 13:38:19.861 [job-0] INFO  JobContainer -
Job start time                  : 2021-02-19 13:38:15
Job end time                    : 2021-02-19 13:38:19
Total elapsed time              :                  3s
Average traffic                 :            2.84KB/s
Record write speed              :             31rec/s
Total records read              :                   2
Total read/write failures       :                   0
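Each printed row is produced from one hit's `_source` object in the search response. A minimal sketch (not the plugin's actual code) of pulling configured column values out of a response shaped like the sample index content above:

```python
# Response fragment with the same shape as the sample index content.
response = {
    "hits": {
        "hits": [
            {"_source": {"col_ip": "1.1.1.1", "col_keyword": "hello world"}},
            {"_source": {"col_ip": "1.1.1.1", "col_keyword": "hello world"}},
        ]
    }
}
columns = ["col_ip", "col_keyword"]  # subset of the job's "column" list

# One output row per hit; missing fields become None.
rows = [[hit["_source"].get(c) for c in columns]
        for hit in response["hits"]["hits"]]
```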

Parameters

| Configuration | Required | Type    | Default Value          | Description                                                                       |
| ------------- | -------- | ------- | ---------------------- | --------------------------------------------------------------------------------- |
| endpoint      | Yes      | string  | None                   | Elasticsearch connection address                                                  |
| accessId      | No       | string  | `""`                   | Username for HTTP auth                                                            |
| accessKey     | No       | string  | `""`                   | Password for HTTP auth                                                            |
| index         | Yes      | string  | None                   | Index name in Elasticsearch                                                       |
| type          | No       | string  | index name             | Type name of the index in Elasticsearch                                           |
| search        | Yes      | list    | `[]`                   | Search request body in JSON format, following the Elasticsearch search API        |
| column        | Yes      | list    | None                   | Fields to be read                                                                 |
| timeout       | No       | int     | 60                     | Client timeout (unit: seconds)                                                    |
| discovery     | No       | boolean | false                  | Enable node discovery (polling) to periodically update the server list in the client |
| compression   | No       | boolean | true                   | Enable compression for HTTP requests                                              |
| multiThread   | No       | boolean | true                   | Whether HTTP requests are multi-threaded                                          |
| searchType    | No       | string  | dfs_query_then_fetch   | Search type                                                                       |
| headers       | No       | map     | `{}`                   | HTTP request headers                                                              |
| scroll        | No       | string  | `""`                   | Scroll pagination configuration                                                   |

The search configuration item accepts any request body that conforms to the Elasticsearch search API, for example:

json
{
  "query": {
    "match": {
      "message": "myProduct"
    }
  },
  "aggregations": {
    "top_10_states": {
      "terms": {
        "field": "state",
        "size": 10
      }
    }
  }
}

searchType

searchType currently supports the following types:

  • dfs_query_then_fetch
  • query_then_fetch
  • count
  • scan
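The searchType value is passed to Elasticsearch as the search_type URL parameter of the _search endpoint; note that count and scan were removed in later Elasticsearch versions, so they only apply to older clusters. A small sketch of how such a URL could be assembled (build_search_url is a hypothetical helper, not part of the plugin):

```python
from urllib.parse import urlencode

def build_search_url(endpoint, index, search_type, scroll=None):
    # Hypothetical helper: searchType becomes the search_type query
    # parameter of the _search endpoint; scroll is appended if set.
    params = {"search_type": search_type}
    if scroll:
        params["scroll"] = scroll
    return f"{endpoint}/{index}/_search?{urlencode(params)}"

url = build_search_url("http://127.0.0.1:9200", "test-1",
                       "dfs_query_then_fetch", scroll="3m")
```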