Skip to content

Commit

Permalink
[ISSUES #4939]add canal connector
Browse files Browse the repository at this point in the history
  • Loading branch information
sodaRyCN authored May 29, 2024
1 parent cb1b7b8 commit b4d3b2a
Show file tree
Hide file tree
Showing 38 changed files with 4,627 additions and 0 deletions.
34 changes: 34 additions & 0 deletions eventmesh-connectors/eventmesh-connector-canal/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

List canal = [
"com.alibaba.otter:canal.instance.manager:$canal_version",
"com.alibaba.otter:canal.parse:$canal_version",
"com.alibaba.otter:canal.server:$canal_version"
]

dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-common")
implementation canal
implementation "com.alibaba:druid:1.2.6"
// implementation "org.apache.ddlutils:ddlutils:1.0"
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation "org.mockito:mockito-core"
testImplementation "org.mockito:mockito-junit-jupiter"
}
19 changes: 19 additions & 0 deletions eventmesh-connectors/eventmesh-connector-canal/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
canal_version=1.1.7
pluginType=connector
pluginName=MySQL
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2010-2101 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.canal;

import org.apache.commons.beanutils.ConversionException;
import org.apache.commons.beanutils.Converter;
import org.apache.commons.beanutils.converters.ArrayConverter;
import org.apache.commons.beanutils.converters.ByteConverter;


public class ByteArrayConverter implements Converter {

public static final Converter SQL_BYTES = new ByteArrayConverter(null);
private static final Converter converter = new ArrayConverter(byte[].class, new ByteConverter());

protected final Object defaultValue;
protected final boolean useDefault;

public ByteArrayConverter() {
this.defaultValue = null;
this.useDefault = false;
}

public ByteArrayConverter(Object defaultValue) {
this.defaultValue = defaultValue;
this.useDefault = true;
}

public Object convert(Class type, Object value) {
if (value == null) {
if (useDefault) {
return (defaultValue);
} else {
throw new ConversionException("No value specified");
}
}

if (value instanceof byte[]) {
return (value);
}

// BLOB类型,canal直接存储为String("ISO-8859-1")
if (value instanceof String) {
try {
return ((String) value).getBytes("ISO-8859-1");
} catch (Exception e) {
throw new ConversionException(e);
}
}

return converter.convert(type, value); // byteConvertor进行转化
}
}
Loading

0 comments on commit b4d3b2a

Please sign in to comment.