清洗脚本配置样例

Best Practice of Using ETL Script

为了方便用户使用清洗脚本,我们提供了丰富的清洗脚本使用样例。

一、DML字段标识脚本

脚本描述:源表数据发生变化时,在目的地表中增加相应的 DML 标识字段,包括 insert、update、delete。

import com.datapipeline.clients.connector.record.DpRecordMeta;
import java.util.Map;
public  class AddDMLField {
    public  static Map process(Map record, DpRecordMeta meta) {
    // 系统会向 dml 字段写入标识:insert、update、delete。
    // 如果是定时读取模式,所有 dml 字段会表示为 insert,只有通过实时模式读取数据时才会识别 update 和 delete。
    record.put("dml", meta.getType());
    // 保存该脚本后,请在目的地表结构添加新字段:dml,并把字段类型改为字符串类型(例如:varchar)
    return record;
    }
}

二、读取时间脚本

脚本描述:根据数据读取时间,在目的地表中增加相应的时间字段。

import com.datapipeline.clients.connector.record.DpRecordMeta;
import java.util.Map;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.Instant;
import java.time.format.DateTimeFormatter; 
public  class AddCollectTimeField {
    private  static DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    public Map process(Map record, DpRecordMeta meta) {
        // 系统会向 collect_time 字段写入读取该数据的时间,格式为:yyyy-MM-dd HH:mm:ss。
        record.put("collect_time", Instant.ofEpochMilli(meta.getSourceRecordTimestamp()).atZone(ZoneId.of("UTC")).format(DATE_TIME_FORMATTER));
        // 保存脚本后,在目的地表结构中添加字段:collect_time,并把字段类型改为时间类型(或字符串类型)。
        return record;
    }
}

三、写入时间脚本

脚本描述:根据数据写入时间,在目的地表中增加相应的时间字段。

import java.util.Map;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
public  class AddUpdateTimeField {
    private  static DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    public Map process(Map record) {
        // 系统会向 update_time 字段写入写入目的地的时间,格式为:yyyy-MM-dd HH:mm:ss。
        record.put("update_time", LocalDateTime.now(ZoneId.of("Asia/Shanghai")).format(DATE_TIME_FORMATTER));
        // 保存脚本后,在目的地表结构中添加字段:update_time,并把字段类型改为时间类型(或字符串类型)。
        return record;
    }
}

四、日期类型转换脚本

脚本描述:将 Timestamp 类型数据转换为日期类型

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
public  class TimestampProcess {
    private  static  final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    private  static  final DateTimeFormatter TIMESTAMP_FORMATTER =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    public Map process(Map record) {
    // timestamp_type 类型为字符串,格式为:2019-07-24 17:06:54.000
    final String timestampStr = (String) record.get("timestamp_type");
    if (timestampStr != null) {
        try {
            // 将字符串转换为日期对象
            final LocalDateTime localDateTime = LocalDateTime.parse(timestampStr, TIMESTAMP_FORMATTER);
            // 将时间对象转换为 DATE_FORMATTER 格式的字符串,转化之后的字符串为:2019-07-24
            record.put("formatedTime", DATE_FORMATTER.format(localDateTime));
        } catch (Exception e) {
        e.printStackTrace();
        }
    }
    return record;
    }
}

五、Timestamp 转换脚本

脚本描述:将日期类型转换为Timestamp 类型数据。

import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
public class Date2TimeStampSample {
  public Map<String, Object> process(Map<String, Object> record)  throws Exception{
    // date = "20180101 12:0:22.123"
    String date = record.get("date");
    long timeStamp = ZonedDateTime.parse(date, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("Asia/Shanghai"))).toInstant().toEpochMilli();
    record.put("ts", timeStamp);
    return record;
  }
}

六、JSON解析脚本

脚本描述:将 JSON 数据解析,并指定到目标表表结构。

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class Test_api_url_nameTransformEngine {
  public List<Map<String, Object>> process(Map<String, Object> record) {
    List<Map<String, Object>> records = new ArrayList<>();
    if (!Objects.isNull(record)) {
      String jsonStr = (String) record.get("content");
      JSONObject jsonNode = JSON.parseObject(jsonStr);
      JSONArray items = jsonNode.getJSONObject("reponseData").getJSONArray("data");
      for (Object iterm : items) {
        JSONObject eachIterm = (JSONObject) iterm;
        String title = String.valueOf(eachIterm.get("title"));
        if (title != null && title.length() > 100) {
          title = title.substring(0, 100);
        }
        Map<String, Object> eachRowData = new HashMap<>();
        eachRowData.put("_id", String.valueOf(eachIterm.get("id")));
        eachRowData.put("tab", String.valueOf(eachIterm.get("tab")));
        eachRowData.put("title", title);
        records.add(eachRowData);
      }
    }
    return records;
  }
}

