import com.datapipeline.clients.codeengine.CustomizedCodeEngineException;
import com.datapipeline.clients.connector.record.DpRecordMeta;
import com.datapipeline.clients.connector.schema.base.DpRecordKey;
import com.datapipeline.clients.connector.schema.base.SinkColumn;
import com.datapipeline.clients.connector.schema.base.SinkSchema;
import com.datapipeline.internal.bean.node.column.DataType;
import com.datapipeline.internal.bean.node.column.kafka.KafkaType;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.codehaus.jettison.json.JSONObject;
/**
 * Customized code engine script that builds Kafka Connect key/value Structs for Avro
 * serialization and enriches each record with "op" and "enttyp" fields.
 *
 * <p>See the product manual for details:
 *
 * @see https://www.kancloud.cn/datapipeline-group/v3_7_0/1475482
 */
public class AvroCodeEngine {
private List<String> valueSyncColumnNames;
private List<String> keySyncColumnNames;
private Schema valueSchema;
private Schema keySchema;
public AvroCodeEngine() {}
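/**
 * Builds the Kafka Connect key Struct from the primary-key columns of the record.
 * The key schema is rebuilt and cached whenever the list of sync column names changes;
 * this assumes the primary-key columns only change together with the sync columns.
 */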
public Struct getKeyStruct(DpRecordKey dpRecordKey,
SinkSchema sinkSchema,
JSONObject dataJson) {
List<String> pkNames = dpRecordKey.getPrimaryKeyNames();
if (!Objects.equals(keySyncColumnNames, sinkSchema.getSyncColumnNames())) {
keySyncColumnNames = sinkSchema.getSyncColumnNames();
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name("key");
for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
if (pkNames.contains(sinkColumn.getName())) {
keySchemaBuilder.field(
sinkColumn.getName(),
convertDefinitionToSchema(sinkColumn.getDefinition().getType()));
}
}
keySchema = keySchemaBuilder.build();
}
Struct struct = new Struct(keySchema);
for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
String columnName = sinkColumn.getName();
if (pkNames.contains(columnName)) {
struct.put(
columnName,
convertValue(keySchema.field(columnName).schema(), dataJson.opt(columnName)));
}
}
return struct;
}
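/**
 * Enriches each record with an "op" field holding the DML type (insert / update / delete)
 * and, when the source provides custom parameters, an "enttyp" field holding the
 * upstream ENTTYP value.
 */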
public Map<String, Object> process(Map<String, Object> record, DpRecordMeta meta) {
// The system writes the DML type (insert, update, delete) into the "op" field below.
// In scheduled (batch) read mode, full-load records are marked insert and incremental records update.
// In real-time read mode, full-load records are marked insert and incremental records update or delete.
Map<String, String> customParams = meta.getCustomParams();
if (customParams != null) {
record.put("enttyp", customParams.get("ENTTYP"));
}
record.put("op", meta.getType());
// After saving this script, add the new fields "op" and "enttyp" to the destination table schema with a string type (e.g. varchar).
return record;
}
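/**
 * Builds the Kafka Connect value Struct: a "DBMessage" wrapper containing the optional
 * "op" and "enttyp" strings plus a nested "message" Struct holding all sync columns.
 * The value schema is rebuilt and cached whenever the list of sync column names changes.
 */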
public Struct getValueStruct(SinkSchema sinkSchema, JSONObject dataJson)
throws CustomizedCodeEngineException {
if (!Objects.equals(valueSyncColumnNames, sinkSchema.getSyncColumnNames())) {
valueSyncColumnNames = sinkSchema.getSyncColumnNames();
SchemaBuilder messageSchema = SchemaBuilder.struct().name("message");
for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
messageSchema.field(
sinkColumn.getName(), convertDefinitionToSchema(sinkColumn.getDefinition().getType()));
}
valueSchema =
SchemaBuilder.struct()
.name("DBMessage")
.field("op", Schema.OPTIONAL_STRING_SCHEMA)
.field("enttyp",Schema.OPTIONAL_STRING_SCHEMA)
.field("message", messageSchema.build())
.build();
}
Struct struct = new Struct(valueSchema);
Schema messageSchema = valueSchema.field("message").schema();
Struct messageStruct = new Struct(messageSchema);
for (SinkColumn sinkColumn : sinkSchema.getSyncColumns()) {
String columnName = sinkColumn.getName();
messageStruct.put(
columnName,
convertValue(messageSchema.field(columnName).schema(), dataJson.opt(columnName)));
}
struct.put("op", dataJson.optString("op"));
struct.put("enttyp",dataJson.optString("enttyp"));
struct.put("message", messageStruct);
return struct;
}
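/**
 * Maps a DataPipeline column definition (KafkaType) to the corresponding optional
 * Kafka Connect schema. Throws IllegalArgumentException for types this script does not handle.
 */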
private Schema convertDefinitionToSchema(DataType dataType) {
KafkaType kafkaType = (KafkaType) dataType;
if (kafkaType == KafkaType.INT8) {
return Schema.OPTIONAL_INT8_SCHEMA;
}
if (kafkaType == KafkaType.INT16) {
return Schema.OPTIONAL_INT16_SCHEMA;
}
if (kafkaType == KafkaType.INT32) {
return Schema.OPTIONAL_INT32_SCHEMA;
}
if (kafkaType == KafkaType.INT64) {
return Schema.OPTIONAL_INT64_SCHEMA;
}
if (kafkaType == KafkaType.FLOAT32) {
return Schema.OPTIONAL_FLOAT32_SCHEMA;
}
if (kafkaType == KafkaType.FLOAT64) {
return Schema.OPTIONAL_FLOAT64_SCHEMA;
}
if (kafkaType == KafkaType.BOOLEAN) {
return Schema.OPTIONAL_BOOLEAN_SCHEMA;
}
if (kafkaType == KafkaType.STRING) {
return Schema.OPTIONAL_STRING_SCHEMA;
}
if (kafkaType == KafkaType.BYTES) {
return Schema.OPTIONAL_BYTES_SCHEMA;
}
throw new IllegalArgumentException(
String.format("data type %s is not supported", dataType.toString()));
}
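/**
 * Coerces a JSON value into the Java type expected by the given Connect schema,
 * parsing from the string representation when the runtime type does not already match.
 * JSONObject.NULL and null are both mapped to null.
 */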
private Object convertValue(Schema schema, Object data) {
if (data == JSONObject.NULL || data == null) {
return null;
}
Schema.Type type = schema.type();
if (type == Type.INT8) {
if (data instanceof Byte) {
return data;
} else {
// Convert from the string representation when the value is not already a Byte.
return Byte.valueOf(data.toString());
}
}
if (type == Type.INT16) {
if (data instanceof Short) {
return data;
} else {
return Short.valueOf(data.toString());
}
}
if (type == Type.INT32) {
if (data instanceof Integer) {
return data;
} else {
return Integer.valueOf(data.toString());
}
}
if (type == Type.INT64) {
if (data instanceof Long) {
return data;
} else {
return Long.valueOf(data.toString());
}
}
if (type == Type.FLOAT32) {
if (data instanceof Float) {
return data;
} else {
return Float.valueOf(data.toString());
}
}
if (type == Type.FLOAT64) {
if (data instanceof Double) {
return data;
} else {
return Double.valueOf(data.toString());
}
}
if (type == Type.BOOLEAN) {
if (data instanceof Boolean) {
return data;
} else {
return Boolean.valueOf(data.toString());
}
}
if (type == Type.STRING) {
return data.toString();
}
if (type == Type.BYTES) {
if (data instanceof ByteBuffer || data instanceof byte[]) {
return data;
} else {
// Fall back to the bytes of the string representation.
return data.toString().getBytes();
}
}
if (type == Type.STRUCT || type == Type.MAP || type == Type.ARRAY) {
// Complex types are passed through unchanged; add customized handling here if needed.
return data;
}
throw new IllegalArgumentException(
String.format("data type %s is not supported", schema.type().toString()));
}
}