Data Transformation

Transformer Definition

During data synchronization and transmission, users may have customized requirements for data processing, such as trimming columns or transforming column values. This can be achieved through the T (Transformer) process in ETL. Addax includes a Transformer module that allows for flexible data transformation by defining a series of UDFs (User-Defined Functions).

Execution Model

mermaid
graph LR
source(("source"))
subgraph fr["Addax Framework"]
direction LR
Reader ==> Transformer ==> Writer
end
target(("target"))
source ==> fr ==> target

UDF Functions

dx_substr

dx_substr(idx, pos, length) -> str

Parameters

  • idx: The index of the field in the record.
  • pos: The starting position within the field's value.
  • length: The length of the target substring.

Returns: A substring of the specified length from the specified starting position (inclusive). An exception is thrown if the starting position is invalid. If the field is null, it is returned directly (i.e., this transformer does not process it).
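The rules above can be sketched in plain Java. The `dxSubstr` helper below is a hypothetical stand-in for the transformer, not part of the Addax API; when fewer than `length` characters remain, it is assumed here to stop at the end of the string.

```java
// Sketch of dx_substr semantics: take `length` characters starting at `pos`.
public class DxSubstrSketch {
    public static String dxSubstr(String value, int pos, int length) {
        if (value == null) {
            return null; // null fields are passed through untouched
        }
        if (pos < 0 || pos > value.length()) {
            throw new IllegalArgumentException("invalid start position: " + pos);
        }
        // assumption: stop at the end of the string if fewer chars remain
        int end = Math.min(pos + length, value.length());
        return value.substring(pos, end);
    }

    public static void main(String[] args) {
        System.out.println(dxSubstr("addaxTest", 2, 4)); // prints "daxT"
    }
}
```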

dx_pad

dx_pad(idx, flag, length, chr) -> str

Parameters

  • idx: The index of the field in the record.
  • flag: "l" or "r", indicating whether to pad at the beginning (left) or the end (right).
  • length: The target length of the field.
  • chr: The character to use for padding.

Returns: If the source string's length is less than the target length, it returns the string after padding. If it's longer, it is truncated (always from the right). If the field is null, it is converted to an empty string before padding.

Examples:

  • dx_pad(1, "l", "4", "A"): If column 1's value is xyz, the transformed value is Axyz. If the value is xyzzzzz, it becomes xyzz.
  • dx_pad(1, "r", "4", "A"): If column 1's value is xyz, the transformed value is xyzA. If the value is xyzzzzz, it becomes xyzz.
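The padding and truncation rules can be sketched in plain Java; `dxPad` below is a hypothetical helper illustrating the documented behavior, not the actual Transformer API.

```java
// Sketch of dx_pad semantics: pad to `length` on the left ("l") or right ("r"),
// truncating from the right when the value is already longer than `length`.
public class DxPadSketch {
    public static String dxPad(String value, String flag, int length, String chr) {
        String v = (value == null) ? "" : value; // null becomes an empty string first
        if (v.length() >= length) {
            return v.substring(0, length); // always truncated from the right
        }
        StringBuilder pad = new StringBuilder();
        while (pad.length() < length - v.length()) {
            pad.append(chr); // chr may be longer than one character
        }
        String p = pad.substring(0, length - v.length());
        return "l".equals(flag) ? p + v : v + p;
    }

    public static void main(String[] args) {
        System.out.println(dxPad("xyz", "l", 4, "A"));     // prints "Axyz"
        System.out.println(dxPad("xyzzzzz", "r", 4, "A")); // prints "xyzz"
    }
}
```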

dx_replace

dx_replace(idx, pos, length, str) -> str

Parameters

  • idx: The index of the field in the record.
  • pos: The starting position within the field's value.
  • length: The length of the substring to be replaced.
  • str: The string to replace with.

Returns: Replaces a substring of a specified length from a specified starting position (inclusive). An exception is thrown if the starting position is invalid. If the field is null, it is returned directly (i.e., this transformer does not process it).

Examples:

  • dx_replace(1, "2", "4", "****"): If column 1's value is addaxTest, it is transformed to ad****est.
  • dx_replace(1, "5", "10", "****"): If column 1's value is addaxTest, it is transformed to addax**** (the replacement stops at the end of the string).
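The replacement rule can be sketched in plain Java; `dxReplace` below is a hypothetical helper matching the examples above, not the actual Transformer API.

```java
// Sketch of dx_replace semantics: replace `length` characters starting at
// `pos` with `str`, stopping at the end of the string when necessary.
public class DxReplaceSketch {
    public static String dxReplace(String value, int pos, int length, String str) {
        if (value == null) {
            return null; // null fields are passed through untouched
        }
        if (pos < 0 || pos > value.length()) {
            throw new IllegalArgumentException("invalid start position: " + pos);
        }
        int end = Math.min(pos + length, value.length());
        return value.substring(0, pos) + str + value.substring(end);
    }

    public static void main(String[] args) {
        System.out.println(dxReplace("addaxTest", 2, 4, "****"));  // prints "ad****est"
        System.out.println(dxReplace("addaxTest", 5, 10, "****")); // prints "addax****"
    }
}
```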

dx_filter

dx_filter(idx, operator, expr) -> str

Parameters:

  • idx: The index of the field in the record.
  • operator: The operator. Supported operators are like, not like, >, =, <, >=, !=, <=.
  • expr: A regular expression (Java-style) or a value.

Returns:

  • If the condition is met, it returns null, which filters out the entire row. If the condition is not met, the row is kept.
  • like and not like: The field is converted to a string and then fully matched against the target regular expression.
  • >, =, <, >=, !=, <=: Comparison is performed based on the data type. Numeric types are compared by value; string and boolean types are compared lexicographically.
  • If the target field is null, it will satisfy the = null filter condition and be filtered out. For the != null condition, null does not satisfy the filter condition and is not filtered. For like, if the field is null, it is not filtered.

Examples:

  • dx_filter(1, "like", "dataTest")
  • dx_filter(1, ">=", "10")

Compound filters (i.e., conditions involving multiple fields) are not currently supported as the function parameters would be too complex for users.
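As a sketch of the comparison semantics, the hypothetical helper below covers only the numeric operators; like/not like matching and null handling are omitted. A return value of true means the condition holds, so the row is dropped (the transformer returns null).

```java
// Sketch of dx_filter comparison semantics for numeric fields:
// when the condition holds, the whole row is filtered out.
public class DxFilterSketch {
    public static boolean dropsRow(long value, String operator, long expr) {
        switch (operator) {
            case ">":  return value > expr;
            case ">=": return value >= expr;
            case "<":  return value < expr;
            case "<=": return value <= expr;
            case "=":  return value == expr;
            case "!=": return value != expr;
            default: throw new IllegalArgumentException("unsupported operator: " + operator);
        }
    }

    public static void main(String[] args) {
        System.out.println(dropsRow(3, "<", 5)); // true  -> row filtered out
        System.out.println(dropsRow(8, "<", 5)); // false -> row kept
    }
}
```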

dx_groovy

dx_groovy(code, package) -> record

Parameters

  • code: Code that conforms to Groovy syntax.
  • package: a list of extra package imports (extraPackage); may be empty.

Returns

A Record data type.

Notes:

  • dx_groovy can only be called once per transformer configuration. Multiple calls are not allowed.
  • The groovy code supports packages from java.lang and java.util. Objects that can be directly referenced include record and various column types under element (BoolColumn.class, BytesColumn.class, DateColumn.class, DoubleColumn.class, LongColumn.class, StringColumn.class). Other packages are not supported by default. If you need to use other packages, you can set extraPackage. Note that extraPackage does not support third-party JARs.
  • In the groovy code, you must return the updated Record (e.g., record.setColumn(columnIndex, new StringColumn(newValue));) or null. Returning null filters out the current row.
  • You can directly call static utility methods (GroovyTransformerStaticUtil).
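As a sketch of how extraPackage might be configured, a transformer entry could look like the following. The code and the exact format of the extraPackage entries (import statements, one per list item) are assumptions for illustration; only packages available on the default classpath can be used, since third-party JARs are not supported.

```json
{
  "name": "dx_groovy",
  "parameter": {
    "code": "String d = new SimpleDateFormat(\"yyyy-MM-dd\").format(record.getColumn(3).asDate()); record.setColumn(3, new StringColumn(d)); return record;",
    "extraPackage": [
      "import java.text.SimpleDateFormat;"
    ]
  }
}
```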

Examples:

Groovy implementation of subStr:

java
String code = "Column column = record.getColumn(1);\n" +
        "String oriValue = column.asString();\n" +
        "String newValue = oriValue.substring(0, 3);\n" +
        "record.setColumn(1, new StringColumn(newValue));\n" +
        "return record;";
dx_groovy(code);

Groovy implementation of replace:

java
String code2 = "Column column = record.getColumn(1);\n" +
        "String oriValue = column.asString();\n" +
        "String newValue = \"****\" + oriValue.substring(3, oriValue.length());\n" +
        "record.setColumn(1, new StringColumn(newValue));\n" +
        "return record;";

Groovy implementation of pad:

java
String code3 = "Column column = record.getColumn(1);\n" +
        "String oriValue = column.asString();\n" +
        "String padString = \"12345\";\n" +
        "String finalPad = \"\";\n" +
        "int needLength = 8 - oriValue.length();\n" +
        "while (needLength > 0) {\n" +
        "    if (needLength >= padString.length()) {\n" +
        "        finalPad += padString;\n" +
        "        needLength -= padString.length();\n" +
        "    } else {\n" +
        "        finalPad += padString.substring(0, needLength);\n" +
        "        needLength = 0;\n" +
        "    }\n" +
        "}\n" +
        "String newValue = finalPad + oriValue;\n" +
        "record.setColumn(1, new StringColumn(newValue));\n" +
        "return record;";

Starting from version 4.1.2, dx_groovy supports loading Groovy code from an external file. The file is read relative to the $ADDAX_HOME directory, which is the installation directory of Addax.

For example, to implement subStr, you can create a file job/substr.groovy with the following content:

groovy
Column column = record.getColumn(1)
String oriValue = column.asString()
String newValue = oriValue.substring(0, 3)
record.setColumn(1, new StringColumn(newValue))
return record

Then, define it in the job file like this:

json
{
  "transformer": [
    {
      "name": "dx_groovy",
      "parameter": {
        "codeFile": "job/substr.groovy"
      }
    }
  ]
}

You can also specify an absolute path for the file.

Job Definition

In this example, four UDFs are configured.

json
{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "value": "My name is xxxx",
              "type": "string"
            },
            {
              "value": "password is Passw0rd",
              "type": "string"
            },
            {
              "value": 19890604,
              "type": "long"
            },
            {
              "value": "1989-06-04 00:00:00",
              "type": "date"
            },
            {
              "value": true,
              "type": "bool"
            },
            {
              "value": "test",
              "type": "bytes"
            },
            {
              "random": "0,10",
              "type": "long"
            }
          ],
          "sliceRecordCount": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true,
          "encoding": "UTF-8"
        }
      },
      "transformer": [
        {
          "name": "dx_replace",
          "parameter": {
            "columnIndex": 0,
            "paras": [
              "11",
              "6",
              "wgzhao"
            ]
          }
        },
        {
          "name": "dx_substr",
          "parameter": {
            "columnIndex": 1,
            "paras": [
              "0",
              "12"
            ]
          }
        },
        {
          "name": "dx_map",
          "parameter": {
            "columnIndex": 2,
            "paras": [
              "^",
              "2"
            ]
          }
        },
        {
          "name": "dx_filter",
          "parameter": {
            "columnIndex": 6,
            "paras": [
              "<",
              "5"
            ]
          }
        }
      ]
    }
  }
}

Custom Functions

If the built-in functions do not meet your data transformation requirements, you can write code that conforms to Groovy specifications within the transformer. Here is a complete example:

json
{
  "job": {
    "setting": {
      "speed": {
        "byte": -1,
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": {
      "reader": {
        "name": "streamreader",
        "parameter": {
          "column": [
            {
              "value": "Addax",
              "type": "string"
            },
            {
              "incr": "1",
              "type": "long"
            },
            {
              "incr": "1989/06/04 00:00:01,-1",
              "type": "date",
              "dateFormat": "yyyy/MM/dd hh:mm:ss"
            },
            {
              "value": true,
              "type": "bool"
            },
            {
              "value": "test",
              "type": "bytes"
            }
          ],
          "sliceRecordCount": 10
        }
      },
      "writer": {
        "name": "streamwriter",
        "parameter": {
          "print": true,
          "column": [
            "col1"
          ],
          "encoding": "UTF-8"
        }
      },
      "transformer": [
        {
          "name": "dx_groovy",
          "description": "Add the prefix 'Header_' to the first column value; double the value of the second field",
          "parameter": {
            "code": "record.setColumn(0, new StringColumn('Header_' + record.getColumn(0).asString()));record.setColumn(1, new LongColumn(record.getColumn(1).asLong() * 2));return record;"
          }
        }
      ]
    }
  }
}

The transformer code above modifies the first two fields of each record. It adds the prefix Header_ to the first string field and doubles the value of the second integer field. The execution result is as follows:

txt
$ bin/addax.sh job/transformer_demo.json 

  ___      _     _            
 / _ \    | |   | |           
/ /_\ \ __| | __| | __ ___  __
|  _  |/ _` |/ _` |/ _` \ \/ /
| | | | (_| | (_| | (_| |>  < 
\_| |_/\__,_|\__,_|\__,_/_/\_\

:: Addax version ::    (v4.0.2-SNAPSHOT)

2021-08-04 15:45:56.421 [        main] INFO  VMInfo               - VMInfo# operatingSystem class => com.sun.management.internal.OperatingSystemImpl
2021-08-04 15:45:56.443 [        main] INFO  Engine               - 

.....

2021-08-04 15:45:56.458 [        main] INFO  PerfTrace            - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-08-04 15:45:56.459 [        main] INFO  JobContainer         - Addax jobContainer starts job.
2021-08-04 15:45:56.460 [        main] INFO  JobContainer         - Set jobId = 0
2021-08-04 15:45:56.470 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] do prepare work .
2021-08-04 15:45:56.471 [       job-0] INFO  JobContainer         - Addax Writer.Job [streamwriter] do prepare work .
2021-08-04 15:45:56.471 [       job-0] INFO  JobContainer         - Job set Channel-Number to 1 channels.
2021-08-04 15:45:56.472 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] splits to [1] tasks.
2021-08-04 15:45:56.472 [       job-0] INFO  JobContainer         - Addax Writer.Job [streamwriter] splits to [1] tasks.
2021-08-04 15:45:56.498 [       job-0] INFO  JobContainer         - Scheduler starts [1] taskGroups.
2021-08-04 15:45:56.505 [ taskGroup-0] INFO  TaskGroupContainer   - taskGroupId=[0] start [1] channels for [1] tasks.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO  Channel              - Channel set byte_speed_limit to -1, No bps activated.
2021-08-04 15:45:56.517 [ taskGroup-0] INFO  Channel              - Channel set record_speed_limit to -1, No tps activated.
2021-08-04 15:45:56.520 [ taskGroup-0] INFO  TransformerUtil      -  user config transformers [[dx_groovy]], loading...
2021-08-04 15:45:56.531 [ taskGroup-0] INFO  TransformerUtil      -  1 of transformer init success. name=dx_groovy, isNative=true parameter = 
  {"code":"record.setColumn(0, new StringColumn('Header_' + record.getColumn(0).asString()));record.setColumn(1, new LongColumn(record.getColumn(1).asLong() * 2));return record;"}

Header_Addax    2       1989-06-04 00:00:01     true    test
Header_Addax    4       1989-06-03 00:00:01     true    test
Header_Addax    6       1989-06-02 00:00:01     true    test
Header_Addax    8       1989-06-01 00:00:01     true    test
Header_Addax    10      1989-05-31 00:00:01     true    test
Header_Addax    12      1989-05-30 00:00:01     true    test
Header_Addax    14      1989-05-29 00:00:01     true    test
Header_Addax    16      1989-05-28 00:00:01     true    test
Header_Addax    18      1989-05-27 00:00:01     true    test
Header_Addax    20      1989-05-26 00:00:01     true    test

2021-08-04 15:45:59.515 [       job-0] INFO  AbstractScheduler    - Scheduler accomplished all tasks.
2021-08-04 15:45:59.517 [       job-0] INFO  JobContainer         - Addax Writer.Job [streamwriter] do post work.
2021-08-04 15:45:59.518 [       job-0] INFO  JobContainer         - Addax Reader.Job [streamreader] do post work.
2021-08-04 15:45:59.521 [       job-0] INFO  JobContainer         - PerfTrace not enable!
2021-08-04 15:45:59.524 [       job-0] INFO  StandAloneJobContainerCommunicator - Total 10 records, 330 bytes | Speed 110B/s, 3 records/s | Error 0 records, 0 bytes |  
  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Transformer Success 10 records | Transformer Error 0 records | Transformer Filter 0 records 
  | Transformer usedTime 0.383s | Percentage 100.00%
2021-08-04 15:45:59.527 [       job-0] INFO  JobContainer         - 
Job start time                  : 2021-08-04 15:45:56
Job end time                    : 2021-08-04 15:45:59
Total elapsed time              :                  3s
Average traffic                 :              110B/s
Record write speed              :              3rec/s
Total records read              :                  10
Total read/write failures       :                   0

2021-08-04 15:45:59.528 [       job-0] INFO  JobContainer         - 
Transformer success records     :                  10
Transformer failed records      :                   0
Transformer filtered records    :                   0

Metrics and Dirty Data

The Transform stage converts data and may therefore increase or decrease the number of records, so precise metrics are needed:

  • Number of input records and bytes entering the Transform.
  • Number of output records and bytes leaving the Transform.
  • Number of dirty-data records and bytes produced by the Transform.

If multiple Transforms are configured and one of them produces dirty data for a record, the remaining Transforms are skipped for that record and it is counted directly as dirty data. Currently, only aggregate metrics across all Transforms are reported: success, failure, and filtered counts, plus the time spent in the transform stage.

The metrics displayed during the process are defined as follows:

shell
Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%

INFO

This line records the transformation's input and output, so changes in the number of records can be monitored.

The final job metrics are displayed as follows:

shell
Job start  at             : 2025-07-23 09:08:26
Job end    at             : 2025-07-23 09:08:29
Job took secs             :                  3s
Average   bps             :              110B/s
Average   rps             :              3rec/s
Number of rec             :                  10
Failed record             :                   0
Transformer success records:                  10
Transformer failed  records:                   0
Transformer filter  records:                   0