七、数据脱敏

场景描述:用户想对表格中的信息进行脱敏,比如手机号或者身份证号。

例:身份证号脱敏,保留前四位和后四位,手机号码脱敏,保留前三位和后四位。

具体代码如下:

import java.util.Map;
public class Sid_2382_05b0f75d2382f96a23824d7023829c612382ef7ac208785d {
  public Map process(Map record) {
    record.put("id",((String)record.get("id")).replaceAll("(\\d{4})\\d{10}(\\d{4})","$1**********$2"));
    record.put("phone",((String)record.get("phone")).replaceAll("(\\d{3})\\d{4}(\\d{4})","$1****$2"));
    return record;
  }
}

八、枚举类型转换

场景描述:用户想对表格中的类型进行转换。

例:把原表中表示性别的‘0’和‘1‘,到目的地表中,转换为对应的‘男’和‘女’。

具体代码如下:

import java.util.Map;
public class Sid_2383_a8984a582383944223834f332383b7e52383aa115c629f93 {
  public Map process(Map record) {
    if (record.get("sex").equals("0"))
    {record.put("sex","男");}
    else if(record.get("sex").equals("1"))
    {record.put("sex","女");}
    else
    {record.put("sex","");}
    return record;
  }
}

九、字段求和

场景描述:用户想对表格中的几个字段的内的数据进行求和。

例:把原表中的两个字段内的薪水,到目的地表中,生成一个新的字段,其值为前两个字段的和。

具体代码如下:

import java.util.Map;
public class Sid_2384_f049d91a2384daea23844a3a23848fd923847cc3bc23dc6e {
  public Map<String, Object>process(Map<String, Object>record) {
    record.put("sum",(Double) record.get("salary1")+(Double) record.get("salary2"));
    return record;
  }
}

十、时间类型格式统一

场景描述:用户想对表格中字段的日期格式统一。(2019115、20190115、2019.01.15)转换成 YYYY-MM-DD(2019-01-15)。

具体代码如下:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class ConventDateSample {
  public Map<String, Object> process(Map<String, Object> record)  throws Exception{
    try{
        SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
        SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
        Date datetime=df1.parse(record.get("datetime").toString());
        String newDatetime = df2.format(datetime);
        record.put("datetime",newDatetime);
    }catch(Exception e){
    }  
    return record;
  }
}

十一、库存日期结合

场景描述:用户想对表格中日期和时间,结合到一个字段内。

具体代码如下:

import java.util.Map;
import java.util.Date;
import java.text.ParseException;
import java.text.SimpleDateFormat;
public class Sid_2516_8ffcc7ca2516201f25164ee7251690f3251641b41d3bbd5c {
  public Map<String, Object>process(Map<String, Object>record) {
    record.put("datetime2", record.get("date")+" "+ record.get("time"));
    return record;
  }
}

十二、日期间隔计算

场景描述:用户想对表格中两个日期之间相差的天数,进行计算,再写入到一个字段内。

具体代码如下:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class Sid_2519_5017952225194a2425194ff5251980f3251985af080e3b28 {
  public Map<String, Object>process(Map<String, Object> record)  throws Exception{
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
    SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date start=df2.parse(record.get("startTime").toString());
    Date end=df2.parse(record.get("endTine").toString());
    try{
    int days= (int) ((df.parse(df.format(end)).getTime() -df.parse(df.format(start)).getTime()) / (24 * 60 * 60 * 1000));
    record.put("days",days);
}catch(Exception e){
}  
    return record;
  }
}

十三、非空字段判断

场景描述:判断一个字段是否为空。

import java.util.Map;
import java.util.Objects;
import com.datapipeline.clients.utils.CodeEngineUtils;
public  class Example13 {
  public Map process(Map record)
    final Object orderno = record.get("orderno");
    boolean ordernoIsNull = CodeEngineUtils.isNull(orderno);
}

十四、指定数据进错误队列

场景描述:指定某一些数据进入错误队列。

import java.util.Map;
import java.util.Objects;
import java.lang.String;
import java.lang.Throwable;
import com.datapipeline.clients.codeengine.CustomizedCodeEngineException;
public class Example14 {
 public Map process(Map record) throws CustomizedCodeEngineException {
 if(Integer.parseInt(record.get("age").toString()) <= 20){
 throw new CustomizedCodeEngineException(new Exception("Illegal data!"));
 }
 return record;
 }
}

