Skip to content

MongoDB Writer

MongoDB Writer 插件用于向 MongoDB 写入数据。

配置样例

该示例将流式数据写入到 MongoDB 表中

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1,
        "bytes": -1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "value": "unique_id",
              "type": "string"
            },
            {
              "value": "sid",
              "type": "string"
            },
            {
              "value": "user_id",
              "type": "string"
            },
            {
              "value": "auction_id",
              "type": "string"
            },
            {
              "value": "content_type",
              "type": "string"
            },
            {
              "value": "pool_type",
              "type": "string"
            },
            {
              "value": "a1 a2 a3",
              "type": "string"
            },
            {
              "value": "c1 c2 c3",
              "type": "string"
            },
            {
              "value": "2020-09-06",
              "type": "string"
            },
            {
              "value": "tag1 tag2 tag3",
              "type": "string"
            },
            {
              "value": "property",
              "type": "string"
            },
            {
              "value": 1984,
              "type": "long"
            },
            {
              "value": 1900,
              "type": "long"
            },
            {
              "value": 75,
              "type": "long"
            }
          ],
          "sliceRecordCount": 10
        }
      },
      "writer": {
        "name": "mongodbwriter",
        "parameter": {
          "username": "my_user",
          "password": "password123",
          "column": [
            {
              "name": "unique_id",
              "type": "string"
            },
            {
              "name": "sid",
              "type": "string"
            },
            {
              "name": "user_id",
              "type": "string"
            },
            {
              "name": "auction_id",
              "type": "string"
            },
            {
              "name": "content_type",
              "type": "string"
            },
            {
              "name": "pool_type",
              "type": "string"
            },
            {
              "name": "frontcat_id",
              "type": "Array",
              "splitter": " "
            },
            {
              "name": "categoryid",
              "type": "Array",
              "splitter": " "
            },
            {
              "name": "gmt_create",
              "type": "string"
            },
            {
              "name": "taglist",
              "type": "Array",
              "splitter": " "
            },
            {
              "name": "property",
              "type": "string"
            },
            {
              "name": "scorea",
              "type": "int"
            },
            {
              "name": "scoreb",
              "type": "int"
            },
            {
              "name": "scorec",
              "type": "int"
            }
          ],
          "writeMode": "insert",
          "connection": {
            "address": [
              "127.0.0.1:27017"
            ],
            "database": "my_database",
            "collection": "addax_writer",
            "authDb": "my_database"
          }
        }
      }
    }
  }
}

参数说明

配置项是否必须类型默认值描述
addresslistMongoDB 的数据地址信息
usernamestringMongoDB 的用户名
passwordstringMongoDB 的密码
collectionstringMongoDB 的集合名
columnlist<map>/*MongoDB 的文档列名
splitterstring特殊分隔符,详见下文
writeModestringinsert指定了传输数据时更新的信息,支持 insert, update 两种
batchSizeint2048指定批次输入的数量
isUpsertboolean当设置为 true 时,表示针对相同的 upsertKey 做更新操作
upsertKeystringupsertKey 指定了没行记录的业务主键。用来做更新时使用

column

column 指定 mongo collection 的字段以及类型,如果是数组类型,还需要指定接收到的数据按照什么分割,一个 column 字段至少需要指定 name 以及 type,比如

json
{
  "column": [
    {
      "name": "user_id",
      "type": "string"
    }
  ]
}

如果是数组类型,则需要配置 splitter 来告知分隔符,类似如下:

json
{
  "column": {
    "name": "taglist",
    "type": "Array",
    "splitter": " "
  }
}

也可以支持配置 "*" 来表示所有字段,比如:

json
{
  "column": ["*"]
}

在这种情况下,插件会把一条记录当做一个完整的 MongoDB 文档来处理,然后转为 BSON 格式存储到 MongoDB 中

splitter

当且仅当要处理的字符串要用分隔符分隔为字符数组时,才使用这个参数,通过这个参数指定的分隔符,将字符串分隔存储到 MongoDB 的数组中

writeMode

不配置的情况下,默认采取直接插入记录的方式,如果希望实现插入更新(即记录存在则更新否则插入),可以指定为 update 模式,该模式下,必须同时更新的字段是哪个,比如:

json
{
  "writeMode": "update(unique_id)"
}

上述配置表示依据字段 unique_id 来决定当前记录是插入还是更新,当前暂不支持指定多个字段

类型转换

Addax 内部类型MongoDB 数据类型
Longint, Long
Doubledouble
Stringstring, array
Datedate
Booleanboolean
Bytesbytes