清洗脚本配置样例

Best Practice of Using ETL Script

为了方便用户使用清洗脚本,我们提供了丰富的清洗脚本使用样例。

一、DML字段标识脚本

脚本描述:源表数据发生变化时,在目的地表中增加相应的 DML 标识字段,包括 insert、update、delete。

import com.datapipeline.clients.connector.record.DpRecordMeta;
import java.util.Map;
public  class AddDMLField {
    public  static Map process(Map record, DpRecordMeta meta) {
    // 系统会向 dml 字段写入标识:insert、update、delete。
    // 如果是定时读取模式,所有 dml 字段会表示为 insert,只有通过实时模式读取数据时才会识别 update 和 delete。
    record.put("dml", meta.getType());
    // 保存该脚本后,请在目的地表结构添加新字段:dml,并把字段类型改为字符串类型(例如:varchar)
    return record;
    }
}

二、读取时间脚本

脚本描述:根据数据读取时间,在目的地表中增加相应的时间字段。

import com.datapipeline.clients.connector.record.DpRecordMeta;
import java.util.Map;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.Instant;
import java.time.format.DateTimeFormatter; 
public  class AddCollectTimeField {
    private  static DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    public Map process(Map record, DpRecordMeta meta) {
        // 系统会向 collect_time 字段写入读取该数据的时间,格式为:yyyy-MM-dd HH:mm:ss。
        record.put("collect_time", Instant.ofEpochMilli(meta.getSourceRecordTimestamp()).atZone(ZoneId.of("UTC")).format(DATE_TIME_FORMATTER));
        // 保存脚本后,在目的地表结构中添加字段:collect_time,并把字段类型改为时间类型(或字符串类型)。
        return record;
    }
}

三、写入时间脚本

脚本描述:根据数据写入时间,在目的地表中增加相应的时间字段。

四、日期类型转换脚本

脚本描述:将 Timestamp 类型数据转换为日期类型

五、Timestamp 转换脚本

脚本描述:将日期类型转换为Timestamp 类型数据。

六、JSON解析脚本

脚本描述:将 JSON 数据解析,并指定到目标表表结构。

七、数据脱敏

场景描述:用户想对表格中的信息进行脱敏,比如手机号或者身份证号。

例:身份证号脱敏,保留前四位和后四位,手机号码脱敏,保留前三位和后四位。

具体代码如下:

八、枚举类型转换

场景描述:用户想对表格中的类型进行转换。

例:把原表中表示性别的‘0’和‘1‘,到目的地表中,转换为对应的‘男’和‘女’。

具体代码如下:

九、字段求和

场景描述:用户想对表格中的几个字段的内的数据进行求和。

例:把原表中的两个字段内的薪水,到目的地表中,生成一个新的字段,其值为前两个字段的和。

具体代码如下:

十、时间类型格式统一

场景描述:用户想对表格中字段的日期格式统一。(2019115、20190115、2019.01.15)转换成 YYYY-MM-DD(2019-01-15)。

具体代码如下:

十一、库存日期结合

场景描述:用户想对表格中日期和时间,结合到一个字段内。

具体代码如下:

十二、日期间隔计算

场景描述:用户想对表格中两个日期之间相差的天数,进行计算,再写入到一个字段内。

具体代码如下:

十三、非空字段判断

场景描述:判断一个字段是否为空。

十四、指定数据进错误队列

场景描述:指定某一些数据进入错误队列。

十五、时区转换

场景描述:将上海时间转化为标准时间。

十六、字符集编码转换

场景描述:当数据源的数据为乱码时,可通过高级清洗转换字符集编码,处理乱码数据。

十七、Kafka目的地Avro格式定义

场景描述:当数据目的地节点为Kafka,且Topic序列化器为Avro时,可以通过高级清洗代码定义Avro数据结构,并将Schema信息写入Schema Registry。

最后更新于

这有帮助吗?