十五、时区转换

场景描述:将上海时间转化为标准时间。

import java.util.Map;
import java.time.*;
import java.time.format.DateTimeFormatter;
public class localTime2standardTime {
   public Map<String, Object> process(Map<String, Object> record) {
      String crt = (String)record.get("create_time");
      try {
        long ncrt = ZonedDateTime.parse(crt,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("Asia/Shanghai"))).toInstant().toEpochMilli();
        record.put("timestamp",ncrt);
      }
      catch (Exception e) {
           e.printStackTrace();
      }
   return record;
   }
}

十六、字符集编码转换

场景描述:当数据源的数据为乱码时,可通过高级清洗转换字符集编码,处理乱码数据。

package com.dp.exltcsv;
import java.io.UnsupportedEncodingException;
import java.util.Map;
public class utf2gbk {
    public Map<String, Object> process(Map<String, Object> record) throws UnsupportedEncodingException {
        // 针对单个字段的格式
//        String key = (String)record.get("key");
//        record.put(key,(Object)new String(key.getBytes("utf-8"),"gbk"));
//        return record;
        // 针对所有字段
        for (String key : record.keySet()) {
            String value = (String)record.get(key);
            record.put(key,(Object)new String( value.getBytes("utf-8"), "gbk"));
        }
        return record;
    }
}

十七、Kafka目的地Avro格式定义

场景描述:当数据目的地节点为Kafka,且Topic序列化器为Avro时,可以通过高级清洗代码定义Avro数据结构,并将Schema信息写入Schema Registry。

import java.util.Map;
import com.datapipeline.clients.codeengine.CustomizedCodeEngineException;
import com.datapipeline.clients.connector.schema.base.SinkColumn;
import com.datapipeline.clients.connector.record.DpRecordMeta;
import com.datapipeline.clients.connector.schema.base.SinkSchema;
import com.datapipeline.internal.bean.node.column.DataType;
import com.datapipeline.internal.bean.node.column.kafka.KafkaType;
import com.datapipeline.clients.connector.schema.base.DpRecordKey;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.codehaus.jettison.json.JSONObject;

/**
 * 详情请见产品手册:
 *
 * @see https://www.kancloud.cn/datapipeline-group/v3_7_0/1475482
 */
public class AvroCodeEngine {

  private List<String> valueSyncColumnNames;
  private List<String> keySyncColumnNames;

  private Schema valueSchema;
  private Schema keySchema;

  public AvroCodeEngine() {}

public Struct getKeyStruct(DpRecordKey dpRecordKey,
      SinkSchema sinkSchema,
      JSONObject dataJson){
    List<String> pkNames = dpRecordKey.getPrimaryKeyNames();


    if (!Objects.equals(keySyncColumnNames, sinkSchema.getSyncColumnNames())) {
      keySyncColumnNames = sinkSchema.getSyncColumnNames();

      SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name("key");
      for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
        if (pkNames.contains(sinkColumn.getName())) {
          keySchemaBuilder.field(
              sinkColumn.getName(),
              convertDefinitionToSchema(sinkColumn.getDefinition().getType()));
          }
      }

      keySchema = keySchemaBuilder.build();
    }
    Struct struct = new Struct(keySchema);

