Commit b2c9119

Author: pdm
Commit message: start fiflow-io
1 parent: e5d0274

File tree: 12 files changed, 251 additions, 1 deletion

README.md

Lines changed: 9 additions & 1 deletion

```diff
@@ -7,6 +7,7 @@
 - fiflow-web      backend for the web frontend
 - fiflow-api      operation API exposed by fiflow
 - fiflow-core     wrapper around Flink operations
+- fiflow-io       input / output (sources and sinks)
 - fiflow-runtime  submits jobs to Flink local, standalone and YARN, and handles interaction with the Flink cluster

 ## How to use
@@ -27,9 +28,16 @@
 - job management
 - use the data-source management feature instead of hand-written create table statements

+## fiflow-io
+- elasticsearch source and sink   doing
+- mysql binlog                    doing
+- hbase                           todo
+- ...                             todo
+
 ## Thanks
-- flink
+- flink
 - zeppelin
 - spring boot
+- flinkx
 - vue
 - element-ui
```
BaseInputFormat.java (fiflow-core)

Lines changed: 6 additions & 0 deletions

```java
package com.github.myetl.fiflow.core.io;

import org.apache.flink.api.common.io.RichInputFormat;

/** Common base class for fiflow input formats; still an empty skeleton in this commit. */
public abstract class BaseInputFormat extends RichInputFormat {
}
```
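BaseInputFormat is only a shell so far. For orientation, here is a minimal sketch of what a concrete Flink input format has to implement; it is not part of the commit, the class name NumberInputFormat is made up, and it extends RichInputFormat directly (with explicit type parameters) because BaseInputFormat does not declare any yet:

```java
package com.github.myetl.fiflow.core.io;

import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplitAssigner;

import java.io.IOException;

/** Sketch only: a trivial format that emits the numbers 0..99 per split. */
public class NumberInputFormat extends RichInputFormat<Long, GenericInputSplit> {

    private long current;
    private long max;

    @Override
    public void configure(Configuration parameters) {
        // read user parameters here if needed
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return cachedStatistics; // no statistics available
    }

    @Override
    public GenericInputSplit[] createInputSplits(int minNumSplits) {
        // one generic split per requested partition
        GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
        for (int i = 0; i < minNumSplits; i++) {
            splits[i] = new GenericInputSplit(i, minNumSplits);
        }
        return splits;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        current = 0;
        max = 100;
    }

    @Override
    public boolean reachedEnd() {
        return current >= max;
    }

    @Override
    public Long nextRecord(Long reuse) {
        return current++;
    }

    @Override
    public void close() {
        // release resources held by the split, if any
    }
}
```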

fiflow-io/README.md

Lines changed: 8 additions & 0 deletions

# Input / Output

## elasticsearch

## mysql binlog

## hbase
fiflow-io/fiflow-elasticsearch7/README.md

Lines changed: 7 additions & 0 deletions

# Elasticsearch7 source and sink

Supports Elasticsearch 7.x.

## source

## sink
fiflow-io/fiflow-elasticsearch7/pom.xml

Lines changed: 24 additions & 0 deletions

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>fiflow-io</artifactId>
        <groupId>com.github.myetl</groupId>
        <version>1.10-SNAPSHOT</version>
    </parent>

    <modelVersion>4.0.0</modelVersion>
    <artifactId>fiflow-elasticsearch7</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.1</version>
        </dependency>
    </dependencies>
</project>
```
EsUtil.java (fiflow-elasticsearch7)

Lines changed: 4 additions & 0 deletions

```java
package com.github.myetl.fiflow.io.elasticsearch7;

/** Placeholder for Elasticsearch helper methods; empty in this commit. */
public class EsUtil {
}
```
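EsUtil is empty in this commit. As a rough sketch of where the module is likely headed, the elasticsearch-rest-high-level-client 7.6.1 dependency declared in the pom would be used roughly as below; the class name, host, port and index are placeholders, not anything defined by fiflow:

```java
package com.github.myetl.fiflow.io.elasticsearch7;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;

/** Sketch only: how a future Elasticsearch 7 source could read documents. */
public class EsReadSketch {

    public static void main(String[] args) throws IOException {
        // connect to a local cluster; host and port are placeholders
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));

        // match_all query against a placeholder index
        SearchRequest request = new SearchRequest("student");
        request.source(new SearchSourceBuilder()
                .query(QueryBuilders.matchAllQuery())
                .size(100));

        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        for (SearchHit hit : response.getHits().getHits()) {
            System.out.println(hit.getSourceAsString());
        }

        client.close();
    }
}
```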
fiflow-io/fiflow-mysql-binlog/README.md

Lines changed: 23 additions & 0 deletions

# mysql binlog reader

Uses Alibaba canal.

## Configure MySQL to enable binlog

- Add the following to my.cnf:
```
[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW format
server_id=1         # required for MySQL replication; must not clash with canal's slaveId
```

- Grant the canal account the privileges needed to act as a MySQL slave; if the account already exists, running the GRANT is enough:
```
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
```

## ddl

fiflow-io/fiflow-mysql-binlog/pom.xml

Lines changed: 22 additions & 0 deletions

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>fiflow-io</artifactId>
        <groupId>com.github.myetl</groupId>
        <version>1.10-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>fiflow-mysql-binlog</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.parse</artifactId>
            <version>1.1.4</version>
        </dependency>
    </dependencies>
</project>
```
Main.java (fiflow-mysql-binlog)

Lines changed: 60 additions & 0 deletions

```java
package com.github.myetl.fiflow.io.mysqlbinlog;

import com.alibaba.otter.canal.common.alarm.LogAlarmHandler;
import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.index.MemoryLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.position.EntryPosition;
import com.alibaba.otter.canal.sink.CanalEventSink;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * for test
 */
public class Main {

    public static void main(String[] args) {
        MysqlEventParser controller = new MysqlEventParser();

        // act as a MySQL slave; the slaveId must not clash with the server_id configured in my.cnf
        controller.setSlaveId(11L);

        controller.setMasterInfo(new AuthenticationInfo(new InetSocketAddress("127.0.0.1", 3306),
                "root", "root"));
        controller.setConnectionCharset("UTF-8");

        controller.setDestination("haha");
        controller.setParallel(false);
        controller.setParallelBufferSize(1024);
        controller.setParallelThreadSize(1);
        controller.setIsGTIDMode(false);

        // start reading from a fixed binlog file and offset
        EntryPosition startPosition = new EntryPosition();
        startPosition.setJournalName("mysql-bin.000148");
        startPosition.setPosition(10436856L);
        startPosition.setTimestamp(System.currentTimeMillis());
        controller.setMasterPosition(startPosition);

        // keep binlog positions in memory only
        CanalLogPositionManager pm = new MemoryLogPositionManager();
        pm.start();
        controller.setLogPositionManager(pm);

        // only parse tables matching this filter
        controller.setEventFilter(new AviaterRegexFilter("flink.student,flink.stuout"));

        controller.setAlarmHandler(new LogAlarmHandler());

        // parsed entries are handed to the Sink below
        CanalEventSink<List<CanalEntry.Entry>> sink = new Sink();
        controller.setEventSink(sink);

        controller.setDefaultConnectionTimeoutInSeconds(60);
        controller.start();
    }
}
```
Sink.java (fiflow-mysql-binlog)

Lines changed: 57 additions & 0 deletions

```java
package com.github.myetl.fiflow.io.mysqlbinlog;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.sink.AbstractCanalEventSink;
import com.alibaba.otter.canal.sink.exception.CanalSinkException;

import java.net.InetSocketAddress;
import java.util.List;

/** Receives parsed binlog entries from the MysqlEventParser and prints them. */
public class Sink extends AbstractCanalEventSink<List<CanalEntry.Entry>> {

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // skip transaction begin/end markers
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## failed to parse binlog entry, data:" + entry.toString(), e);
            }

            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    @Override
    public boolean sink(List<CanalEntry.Entry> entries, InetSocketAddress inetSocketAddress, String s)
            throws CanalSinkException, InterruptedException {
        printEntry(entries);
        return true;
    }
}
```
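The module README leaves its ## ddl section empty. If DDL capture is added later, canal already carries the needed information on RowChange; the following is a minimal sketch under that assumption, not part of the commit, and the helper name DdlSketch is made up:

```java
package com.github.myetl.fiflow.io.mysqlbinlog;

import com.alibaba.otter.canal.protocol.CanalEntry;

/** Sketch only: how DDL statements could be surfaced from canal entries. */
public class DdlSketch {

    /** Returns the DDL statement carried by the entry, or null for plain row changes. */
    public static String ddlOf(CanalEntry.Entry entry) throws Exception {
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        // canal sets the isDdl flag and attaches the raw SQL for CREATE/ALTER/DROP statements
        return rowChange.getIsDdl() ? rowChange.getSql() : null;
    }
}
```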

fiflow-io/pom.xml

Lines changed: 30 additions & 0 deletions

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>fiflow</artifactId>
        <groupId>com.github.myetl</groupId>
        <version>1.10-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>fiflow-io</artifactId>
    <packaging>pom</packaging>
    <version>1.10-SNAPSHOT</version>

    <modules>
        <module>fiflow-elasticsearch7</module>
        <module>fiflow-mysql-binlog</module>
    </modules>

    <dependencies>
        <dependency>
            <groupId>com.github.myetl</groupId>
            <artifactId>fiflow-core</artifactId>
            <version>1.10-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
```

pom.xml

Lines changed: 1 addition & 0 deletions

```diff
@@ -13,6 +13,7 @@
     <module>fiflow-runtime</module>
     <module>fiflow-api</module>
     <module>fiflow-web</module>
+    <module>fiflow-io</module>
 </modules>
```