Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

内部测试 #1

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 1 addition & 143 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,143 +1 @@
后期实现功能:
支持导出成excel,csv,sql文件,再加入es导数据到数据库的模块(目前ES官方的logstash-jdbc只支持数据库导数据到ES),后续版本可能开发得比较慢,因为年末很多总结要写。
年末开发慢
# About
该工具实现从ES中导出数据,并且可以对导出的数据格式和数据文件做部分自定义(后面支持更多的自定义),该工具主要使用ES中srcoll接口多线程导出数据.

# Design
![Base](https://github.com/760515805/es_data_export/blob/master/docs/design.png)
    项目通过两个线程池实现功能,一个线程池主要从ElasticSearch获取数据,获取方式为es接口的Srcoll方式,多线程的话则通过slice切割数据.另一个线程池主要拿来写文件,使用BlockingQueue把数据缓冲到线程队列中,通过一条线程单线程写文件.

# Version
版本号说明:大版本.新增功能.小补丁

## V1.2.4

&nbsp;&nbsp;1.新增线程池监控,在数据导出结束后正确停止程序。<br>
&nbsp;&nbsp;2.新增配置启动前验证配置是否正确,设置配置默认值。<br>
&nbsp;&nbsp;3.优化异常日志输出,更好排查问题。

## V1.2.3

&nbsp;&nbsp;1.优化写文件操作,使用BlockingQueue队列缓存。<br>
&nbsp;&nbsp;2.新增支持文件写到一定大小后进行文件切割。<br>
&nbsp;&nbsp;3.新增支持SSL加密获取数据。

## V1.2.2

&nbsp;&nbsp;1.重构代码,取消自己封装的HTTP工具,使用官方RestClient工具。<br>
&nbsp;&nbsp;2.新增支持多线程拉取ES数据。

## V1.0.1

&nbsp;&nbsp;1.实现单线程导出数据。

# Supported
| Elasticsearch version | support |
| -------- | -----: |
| >= 6.0.0 | yes |
| >= 5.0.0 | not test|
| >= 2.0.0 | not test |
| <= 1 | not test |

# Running
```
$git clone git://github.com/760515805/es_data_export.git
$cd es_data_export
```
如果已经安装了ant环境和maven环境则可以使用以下操作
```
$ant
$cd build
$vim global.properties
$./run.sh
```
如果只安装了maven环境则如下操作
```
$mvn clean package
$cp global.properties run.sh stop.sh logback.xml target/
$cd target
$vim global.properties
$./run.sh
```
切记修改global.properties文件

# Development
## 1.运行环境
- IDE:IntelliJ IDEA或者Eclipse
- 项目构建工具:Maven

## 2.初始化项目
- 打开IntelliJ IDEA,将项目导入
- 修改global.properties文件配置
- 运行App.java执行

# 配置文件名词解释
## index
数据索引
## type
索引type,无则可留空,ES7.0以后删除
## query
查询条件DSL,必须为ES的查询语句,可留空,默认:查询全部,条数1000
## includes
取哪些字段的数据,逗号隔开,如果全部取则设为空
## threadSize
获取数据线程数据,最大不超过索引的shards数量和CPU数量,默认为1
## esserver
ES集群IP地址,逗号隔开,如:192.169.2.98:9200,192.169.2.156:9200,192.169.2.188:9200
## esusername
如有帐号密码则填写,如果无则留空
## espassword
如有帐号密码则填写,如果无则留空
## isLineFeed
导出数据写入文件每条数据是否需要换行,默认:true
## dataLayout
输出源数据形式,目前支持json、txt,下个版本支持sql、excel,如果为txt字段间是用逗号隔开,默认:json
## filePath
数据输出文件路径,必填字段
## fileName
输出的文件名,无则取默认:index
## fileSize
每个文件多少条数据分割,需要则设置该项,该值应该比query的size大,如果设置该值则一个文件数据条数到达该数时则会分割,会有一点误差,分割文件名为fileName+文件数,单位:条
## customFieldName
自定义字段名,将库里该字段取出来后换为该字段名,原字段名:替换后的字段名,多个逗号隔开,如phone:telphone
## fieldSplit
字段以什么分割,不设置则默认英文逗号隔开
## fieldSort
字段输出顺序(必设),必须和索引表字段名一样,逗号隔开
## needFieldName
输出为txt的时候需要字段名字,默认:false,需要的时候以此形式输出类似:fieldName1=fieldValue1,fieldName2=fieldValue2
## SSL_type
SSL类型
## SSL_keyStorePath
密钥地址,文件地址
## SSL_keyStorePass
密钥密码
# 线程池设置
关于threadSize设置设置为多少合适,这里给出的权重是
-CPU核数>Shards>配置设置
意思是配置的设置不能大于CPU核数也不能大于索引的shards数量。
比如我是8核的机器,shards为15,配置设置20,最后取的线程数是8
如果我是8核的机器,shards为15,配置设置7,最后取的是 7
# 导出例子
## 1、导出为json格式
导出为json格式只需要设置以下,以下根据自己的设置进行设置
```
isLineFeed=true
dataLayout=json
filePath=F:\\pb_sa_phone\\test
fileName=pb_sa_phone
fileSize=1000
customFieldName=
```
## 2、导出为txt格式
以下是dataLayout=txt,为json时以下配置都无效的时候的自定义设置
```
fieldSplit=,
fieldSort=phone
needFieldName=false
```
# 联系作者

## QQ:760515805
## wx:chj-95
内部开发版本,稳定版请使用master分支
2 changes: 1 addition & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<target name="copyAll" depends="mvn_package">
<copy todir="${routine}" file="${target}/es_data_export.jar"></copy>
<copy todir="${routine}" file="global.properties"></copy>
<copy todir="${routine}" file="export.properties"></copy>
<copy todir="${routine}" file="logback.xml"></copy>
<copy todir="${routine}" file="run.sh"></copy>
<copy todir="${routine}" file="stop.sh"></copy>
Expand Down
Binary file removed docs/V1.0/ES数据导出工具-升级指导.docx
Binary file not shown.
Binary file removed docs/V1.0/ES数据导出工具-改动说明.docx
Binary file not shown.
Binary file removed docs/V1.0/ES数据导出工具-验证方法.docx
Binary file not shown.
Binary file removed docs/V1.2/ES数据导出工具-使用文档.docx
Binary file not shown.
Binary file removed docs/V1.2/ES数据导出工具-升级指导.docx
Binary file not shown.
Binary file removed docs/V1.2/ES数据导出工具-改动说明.docx
Binary file not shown.
Binary file removed docs/V1.2/ES数据导出工具-验证方法.docx
Binary file not shown.
Binary file removed docs/design.png
Binary file not shown.
100 changes: 100 additions & 0 deletions export.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
####################################常规设置######################################################
#获取数据线程数据,最大不超过索引的shards数量和CPU数量,默认为1
common.thread_size=10
#################################导出数据ES集群地址###################################################
elasticsearch.hosts=192.169.2.98:9200,192.169.2.156:9200,192.169.2.188:9200
#数据索引
elasticsearch.index=pb_sa_phone
#索引type,无则可留空
elasticsearch.document_type=pb_sa_phone
#查询条件,默认:{"size":10000}
elasticsearch.query={"size":10000,"query":{"bool":{"filter":[{"term":{"status":{"value":3,"boost":2.0}}},{"range":{"phone":{"from":0,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},{"bool":{"should":[{"range":{"smsupdtm":{"from":null,"to":"2018-12-09 23:59:59","include_lower":true,"include_upper":false,"boost":3.0}}},{"range":{"rmsupdtm":{"from":null,"to":"2018-12-09 23:59:59","include_lower":true,"include_upper":false,"boost":3.0}}}],"adjust_pure_negative":true,"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}}}
#取哪些字段的数据,逗号隔开,如果全部取则设为空,也可以自行写在elasticsearch.query的查询条件里面
elasticsearch.includes=phone
#####如有帐号密码则填写#####
#elasticsearch.username=
#elasticsearch.password=

#####SSL,如有需要就设置,不需要忽略即可#####
#elasticsearch.ssl_type=
#elasticsearch.ssl_keystorepath=
#elasticsearch.ssl_keystorepass=

############################################导出文件的设置############################################
##是否启用,true:启用该功能,false:禁用该功能
file.enabled=true
#输出源数据形式,目前支持json、txt,sql,如果为txt字段间是用逗号隔开,默认:json
file.datalayout=txt

#是否需要换行,默认:true
file.linefeed=true
#数据输出文件路径,如果需要自定义路径地址可以使用{},目前支持:day,month,years
file.filepath=F:\\pb_sa_phone
####文件名,无则取默认ES的index名,不需要写后缀
file.filename=pb_sa_phone

#####是否需要分割文件,true:需要文件分割,false:禁用文件分割保存,默认:false##########
file.need_split_file=true
#need_split_file设为true该值后起效,以什么方式来分割,目前可选的有文件大小(disk)与数据条数(amount),默认:disk
file.split_method=amount
#当split_methods=disk的时候:每个文件多大进行分割,需要则设置该项,实际是有误差的,单位:KB,默认:10240
#当split_methods=amount的时候:,每个文件多少条数后进行分割,需要则设置该项,实际是有误差的,单位:条,默认:1000000
file.max_length_file=1000000

#####以下是dataLayout=txt的自定义设置######
#字段以什么分割,不设置则默认英文逗号隔开
#file.field_split=,
#字段输出顺序(datalayout=txt的时候设置起效,防止数据混乱),必须和索引表字段名一样,逗号隔开
#file.field_sort=phone
#输出为txt时候需要字段名字,默认:false,需要的时候以此形式输出类似:fieldName1=fieldValue1,fieldName2=fieldValue2
#file.need_field_name=false

#####以下是datalayout=sql自定义设置######
#file.sql_format=INSERT INTO test (phone,msgcode,spnumber) VALUES (#param{phone},#param{msgcode},#param{spnumber});

#自定义字段名,将库里该字段取出来后换为该字段名,原字段名:替换后的字段名,多个逗号隔开,如phone:telphone
#file.custom_field_name=

#######################################ES转移数据到DB########################################################
##是否启用,true:启用该功能,false:禁用该功能
db.enabled=false
##驱动jar包地址
db.jdbc_driver_library=lib/mysql-connector-java-5.1.47.jar
##数据库连接
db.jdbc_connection_string=jdbc:mysql://192.169.0.23:3306/dblog?useUnicode=true&characterEncoding=utf8
##驱动程序
db.jdbc_driver_class=com.mysql.jdbc.Driver
##数据库帐号
db.jdbc_user=root
##数据库密码
db.jdbc_password=123456
##插入数据模版,其中#param{ES字段}来取ES的值
db.jdbc_template=INSERT INTO test111 (name) VALUES (#param{simuid})
##单批次最大插入多少,默认10000
#db.jdbc_size=10000
##同时写DB线程数,默认1
db.jdbc_write_thread_size=1

#######################################ES转移数据到Kafka########################################################
##是否启用,true:启用该功能,false:禁用该功能
kafka.enabled=false
##另一个INDEX的集群ip,如果是相同的可留空,IP逗号隔开
kafka.hosts=192.169.0.61:9092
#数据索引
kafka.topic=pb_sa_chj_04
#从ES查询出来的数据List分批次发送到kafka,每批次发送多少条,默认1000
#kafka.send_size=1000
#每次写kafka是否启用延迟,防止过快写入造成kafka压力大,但延迟写入会造成导出数据慢,默认:0,单位:秒
#kafka.delay=0
#####默认写入jsonarray格式,以下这里可设置每条json文件的数据格式,不设置则取全部###
##每条数据新增的内容,如果和ES中查询出来的键重复了会直接覆盖,如果需要取当前时间可"#{now,yyyy-MM-dd HH:mm:ss}"这样取即可,如果只取时间戳则"#{now}",时间戳只到秒,别忘了用""包起来,为空则不设置
#kafka.add_value={"dttype":1,"updtm":"#{now,yyyy-MM-dd HH:mm:ss}"}
##替换ES中查询出来的数据的键名,旧键名:新键名,旧键名:新键名,为空则不设置
#kafka.replace_key=
##写kafka的线程,默认1
#kafka.write_thread_size=1
#######################################定时器########################################################
##请设置好以上设置,定时器将定时启动以上的导出线程池
##是否启用,true:启用该功能,false:禁用该功能
quartz.enabled=false
quartz.schedule=0 0 0 * * ?
66 changes: 0 additions & 66 deletions global.properties

This file was deleted.

Binary file added lib/mysql-connector-java-5.1.47.jar
Binary file not shown.
Binary file added lib/postgresql-42.2.5.jar
Binary file not shown.
Binary file added lib/sqljdbc4.jar
Binary file not shown.
6 changes: 5 additions & 1 deletion logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@
</Pattern>
</layout>
</appender>

<!-- 关闭kafka日志打印 -->
<logger name="org.apache.kafka" level="WARN"/>
<logger name="o.a.k.c.consumer" level="WARN"/>
<logger name="org.apache.zookeeper" level="WARN"/>
<logger name="org.quartz.core" level="WARN"/>
<logger name="Monitor" level="INFO" additivity="false">
<appender-ref ref="Monitor" />
</logger>
Expand Down
Loading