    for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
      String columnName = sinkColumn.getName();
      if (pkNames.contains(columnName)) {

        struct.put(
            columnName,
            convertValue(keySchema.field(columnName).schema(), dataJson.opt(columnName)));
      }
    }
    return struct;

  }

  public Map<String, Object> process(Map<String, Object> record, DpRecordMeta meta) {
    // 系统会向 dml 字段写入标识:insert、update、delete。
    // 如果是定时读取模式,存量 dml 字段会标识为 insert,增量 dml 字段会标识为 update,
    // 如果是实时读取模式,存量 dml 字段会标识为 insert,增量 dml 字段会标识为 update 和 delete。
    Map<String,String> customParams = meta.getCustomParams();
    if(customParams!=null){
      record.put("enttyp",customParams.get("ENTTYP"));
    }
    record.put("op", meta.getType());
    
    // 保存该脚本后,请在目的地表结构添加新字段:dml,并把字段类型改为字符串类型(例如:varchar)
    return record;
  }



  public Struct getValueStruct(SinkSchema sinkSchema, JSONObject dataJson)
      throws CustomizedCodeEngineException {
    if (!Objects.equals(valueSyncColumnNames, sinkSchema.getSyncColumnNames())) {
      valueSyncColumnNames = sinkSchema.getSyncColumnNames();

      SchemaBuilder messageSchema = SchemaBuilder.struct().name("message");
      for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
        messageSchema.field(
            sinkColumn.getName(), convertDefinitionToSchema(sinkColumn.getDefinition().getType()));
      }

      valueSchema =
          SchemaBuilder.struct()
              .name("DBMessage")
              .field("op", Schema.OPTIONAL_STRING_SCHEMA)
              .field("enttyp",Schema.OPTIONAL_STRING_SCHEMA)
              .field("message", messageSchema.build())
              .build();
    }
    Struct struct = new Struct(valueSchema);
    Schema messageSchema = valueSchema.field("message").schema();
    Struct messageStruct = new Struct(messageSchema);

    for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
      String columnName = sinkColumn.getName();
      messageStruct.put(
          columnName,
          convertValue(messageSchema.field(columnName).schema(), dataJson.opt(columnName)));
    }

    struct.put("op", dataJson.optString("op"));
    struct.put("enttyp",dataJson.optString("enttyp"));
    struct.put("message", messageStruct);
    return struct;
  }

  private Schema convertDefinitionToSchema(DataType dataType) {
    KafkaType kafkaType = (KafkaType) dataType;
    if (kafkaType == KafkaType.INT8) {
      return SchemaBuilder.OPTIONAL_INT8_SCHEMA;
    }
    if (kafkaType == KafkaType.INT16) {
      return SchemaBuilder.OPTIONAL_INT16_SCHEMA;
    }
    if (kafkaType == KafkaType.INT32) {
      return SchemaBuilder.OPTIONAL_INT32_SCHEMA;
    }
    if (kafkaType == KafkaType.INT64) {
      return SchemaBuilder.OPTIONAL_INT64_SCHEMA;
    }
    if (kafkaType == KafkaType.FLOAT32) {
      return SchemaBuilder.OPTIONAL_FLOAT32_SCHEMA;
    }
    if (kafkaType == KafkaType.FLOAT64) {
      return SchemaBuilder.OPTIONAL_FLOAT64_SCHEMA;
    }
    if (kafkaType == KafkaType.BOOLEAN) {
      return SchemaBuilder.OPTIONAL_BOOLEAN_SCHEMA;
    }
    if (kafkaType == KafkaType.STRING) {
      return SchemaBuilder.OPTIONAL_STRING_SCHEMA;
    }
    if (kafkaType == KafkaType.BYTES) {
      return SchemaBuilder.OPTIONAL_BYTES_SCHEMA;
    } 
    throw new IllegalArgumentException(
          String.format("data type %s not support", dataType.toString()));
    
  }

  private Object convertValue(Schema schema, Object data) {
    if (data == JSONObject.NULL || data == null) {
      return null;
    }
    Schema.Type type = schema.type();
    if (type == Type.INT8) {
      if (data instanceof Byte) {
        return data;
      }
      // convert data to Byte
    }
    if (type == Type.INT16) {
      if (data instanceof Short) {
        return data;
      } else {
        return Short.valueOf(data.toString());
      }
    }
    if (type == Type.INT32) {
      if (data instanceof Integer) {
        return data;
      } else {
        return Integer.valueOf(data.toString());
      }
    }
    if (type == Type.INT64) {
      if (data instanceof Long) {
        return data;
      } else {
        return Long.valueOf(data.toString());
      }
    }
    if (type == Type.FLOAT32) {
      if (data instanceof Float) {
        return data;
      } else {
        return Float.valueOf(data.toString());
      }
    }
    if (type == Type.FLOAT64) {
      if (data instanceof Double) {
        return data;
      } else {
        return Double.valueOf(data.toString());
      }
    }
    if (type == Type.BOOLEAN) {
      if (data instanceof Boolean) {
        return data;
      } else {
        return Boolean.valueOf(data.toString());
      }
    }
    if (type == Type.STRING) {
      return data.toString();
    }
    if (type == Type.BYTES) {
      if (data instanceof ByteBuffer || data instanceof byte[]) {
        return data;
      } else {
        return data.toString().getBytes();
      }
    }
    // convert data to ByteBuffer or byte[]
    if (type == Type.STRUCT || type == Type.MAP || type == Type.ARRAY) {
      // Customized handling.
      return data;
    }
    throw new IllegalArgumentException(
        String.format("data type %s not support", schema.type().toString()));
  }
}

最后更新于

这有帮助吗?