清洗脚本配置样例
Best Practice of Using ETL Script
为了方便用户使用清洗脚本,我们提供了丰富的清洗脚本使用样例。
一、DML字段标识脚本
脚本描述:源表数据发生变化时,在目的地表中增加相应的 DML 标识字段,包括 insert、update、delete。
import com.datapipeline.clients.connector.record.DpRecordMeta;
import java.util.Map;
public class AddDMLField {
public static Map process(Map record, DpRecordMeta meta) {
// 系统会向 dml 字段写入标识:insert、update、delete。
// 如果是定时读取模式,所有 dml 字段会表示为 insert,只有通过实时模式读取数据时才会识别 update 和 delete。
record.put("dml", meta.getType());
// 保存该脚本后,请在目的地表结构添加新字段:dml,并把字段类型改为字符串类型(例如:varchar)
return record;
}
}
二、读取时间脚本
脚本描述:根据数据读取时间,在目的地表中增加相应的时间字段。
import com.datapipeline.clients.connector.record.DpRecordMeta;
import java.util.Map;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
public class AddCollectTimeField {
private static DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public Map process(Map record, DpRecordMeta meta) {
// 系统会向 collect_time 字段写入读取该数据的时间,格式为:yyyy-MM-dd HH:mm:ss。
record.put("collect_time", Instant.ofEpochMilli(meta.getSourceRecordTimestamp()).atZone(ZoneId.of("UTC")).format(DATE_TIME_FORMATTER));
// 保存脚本后,在目的地表结构中添加字段:collect_time,并把字段类型改为时间类型(或字符串类型)。
return record;
}
}
三、写入时间脚本
脚本描述:根据数据写入时间,在目的地表中增加相应的时间字段。
import java.util.Map;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public class AddUpdateTimeField {
private static DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public Map process(Map record) {
// 系统会向 update_time 字段写入写入目的地的时间,格式为:yyyy-MM-dd HH:mm:ss。
record.put("update_time", LocalDateTime.now(ZoneId.of("Asia/Shanghai")).format(DATE_TIME_FORMATTER));
// 保存脚本后,在目的地表结构中添加字段:update_time,并把字段类型改为时间类型(或字符串类型)。
return record;
}
}
四、日期类型转换脚本
脚本描述:将 Timestamp 类型数据转换为日期类型
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
public class TimestampProcess {
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
public Map process(Map record) {
// timestamp_type 类型为字符串,格式为:2019-07-24 17:06:54.000
final String timestampStr = (String) record.get("timestamp_type");
if (timestampStr != null) {
try {
// 将字符串转换为日期对象
final LocalDateTime localDateTime = LocalDateTime.parse(timestampStr, TIMESTAMP_FORMATTER);
// 将时间对象转换为 DATE_FORMATTER 格式的字符串,转化之后的字符串为:2019-07-24
record.put("formatedTime", DATE_FORMATTER.format(localDateTime));
} catch (Exception e) {
e.printStackTrace();
}
}
return record;
}
}
五、Timestamp 转换脚本
脚本描述:将日期类型转换为Timestamp 类型数据。
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
public class Date2TimeStampSample {
public Map<String, Object> process(Map<String, Object> record) throws Exception{
// date = "20180101 12:0:22.123"
String date = record.get("date");
long timeStamp = ZonedDateTime.parse(date, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("Asia/Shanghai"))).toInstant().toEpochMilli();
record.put("ts", timeStamp);
return record;
}
}
六、JSON解析脚本
脚本描述:将 JSON 数据解析,并指定到目标表表结构。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class Test_api_url_nameTransformEngine {
public List<Map<String, Object>> process(Map<String, Object> record) {
List<Map<String, Object>> records = new ArrayList<>();
if (!Objects.isNull(record)) {
String jsonStr = (String) record.get("content");
JSONObject jsonNode = JSON.parseObject(jsonStr);
JSONArray items = jsonNode.getJSONObject("reponseData").getJSONArray("data");
for (Object iterm : items) {
JSONObject eachIterm = (JSONObject) iterm;
String title = String.valueOf(eachIterm.get("title"));
if (title != null && title.length() > 100) {
title = title.substring(0, 100);
}
Map<String, Object> eachRowData = new HashMap<>();
eachRowData.put("_id", String.valueOf(eachIterm.get("id")));
eachRowData.put("tab", String.valueOf(eachIterm.get("tab")));
eachRowData.put("title", title);
records.add(eachRowData);
}
}
return records;
}
}
七、数据脱敏
场景描述:用户想对表格中的信息进行脱敏,比如手机号或者身份证号。
例:身份证号脱敏,保留前四位和后四位,手机号码脱敏,保留前三位和后四位。
具体代码如下:
import java.util.Map;
public class Sid_2382_05b0f75d2382f96a23824d7023829c612382ef7ac208785d {
public Map process(Map record) {
record.put("id",((String)record.get("id")).replaceAll("(\\d{4})\\d{10}(\\d{4})","$1**********$2"));
record.put("phone",((String)record.get("phone")).replaceAll("(\\d{3})\\d{4}(\\d{4})","$1****$2"));
return record;
}
}
八、枚举类型转换
场景描述:用户想对表格中的类型进行转换。
例:把原表中表示性别的‘0’和‘1‘,到目的地表中,转换为对应的‘男’和‘女’。
具体代码如下:
import java.util.Map;
public class Sid_2383_a8984a582383944223834f332383b7e52383aa115c629f93 {
public Map process(Map record) {
if (record.get("sex").equals("0"))
{record.put("sex","男");}
else if(record.get("sex").equals("1"))
{record.put("sex","女");}
else
{record.put("sex","");}
return record;
}
}
九、字段求和
场景描述:用户想对表格中的几个字段的内的数据进行求和。
例:把原表中的两个字段内的薪水,到目的地表中,生成一个新的字段,其值为前两个字段的和。
具体代码如下:
import java.util.Map;
public class Sid_2384_f049d91a2384daea23844a3a23848fd923847cc3bc23dc6e {
public Map<String, Object>process(Map<String, Object>record) {
record.put("sum",(Double) record.get("salary1")+(Double) record.get("salary2"));
return record;
}
}
十、时间类型格式统一
场景描述:用户想对表格中字段的日期格式统一。(2019115、20190115、2019.01.15)转换成 YYYY-MM-DD(2019-01-15)。
具体代码如下:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class ConventDateSample {
public Map<String, Object> process(Map<String, Object> record) throws Exception{
try{
SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
Date datetime=df1.parse(record.get("datetime").toString());
String newDatetime = df2.format(datetime);
record.put("datetime",newDatetime);
}catch(Exception e){
}
return record;
}
}
十一、库存日期结合
场景描述:用户想对表格中日期和时间,结合到一个字段内。
具体代码如下:
import java.util.Map;
import java.util.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class Sid_2516_8ffcc7ca2516201f25164ee7251690f3251641b41d3bbd5c {
public Map<String, Object>process(Map<String, Object>record) {
record.put("datetime2", record.get("date")+" "+ record.get("time"));
return record;
}
}
十二、日期间隔计算
场景描述:用户想对表格中两个日期之间相差的天数,进行计算,再写入到一个字段内。
具体代码如下:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class Sid_2519_5017952225194a2425194ff5251980f3251985af080e3b28 {
public Map<String, Object>process(Map<String, Object> record) throws Exception{
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date start=df2.parse(record.get("startTime").toString());
Date end=df2.parse(record.get("endTine").toString());
try{
int days= (int) ((df.parse(df.format(end)).getTime() -df.parse(df.format(start)).getTime()) / (24 * 60 * 60 * 1000));
record.put("days",days);
}catch(Exception e){
}
return record;
}
}
十三、非空字段判断
场景描述:判断一个字段是否为空。
import java.util.Map;
import java.util.Objects;
import com.datapipeline.clients.utils.CodeEngineUtils;
public class Example13 {
public Map process(Map record)
final Object orderno = record.get("orderno");
boolean ordernoIsNull = CodeEngineUtils.isNull(orderno);
}
十四、指定数据进错误队列
场景描述:指定某一些数据进入错误队列。
import java.util.Map;
import java.util.Objects;
import java.lang.String;
import java.lang.Throwable;
import com.datapipeline.clients.codeengine.CustomizedCodeEngineException;
public class Example14 {
public Map process(Map record) throws CustomizedCodeEngineException {
if(Integer.parseInt(record.get("age").toString()) <= 20){
throw new CustomizedCodeEngineException(new Exception("Illegal data!"));
}
return record;
}
}
十五、时区转换
场景描述:将上海时间转化为标准时间。
import java.util.Map;
import java.time.*;
import java.time.format.DateTimeFormatter;
public class localTime2standardTime {
public Map<String, Object> process(Map<String, Object> record) {
String crt = (String)record.get("create_time");
try {
long ncrt = ZonedDateTime.parse(crt,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("Asia/Shanghai"))).toInstant().toEpochMilli();
record.put("timestamp",ncrt);
}
catch (Exception e) {
e.printStackTrace();
}
return record;
}
}
十六、字符集编码转换
场景描述:当数据源的数据为乱码时,可通过高级清洗转换字符集编码,处理乱码数据。
package com.dp.exltcsv;
import java.io.UnsupportedEncodingException;
import java.util.Map;
public class utf2gbk {
public Map<String, Object> process(Map<String, Object> record) throws UnsupportedEncodingException {
// 针对单个字段的格式
// String key = (String)record.get("key");
// record.put(key,(Object)new String(key.getBytes("utf-8"),"gbk"));
// return record;
// 针对所有字段
for (String key : record.keySet()) {
String value = (String)record.get(key);
record.put(key,(Object)new String( value.getBytes("utf-8"), "gbk"));
}
return record;
}
}
十七、Kafka目的地Avro格式定义
场景描述:当数据目的地节点为Kafka,且Topic序列化器为Avro时,可以通过高级清洗代码定义Avro数据结构,并将Schema信息写入Schema Registry。
import java.util.Map;
import com.datapipeline.clients.codeengine.CustomizedCodeEngineException;
import com.datapipeline.clients.connector.schema.base.SinkColumn;
import com.datapipeline.clients.connector.record.DpRecordMeta;
import com.datapipeline.clients.connector.schema.base.SinkSchema;
import com.datapipeline.internal.bean.node.column.DataType;
import com.datapipeline.internal.bean.node.column.kafka.KafkaType;
import com.datapipeline.clients.connector.schema.base.DpRecordKey;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.codehaus.jettison.json.JSONObject;
/**
* 详情请见产品手册:
*
* @see https://www.kancloud.cn/datapipeline-group/v3_7_0/1475482
*/
public class AvroCodeEngine {
private List<String> valueSyncColumnNames;
private List<String> keySyncColumnNames;
private Schema valueSchema;
private Schema keySchema;
public AvroCodeEngine() {}
public Struct getKeyStruct(DpRecordKey dpRecordKey,
SinkSchema sinkSchema,
JSONObject dataJson){
List<String> pkNames = dpRecordKey.getPrimaryKeyNames();
if (!Objects.equals(keySyncColumnNames, sinkSchema.getSyncColumnNames())) {
keySyncColumnNames = sinkSchema.getSyncColumnNames();
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name("key");
for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
if (pkNames.contains(sinkColumn.getName())) {
keySchemaBuilder.field(
sinkColumn.getName(),
convertDefinitionToSchema(sinkColumn.getDefinition().getType()));
}
}
keySchema = keySchemaBuilder.build();
}
Struct struct = new Struct(keySchema);
for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
String columnName = sinkColumn.getName();
if (pkNames.contains(columnName)) {
struct.put(
columnName,
convertValue(keySchema.field(columnName).schema(), dataJson.opt(columnName)));
}
}
return struct;
}
public Map<String, Object> process(Map<String, Object> record, DpRecordMeta meta) {
// 系统会向 dml 字段写入标识:insert、update、delete。
// 如果是定时读取模式,存量 dml 字段会标识为 insert,增量 dml 字段会标识为 update,
// 如果是实时读取模式,存量 dml 字段会标识为 insert,增量 dml 字段会标识为 update 和 delete。
Map<String,String> customParams = meta.getCustomParams();
if(customParams!=null){
record.put("enttyp",customParams.get("ENTTYP"));
}
record.put("op", meta.getType());
// 保存该脚本后,请在目的地表结构添加新字段:dml,并把字段类型改为字符串类型(例如:varchar)
return record;
}
public Struct getValueStruct(SinkSchema sinkSchema, JSONObject dataJson)
throws CustomizedCodeEngineException {
if (!Objects.equals(valueSyncColumnNames, sinkSchema.getSyncColumnNames())) {
valueSyncColumnNames = sinkSchema.getSyncColumnNames();
SchemaBuilder messageSchema = SchemaBuilder.struct().name("message");
for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
messageSchema.field(
sinkColumn.getName(), convertDefinitionToSchema(sinkColumn.getDefinition().getType()));
}
valueSchema =
SchemaBuilder.struct()
.name("DBMessage")
.field("op", Schema.OPTIONAL_STRING_SCHEMA)
.field("enttyp",Schema.OPTIONAL_STRING_SCHEMA)
.field("message", messageSchema.build())
.build();
}
Struct struct = new Struct(valueSchema);
Schema messageSchema = valueSchema.field("message").schema();
Struct messageStruct = new Struct(messageSchema);
for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
String columnName = sinkColumn.getName();
messageStruct.put(
columnName,
convertValue(messageSchema.field(columnName).schema(), dataJson.opt(columnName)));
}
struct.put("op", dataJson.optString("op"));
struct.put("enttyp",dataJson.optString("enttyp"));
struct.put("message", messageStruct);
return struct;
}
private Schema convertDefinitionToSchema(DataType dataType) {
KafkaType kafkaType = (KafkaType) dataType;
if (kafkaType == KafkaType.INT8) {
return SchemaBuilder.OPTIONAL_INT8_SCHEMA;
}
if (kafkaType == KafkaType.INT16) {
return SchemaBuilder.OPTIONAL_INT16_SCHEMA;
}
if (kafkaType == KafkaType.INT32) {
return SchemaBuilder.OPTIONAL_INT32_SCHEMA;
}
if (kafkaType == KafkaType.INT64) {
return SchemaBuilder.OPTIONAL_INT64_SCHEMA;
}
if (kafkaType == KafkaType.FLOAT32) {
return SchemaBuilder.OPTIONAL_FLOAT32_SCHEMA;
}
if (kafkaType == KafkaType.FLOAT64) {
return SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA;
}
if (kafkaType == KafkaType.BOOLEAN) {
return SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA;
}
if (kafkaType == KafkaType.STRING) {
return SchemaBuilder.OPTIONAL_STRING_SCHEMA;
}
if (kafkaType == KafkaType.BYTES) {
return SchemaBuilder.OPTIONAL_BYTES_SCHEMA;
}
throw new IllegalArgumentException(
String.format("data type %s not support", dataType.toString()));
}
private Object convertValue(Schema schema, Object data) {
if (data == JSONObject.NULL || data == null) {
return null;
}
Schema.Type type = schema.type();
if (type == Type.INT8) {
if (data instanceof Byte) {
return data;
}
// convert data to Byte
}
if (type == Type.INT16) {
if (data instanceof Short) {
return data;
} else {
return Short.valueOf(data.toString());
}
}
if (type == Type.INT32) {
if (data instanceof Integer) {
return data;
} else {
return Integer.valueOf(data.toString());
}
}
if (type == Type.INT64) {
if (data instanceof Long) {
return data;
} else {
return Long.valueOf(data.toString());
}
}
if (type == Type.FLOAT32) {
if (data instanceof Float) {
return data;
} else {
return Float.valueOf(data.toString());
}
}
if (type == Type.FLOAT64) {
if (data instanceof Double) {
return data;
} else {
return Double.valueOf(data.toString());
}
}
if (type == Type.BOOLEAN) {
if (data instanceof Boolean) {
return data;
} else {
return Boolean.valueOf(data.toString());
}
}
if (type == Type.STRING) {
return data.toString();
}
if (type == Type.BYTES) {
if (data instanceof ByteBuffer || data instanceof byte[]) {
return data;
} else {
return data.toString().getBytes();
}
}
// convert data to ByteBuffer or byte[]
if (type == Type.STRUCT || type == Type.MAP || type == Type.ARRAY) {
// Customized handling.
return data;
}
throw new IllegalArgumentException(
String.format("data type %s not support", schema.type().toString()));
}
}
最后更新于
这有帮助吗?