diff --git a/README.md b/README.md index 718f4e6..e587641 100644 --- a/README.md +++ b/README.md @@ -1,143 +1 @@ -后期实现功能: - 支持导出成excel,csv,sql文件,再加入es导数据到数据库的模块(目前ES官方的logstash-jdbc只支持数据库导数据到ES),后续版本可能开发得比较慢,因为年末很多总结要写。 - 年末开发慢 -# About -该工具实现从ES中导出数据,并且可以对导出的数据格式和数据文件做部分自定义(后面支持更多的自定义),该工具主要使用ES中srcoll接口多线程导出数据. - -# Design -![Base](https://github.com/760515805/es_data_export/blob/master/docs/design.png) -    项目通过两个线程池实现功能,一个线程池主要从ElasticSearch获取数据,获取方式为es接口的Srcoll方式,多线程的话则通过slice切割数据.另一个线程池主要拿来写文件,使用BlockingQueue把数据缓冲到线程队列中,通过一条线程单线程写文件. - -# Version -版本号说明:大版本.新增功能.小补丁 - -## V1.2.4 - -  1.新增线程池监控,在数据导出结束后正确停止程序。
-  2.新增配置启动前验证配置是否正确,设置配置默认值。
-  3.优化异常日志输出,更好排查问题。 - -## V1.2.3 - -  1.优化写文件操作,使用BlockingQueue队列缓存。
-  2.新增支持文件写到一定大小后进行文件切割。
-  3.新增支持SSL加密获取数据。 - -## V1.2.2 - -  1.重构代码,取消自己封装的HTTP工具,使用官方RestClient工具。
-  2.新增支持多线程拉取ES数据。 - -## V1.0.1 - -  1.实现单线程导出数据。 - -# Supported -| Elasticsearch version | support | -| -------- | -----: | -| >= 6.0.0 | yes | -| >= 5.0.0 | not test| -| >= 2.0.0 | not test | -| <= 1 | not test | - -# Running -``` -$git clone git://github.com/760515805/es_data_export.git -$cd es_data_export -``` -如果已经安装了ant环境和maven环境则可以使用以下操作 -``` -$ant -$cd build -$vim global.properties -$./run.sh -``` -如果只安装了maven环境则如下操作 -``` -$mvn clean package -$cp global.properties run.sh stop.sh logback.xml target/ -$cd target -$vim global.properties -$./run.sh -``` -切记修改global.properties文件 - -# Development -## 1.运行环境 -- IDE:IntelliJ IDEA或者Eclipse -- 项目构建工具:Maven - -## 2.初始化项目 -- 打开IntelliJ IDEA,将项目导入 -- 修改global.properties文件配置 -- 运行App.java执行 - -# 配置文件名词解释 -## index - 数据索引 -## type - 索引type,无则可留空,ES7.0以后删除 -## query - 查询条件DSL,必须为ES的查询语句,可留空,默认:查询全部,条数1000 -## includes - 取哪些字段的数据,逗号隔开,如果全部取则设为空 -## threadSize - 获取数据线程数据,最大不超过索引的shards数量和CPU数量,默认为1 -## esserver - ES集群IP地址,逗号隔开,如:192.169.2.98:9200,192.169.2.156:9200,192.169.2.188:9200 -## esusername - 如有帐号密码则填写,如果无则留空 -## espassword - 如有帐号密码则填写,如果无则留空 -## isLineFeed - 导出数据写入文件每条数据是否需要换行,默认:true -## dataLayout - 输出源数据形式,目前支持json、txt,下个版本支持sql、excel,如果为txt字段间是用逗号隔开,默认:json -## filePath - 数据输出文件路径,必填字段 -## fileName - 输出的文件名,无则取默认:index -## fileSize - 每个文件多少条数据分割,需要则设置该项,该值应该比query的size大,如果设置该值则一个文件数据条数到达该数时则会分割,会有一点误差,分割文件名为fileName+文件数,单位:条 -## customFieldName - 自定义字段名,将库里该字段取出来后换为该字段名,原字段名:替换后的字段名,多个逗号隔开,如phone:telphone -## fieldSplit - 字段以什么分割,不设置则默认英文逗号隔开 -## fieldSort - 字段输出顺序(必设),必须和索引表字段名一样,逗号隔开 -## needFieldName - 输出为txt的时候需要字段名字,默认:false,需要的时候以此形式输出类似:fieldName1=fieldValue1,fieldName2=fieldValue2 -## SSL_type - SSL类型 -## SSL_keyStorePath - 密钥地址,文件地址 -## SSL_keyStorePass - 密钥密码 -# 线程池设置 -关于threadSize设置设置为多少合适,这里给出的权重是 --CPU核数>Shards>配置设置 -意思是配置的设置不能大于CPU核数也不能大于索引的shards数量。 -比如我是8核的机器,shards为15,配置设置20,最后取的线程数是8 -如果我是8核的机器,shards为15,配置设置7,最后取的是 7 -# 导出例子 -## 1、导出为json格式 -导出为json格式只需要设置以下,以下根据自己的设置进行设置 -``` -isLineFeed=true -dataLayout=json -filePath=F:\\pb_sa_phone\\test -fileName=pb_sa_phone -fileSize=1000 -customFieldName= -``` -## 2、导出为txt格式 -以下是dataLayout=txt,为json时以下配置都无效的时候的自定义设置 -``` -fieldSplit=, -fieldSort=phone -needFieldName=false -``` -# 联系作者 - -## QQ:760515805 -## wx:chj-95 +内部开发版本,稳定版请使用master分支 \ No newline at end of file diff --git a/build.xml b/build.xml index 6ea916f..5e5411f 100644 --- a/build.xml +++ b/build.xml @@ -24,7 +24,7 @@ - + diff --git "a/docs/V1.0/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\345\215\207\347\272\247\346\214\207\345\257\274.docx" "b/docs/V1.0/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\345\215\207\347\272\247\346\214\207\345\257\274.docx" deleted file mode 100644 index 9a5ceca..0000000 Binary files "a/docs/V1.0/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\345\215\207\347\272\247\346\214\207\345\257\274.docx" and /dev/null differ diff --git "a/docs/V1.0/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\346\224\271\345\212\250\350\257\264\346\230\216.docx" "b/docs/V1.0/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\346\224\271\345\212\250\350\257\264\346\230\216.docx" deleted file mode 100644 index f6d6027..0000000 Binary files "a/docs/V1.0/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\346\224\271\345\212\250\350\257\264\346\230\216.docx" and /dev/null differ diff --git "a/docs/V1.0/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\351\252\214\350\257\201\346\226\271\346\263\225.docx" "b/docs/V1.0/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\351\252\214\350\257\201\346\226\271\346\263\225.docx" deleted file mode 100644 index 2b92223..0000000 Binary files "a/docs/V1.0/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\351\252\214\350\257\201\346\226\271\346\263\225.docx" and /dev/null differ diff --git "a/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\344\275\277\347\224\250\346\226\207\346\241\243.docx" "b/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\344\275\277\347\224\250\346\226\207\346\241\243.docx" deleted file mode 100644 index 61f2c79..0000000 Binary files "a/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\344\275\277\347\224\250\346\226\207\346\241\243.docx" and /dev/null differ diff --git "a/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\345\215\207\347\272\247\346\214\207\345\257\274.docx" "b/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\345\215\207\347\272\247\346\214\207\345\257\274.docx" deleted file mode 100644 index 8009d3e..0000000 Binary files "a/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\345\215\207\347\272\247\346\214\207\345\257\274.docx" and /dev/null differ diff --git "a/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\346\224\271\345\212\250\350\257\264\346\230\216.docx" "b/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\346\224\271\345\212\250\350\257\264\346\230\216.docx" deleted file mode 100644 index 89b05c9..0000000 Binary files "a/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\346\224\271\345\212\250\350\257\264\346\230\216.docx" and /dev/null differ diff --git "a/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\351\252\214\350\257\201\346\226\271\346\263\225.docx" "b/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\351\252\214\350\257\201\346\226\271\346\263\225.docx" deleted file mode 100644 index d63e02a..0000000 Binary files "a/docs/V1.2/ES\346\225\260\346\215\256\345\257\274\345\207\272\345\267\245\345\205\267-\351\252\214\350\257\201\346\226\271\346\263\225.docx" and /dev/null differ diff --git a/docs/design.png b/docs/design.png deleted file mode 100644 index dce66ba..0000000 Binary files a/docs/design.png and /dev/null differ diff --git a/export.properties b/export.properties new file mode 100644 index 0000000..4caf990 --- /dev/null +++ b/export.properties @@ -0,0 +1,100 @@ +####################################常规设置###################################################### +#获取数据线程数据,最大不超过索引的shards数量和CPU数量,默认为1 +common.thread_size=10 +#################################导出数据ES集群地址################################################### +elasticsearch.hosts=192.169.2.98:9200,192.169.2.156:9200,192.169.2.188:9200 +#数据索引 +elasticsearch.index=pb_sa_phone +#索引type,无则可留空 +elasticsearch.document_type=pb_sa_phone +#查询条件,默认:{"size":10000} +elasticsearch.query={"size":10000,"query":{"bool":{"filter":[{"term":{"status":{"value":3,"boost":2.0}}},{"range":{"phone":{"from":0,"to":null,"include_lower":false,"include_upper":true,"boost":1.0}}},{"bool":{"should":[{"range":{"smsupdtm":{"from":null,"to":"2018-12-09 23:59:59","include_lower":true,"include_upper":false,"boost":3.0}}},{"range":{"rmsupdtm":{"from":null,"to":"2018-12-09 23:59:59","include_lower":true,"include_upper":false,"boost":3.0}}}],"adjust_pure_negative":true,"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}}} +#取哪些字段的数据,逗号隔开,如果全部取则设为空,也可以自行写在elasticsearch.query的查询条件里面 +elasticsearch.includes=phone +#####如有帐号密码则填写##### +#elasticsearch.username= +#elasticsearch.password= + +#####SSL,如有需要就设置,不需要忽略即可##### +#elasticsearch.ssl_type= +#elasticsearch.ssl_keystorepath= +#elasticsearch.ssl_keystorepass= + +############################################导出文件的设置############################################ +##是否启用,true:启用该功能,false:禁用该功能 +file.enabled=true +#输出源数据形式,目前支持json、txt,sql,如果为txt字段间是用逗号隔开,默认:json +file.datalayout=txt + +#是否需要换行,默认:true +file.linefeed=true +#数据输出文件路径,如果需要自定义路径地址可以使用{},目前支持:day,month,years +file.filepath=F:\\pb_sa_phone +####文件名,无则取默认ES的index名,不需要写后缀 +file.filename=pb_sa_phone + +#####是否需要分割文件,true:需要文件分割,false:禁用文件分割保存,默认:false########## +file.need_split_file=true +#need_split_file设为true该值后起效,以什么方式来分割,目前可选的有文件大小(disk)与数据条数(amount),默认:disk +file.split_method=amount +#当split_methods=disk的时候:每个文件多大进行分割,需要则设置该项,实际是有误差的,单位:KB,默认:10240 +#当split_methods=amount的时候:,每个文件多少条数后进行分割,需要则设置该项,实际是有误差的,单位:条,默认:1000000 +file.max_length_file=1000000 + +#####以下是dataLayout=txt的自定义设置###### +#字段以什么分割,不设置则默认英文逗号隔开 +#file.field_split=, +#字段输出顺序(datalayout=txt的时候设置起效,防止数据混乱),必须和索引表字段名一样,逗号隔开 +#file.field_sort=phone +#输出为txt时候需要字段名字,默认:false,需要的时候以此形式输出类似:fieldName1=fieldValue1,fieldName2=fieldValue2 +#file.need_field_name=false + +#####以下是datalayout=sql自定义设置###### +#file.sql_format=INSERT INTO test (phone,msgcode,spnumber) VALUES (#param{phone},#param{msgcode},#param{spnumber}); + +#自定义字段名,将库里该字段取出来后换为该字段名,原字段名:替换后的字段名,多个逗号隔开,如phone:telphone +#file.custom_field_name= + +#######################################ES转移数据到DB######################################################## +##是否启用,true:启用该功能,false:禁用该功能 +db.enabled=false +##驱动jar包地址 +db.jdbc_driver_library=lib/mysql-connector-java-5.1.47.jar +##数据库连接 +db.jdbc_connection_string=jdbc:mysql://192.169.0.23:3306/dblog?useUnicode=true&characterEncoding=utf8 +##驱动程序 +db.jdbc_driver_class=com.mysql.jdbc.Driver +##数据库帐号 +db.jdbc_user=root +##数据库密码 +db.jdbc_password=123456 +##插入数据模版,其中#param{ES字段}来取ES的值 +db.jdbc_template=INSERT INTO test111 (name) VALUES (#param{simuid}) +##单批次最大插入多少,默认10000 +#db.jdbc_size=10000 +##同时写DB线程数,默认1 +db.jdbc_write_thread_size=1 + +#######################################ES转移数据到Kafka######################################################## +##是否启用,true:启用该功能,false:禁用该功能 +kafka.enabled=false +##另一个INDEX的集群ip,如果是相同的可留空,IP逗号隔开 +kafka.hosts=192.169.0.61:9092 +#数据索引 +kafka.topic=pb_sa_chj_04 +#从ES查询出来的数据List分批次发送到kafka,每批次发送多少条,默认1000 +#kafka.send_size=1000 +#每次写kafka是否启用延迟,防止过快写入造成kafka压力大,但延迟写入会造成导出数据慢,默认:0,单位:秒 +#kafka.delay=0 +#####默认写入jsonarray格式,以下这里可设置每条json文件的数据格式,不设置则取全部### +##每条数据新增的内容,如果和ES中查询出来的键重复了会直接覆盖,如果需要取当前时间可"#{now,yyyy-MM-dd HH:mm:ss}"这样取即可,如果只取时间戳则"#{now}",时间戳只到秒,别忘了用""包起来,为空则不设置 +#kafka.add_value={"dttype":1,"updtm":"#{now,yyyy-MM-dd HH:mm:ss}"} +##替换ES中查询出来的数据的键名,旧键名:新键名,旧键名:新键名,为空则不设置 +#kafka.replace_key= +##写kafka的线程,默认1 +#kafka.write_thread_size=1 +#######################################定时器######################################################## +##请设置好以上设置,定时器将定时启动以上的导出线程池 +##是否启用,true:启用该功能,false:禁用该功能 +quartz.enabled=false +quartz.schedule=0 0 0 * * ? \ No newline at end of file diff --git a/global.properties b/global.properties deleted file mode 100644 index 4e2f3bd..0000000 --- a/global.properties +++ /dev/null @@ -1,66 +0,0 @@ -####################################常规设置###################################################### -#数据索引 -index=pb_sa_phone -#索引type,无则可留空 -type=pb_sa_phone -#查询条件,默认: -query={"size":500,"query": {"term": {"status": {"value":1 }}}} -#取哪些字段的数据,逗号隔开,如果全部取则设为空 -includes=phone -#获取数据线程数据,最大不超过索引的shards数量和CPU数量,默认为1 -threadSize=4 - -#################################导出数据ES集群地址################################################### -esserver=192.169.2.98:9200,192.169.2.156:9200,192.169.2.188:9200 -#如有帐号密码则填写 -#esusername= -#espassword= - -#####################################SSL,如有需要就设置,不需要忽略即可##################################### -#SSL_type= -#SSL_keyStorePath= -#SSL_keyStorePass= - - -####!!!!注意,基于性能考虑不能同时导出成文件的同时写入数据库!!!!! - -############################################导出文件的设置############################################ -#输出源数据形式,目前支持json、txt,下个版本支持sql、excel,如果为txt字段间是用逗号隔开,默认:json -dataLayout=json -#是否需要换行,默认:true -isLineFeed=true -#数据输出文件路径,默认:当前程序路径 -filePath=F:\\pb_sa_phone\\test -####文件名,无则取默认:index -fileName=pb_sa_phone -#每个文件多少条数据分割,需要则设置该项,该值应该比query的size大,单位:条 -#fileSize=1000 -#自定义字段名,将库里该字段取出来后换为该字段名,原字段名:替换后的字段名,多个逗号隔开,如phone:telphone -#customFieldName= - -#####以下是dataLayout=txt,为json时以下配置都无效的时候的自定义设置###### -#字段以什么分割,不设置则默认英文逗号隔开 -#fieldSplit=, -#字段输出顺序(dataLayout=txt的时候必设,防止数据混乱),必须和索引表字段名一样,逗号隔开 -#fieldSort=phone -#输出为txt的时候需要字段名字,默认:false,需要的时候以此形式输出类似:fieldName1=fieldValue1,fieldName2=fieldValue2 -#needFieldName=false - -#####以下是dataLayout=sql自定义设置###### -#sqlFormat=INSERT INTO table_name (phone,imid,aa) VALUES (#param{phone},#param{imid},#param{aa}); - -#####以下是dataLayout=excel自定义设置###### - -#####以下是dataLayout=csv自定义设置###### - -#######################################ES转移数据到DB######################################################## -jdbcUrl=jdbc:sqlserver://192.169.2.203:1433;DatabaseName=db_phone_sa_center -##目前支持mysql、sqlserver、postgresql -jdbcDriver=com.microsoft.sqlserver.jdbc.SQLServerDriver -jdbcUsername=sa -jdbcPassword=f2ccfb578acc34a4be54d2744cb91f0d -##插入数据模版,其中#param{ES字段}来取ES的值 -jdbcInsertSql=INSERT INTO table_name (phone,imid,aa,test) VALUES (#param{phone},#param{imid},#param{aa},123); -##单批次最大插入多少,默认10000 -#jdbcInsertSize=10000 - diff --git a/lib/mysql-connector-java-5.1.47.jar b/lib/mysql-connector-java-5.1.47.jar new file mode 100644 index 0000000..8816c7e Binary files /dev/null and b/lib/mysql-connector-java-5.1.47.jar differ diff --git a/lib/postgresql-42.2.5.jar b/lib/postgresql-42.2.5.jar new file mode 100644 index 0000000..d89d433 Binary files /dev/null and b/lib/postgresql-42.2.5.jar differ diff --git a/lib/sqljdbc4.jar b/lib/sqljdbc4.jar new file mode 100644 index 0000000..d6b7f6d Binary files /dev/null and b/lib/sqljdbc4.jar differ diff --git a/logback.xml b/logback.xml index 4d962f8..129669b 100644 --- a/logback.xml +++ b/logback.xml @@ -118,7 +118,11 @@ - + + + + + diff --git a/pom.xml b/pom.xml index 57919fe..3028805 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ com.chenhj es_data_export - 1.2.3-SNAPSHOT + 1.3.5-release jar es_data_export @@ -13,17 +13,17 @@ UTF-8 UTF-8 - 1.2.41 + 1.2.54 1.2.3 1.7.25 - 4.0.0 - + 3.8.1 + 2.6 + 1.1.12 - 5.1.44 - 4.0 - 42.2.2 - + 6.5.4 + 2.3.0 + 2.1.0 UTF-8 1.8 1.8 @@ -39,12 +39,12 @@ commons-io commons-io - 2.6 + ${commons-io} org.apache.commons commons-lang3 - 3.8.1 + ${commons-lang} @@ -73,22 +73,17 @@ druid ${druid} - - - com.microsoft.sqlserver - sqljdbc4 - ${sqlserver} - - + - mysql - mysql-connector-java - ${mysql-connector} + org.apache.kafka + kafka_2.12 + ${kafka} + - org.postgresql - postgresql - ${postgresql} + org.quartz-scheduler + quartz + ${quartz} diff --git a/src/main/java/com/chenhj/App.java b/src/main/java/com/chenhj/App.java index d2a925b..6aaab0e 100644 --- a/src/main/java/com/chenhj/App.java +++ b/src/main/java/com/chenhj/App.java @@ -1,48 +1,39 @@ package com.chenhj; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; - -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.fastjson.JSONObject; -import com.chenhj.constant.ApplicationConfig; import com.chenhj.constant.Constant; -import com.chenhj.constant.Pool; -import com.chenhj.init.LogBack; -import com.chenhj.init.Rest; -import com.chenhj.init.ThreadPool; -import com.chenhj.job.ScrollMultJob; -import com.chenhj.task.ExportDataTask; -import com.chenhj.task.MonitorTask; -import com.chenhj.util.PropertiesAutoSerialize; +import com.chenhj.init.InitConfig; +import com.chenhj.init.InitConnection; +import com.chenhj.init.InitLogBack; +import com.chenhj.init.InitThreadPool; +import com.chenhj.job.ThreadUtil; /** *ES数据导出入口类 */ public class App { - +// | ____| / \ / ___\ \ / / | ____\ \/ / _ \ / _ \| _ \_ _| +// | _| / _ \ \___ \\ V / | _| \ /| |_) | | | | |_) || | +// | |___ / ___ \ ___) || | | |___ / \| __/| |_| | _ < | | +// |_____/_/ \_\____/ |_| |_____/_/\_\_| \___/|_| \_\|_| private static final Logger logger = LoggerFactory.getLogger(App.class); - public static void main( String[] args ) { try { - LogBack.init(); + InitLogBack.init(); logger.info("Log Config Load the success..."); //读取配置文件 - PropertiesAutoSerialize.init(Constant.CONFIG_NAME,ApplicationConfig.class); - //验证配置 - ApplicationConfig.validation(); + InitConfig.init(); logger.info("Config Load the success..."); - //初始化ES - initEs(); - logger.info("ElasticSearch Client Load the success..."); + //初始化连接 + InitConnection.init(); + logger.info("Connection Load the success..."); //线程池初始化 - ThreadPool.init(); + InitThreadPool.init(); logger.info("ThreadPool Load the success..."); - exportDataTask(); - logger.info("Running Success,Version:"+Constant.VERSION); + new ThreadUtil().startConsume(); + logger.info("Running Success,Version:"+Constant.VERSION); + } catch (Exception e) { e.printStackTrace(); logger.info("Running Failed",e); @@ -51,63 +42,5 @@ public static void main( String[] args ) { } } - /** - * 导出数据线程 - * @throws Exception - */ - private static void exportDataTask() throws Exception{ - try { - int threadSize = ApplicationConfig.getRunThreadSize(); - //写文件的线程,单线程操作 - logger.info("ThreadPool Size:"+threadSize); - String scrollId; - //执行任务 - for(int i=0;i list = sJob.executeJob(ApplicationConfig.getScrollQuery(i,threadSize)); - scrollId = sJob.getSrcollId(); - ExportDataTask task = new ExportDataTask(scrollId,list); - Pool.EXECPool.execute(task); - } - //开启监控线程池线程 - monitor(); - } catch (Exception e) { - //关闭线程池 - if(Pool.EXECPool!=null){ - Pool.EXECPool.shutdown(); - } - throw e; - } - } - /** - * 监控线程池(只有在拉取数据的线程结束后才开启这个监控线程) - * @throws InterruptedException - * @throws ExecutionException - */ - public static void monitor() throws InterruptedException, ExecutionException{ - MonitorTask monitorTask = new MonitorTask(); - //1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。 - FutureTask result = new FutureTask<>(monitorTask); - new Thread(result).start(); - Byte status = result.get(); - if(status==Constant.SUCCESS){ - logger.info("Process Exits..."); - System.exit(-1); - } - } - /** - * 初始化ES的连接 - * @throws IllegalAccessException - */ - public static void initEs() throws Exception{ - String ips = ApplicationConfig.getEsserver(); - String username = ApplicationConfig.getEsusername(); - String password = ApplicationConfig.getEspassword(); - //ES初始化 - Rest rest = Rest.Client.setHttpHosts(ips.split(",")); - if(StringUtils.isNotBlank(username)&&StringUtils.isNotBlank(password)){ - rest.validation(username, password); - } - rest.build(); - } + } diff --git a/src/main/java/com/chenhj/config/CommonConfig.java b/src/main/java/com/chenhj/config/CommonConfig.java new file mode 100644 index 0000000..579b630 --- /dev/null +++ b/src/main/java/com/chenhj/config/CommonConfig.java @@ -0,0 +1,46 @@ +/** + * + */ +package com.chenhj.config; + +import com.alibaba.fastjson.JSON; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: CommonConfig.java +* @Description: 该类的功能描述 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午7:03:23 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public class CommonConfig { + private int thread_size = 1; + + private String banner; + + public int getThread_size() { + return thread_size; + } + public String getBanner() { + return banner; + } + + public void setBanner(String banner) { + this.banner = banner; + } + + public void setThread_size(int thread_size) { + this.thread_size = thread_size; + } + + public String toString(){ + return JSON.toJSONString(this); + } +} diff --git a/src/main/java/com/chenhj/config/Config.java b/src/main/java/com/chenhj/config/Config.java new file mode 100644 index 0000000..0d767f2 --- /dev/null +++ b/src/main/java/com/chenhj/config/Config.java @@ -0,0 +1,30 @@ +/** + * + */ +package com.chenhj.config; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: Config.java +* @Description: 该类的功能描述 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午4:28:44 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public enum Config { + EXPOST; + public static EsConfig ES_CONFIG; + public static JdbcConfig JDBC_CONFIG; + public static FileConfig FILE_CONFIG; + public static KafkaConfig Kafka_CONFIG; + public static CommonConfig COMMON_CONFIG; + public static QuartzConfig QUARTZ_CONFIG; + +} diff --git a/src/main/java/com/chenhj/config/EsConfig.java b/src/main/java/com/chenhj/config/EsConfig.java new file mode 100644 index 0000000..afb7c0b --- /dev/null +++ b/src/main/java/com/chenhj/config/EsConfig.java @@ -0,0 +1,133 @@ +/** + * + */ +package com.chenhj.config; + + +import org.apache.commons.lang3.StringUtils; + +import com.alibaba.fastjson.JSON; +import com.chenhj.init.InitConfig; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: EsConfig.java +* @Description: 该类的功能描述 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午3:46:13 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public class EsConfig { + private String index; + private String document_type; + private String query; + private String hosts; + private String username; + private String password; + //SSL设置 + private String ssl_type; + private String ssl_keystorepath; + private String ssl_keystorepass; + + private boolean justGetId; + + private boolean justGetSource; + + private String includes; + + + + public boolean isJustGetId() { + return justGetId; + } + public void setJustGetId(boolean justGetId) { + this.justGetId = justGetId; + } + public boolean isJustGetSource() { + return justGetSource; + } + public void setJustGetSource(boolean justGetSource) { + this.justGetSource = justGetSource; + } + public String getIncludes() { + return includes; + } + public void setIncludes(String includes) { + this.includes = includes; + } + public String getIndex() { + return index; + } + public void setIndex(String index) { + this.index = index; + } + public String getDocument_type() { + return document_type; + } + public void setDocument_type(String document_type) { + this.document_type = document_type; + } + public String getQuery() { + return query; + } + public void setQuery(String query) { + this.query = query; + } + public String getHosts() { + return hosts; + } + public void setHosts(String hosts) { + this.hosts = hosts; + } + public String getUsername() { + return username; + } + public void setUsername(String username) { + this.username = username; + } + public String getPassword() { + return password; + } + public void setPassword(String password) { + this.password = password; + } + public String getSsl_type() { + return ssl_type; + } + public void setSsl_type(String ssl_type) { + this.ssl_type = ssl_type; + } + public String getSsl_keystorepath() { + return ssl_keystorepath; + } + public void setSsl_keystorepath(String ssl_keystorepath) { + this.ssl_keystorepath = ssl_keystorepath; + } + public String getSsl_keystorepass() { + return ssl_keystorepass; + } + public void setSsl_keystorepass(String ssl_keystorepass) { + this.ssl_keystorepass = ssl_keystorepass; + } + public String toString(){ + return JSON.toJSONString(this); + } + public void validation(){ + //查看下启动jar包的时候时候自定义输入过参数值 java -Dquery=参数值 -Dindex=test -Ddocument_type=test1 -jar es_data_export + //注意:-D和Para之间不能有空格 + String queryRun= System.getProperty("query"); + if(StringUtils.isNotBlank(queryRun)){ + query = queryRun; + } + InitConfig.requireNonNull(index, "index 不能为空"); + InitConfig.requireNonNull(hosts, "hosts集群地址不能为空"); + + } +} diff --git a/src/main/java/com/chenhj/config/FileConfig.java b/src/main/java/com/chenhj/config/FileConfig.java new file mode 100644 index 0000000..1537d65 --- /dev/null +++ b/src/main/java/com/chenhj/config/FileConfig.java @@ -0,0 +1,162 @@ +package com.chenhj.config; + + + +import org.apache.commons.lang3.StringUtils; + +import com.alibaba.fastjson.JSON; +import com.chenhj.constant.Constant; +import com.chenhj.init.InitConfig; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: FileConfig.java +* @Description: 该类的功能描述 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午3:45:01 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public class FileConfig { + //是否启用 + private boolean enabled = false; + //换行 + private Boolean linefeed=true; + //数据类型 + private String datalayout; + //文件路径 + private String filepath; + private String filename; + private String max_length_file; + private String custom_field_name; + private String field_split=Constant.COMMA_SIGN; + private String field_sort; + private Boolean need_field_name=false; + private String split_method = "disk"; + private String sql_format; + + private String csv_headers; + + private Boolean need_split_file= false; + + + + public String getSplit_method() { + return split_method; + } + public void setSplit_method(String split_method) { + this.split_method = split_method; + } + public Boolean getNeed_split_file() { + return need_split_file; + } + public void setNeed_split_file(Boolean need_split_file) { + this.need_split_file = need_split_file; + } + public String getCsv_headers() { + return csv_headers; + } + public void setCsv_headers(String csv_headers) { + this.csv_headers = csv_headers; + } + public boolean isEnabled() { + return enabled; + } + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + public Boolean getLinefeed() { + return linefeed; + } + public void setLinefeed(Boolean linefeed) { + this.linefeed = linefeed; + } + public String getDatalayout() { + return datalayout; + } + public void setDatalayout(String datalayout) { + this.datalayout = datalayout; + } + public String getFilepath() { + return filepath; + } + public void setFilepath(String filepath) { + this.filepath = filepath; + } + public String getFilename() { + return filename; + } + public void setFilename(String filename) { + this.filename = filename; + } + + + + public String getMax_length_file() { + return max_length_file; + } + public void setMax_length_file(String max_length_file) { + this.max_length_file = max_length_file; + } + public String getCustom_field_name() { + return custom_field_name; + } + public void setCustom_field_name(String custom_field_name) { + this.custom_field_name = custom_field_name; + } + public String getField_split() { + return field_split; + } + public void setField_split(String field_split) { + this.field_split = field_split; + } + public String getField_sort() { + return field_sort; + } + public void setField_sort(String field_sort) { + this.field_sort = field_sort; + } + public Boolean getNeed_field_name() { + return need_field_name; + } + public void setNeed_field_name(Boolean need_field_name) { + this.need_field_name = need_field_name; + } + public String getSql_format() { + return sql_format; + } + public void setSql_format(String sql_format) { + this.sql_format = sql_format; + } + public String toString(){ + return JSON.toJSONString(this); + } + public void validation(){ + if(enabled){ + InitConfig.requireNonNull(linefeed, "linefeed 不能为null"); + InitConfig.requireNonNull(datalayout, "datalayout不能为空"); + InitConfig.requireNonNull(filepath, "filepath数据存储文件路径不能为空"); + InitConfig.requireNonNull(filename, "filename数据存储文件名不能为空"); + if(need_split_file){ + if(StringUtils.isBlank(max_length_file)){ + switch (split_method) { + case "disk": + max_length_file = "10240"; + break; + case "amount": + max_length_file = "1000000"; + break; + default: + break; + } + } + } + } + } +} diff --git a/src/main/java/com/chenhj/config/JdbcConfig.java b/src/main/java/com/chenhj/config/JdbcConfig.java new file mode 100644 index 0000000..1cfa30d --- /dev/null +++ b/src/main/java/com/chenhj/config/JdbcConfig.java @@ -0,0 +1,134 @@ +/** + * + */ +package com.chenhj.config; + +import java.util.Map; + +import com.alibaba.fastjson.JSON; +import com.chenhj.init.InitConfig; +import com.chenhj.util.SqlParser; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: JdbcConfig.java +* @Description: 该类的功能描述 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午3:45:54 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public class JdbcConfig { + private String jdbc_driver_library; + private String jdbc_driver_class; + private String jdbc_connection_string; + private String jdbc_user; + private String jdbc_password; + //是否启用 + private boolean enabled = false; + private Integer jdbc_size = 10000; + private String jdbc_template; + + private String tableName; + + private Map fieldMap; + + private int jdbc_write_thread_size = 1; + + + + public String getTableName() { + return tableName; +} +public Map getFieldMap() { + return fieldMap; +} +public void setFieldMap(Map fieldMap) { + this.fieldMap = fieldMap; +} +public int getJdbc_write_thread_size() { + return jdbc_write_thread_size; +} +public void setJdbc_write_thread_size(int jdbc_write_thread_size) { + this.jdbc_write_thread_size = jdbc_write_thread_size; +} +public String getJdbc_template() { + return jdbc_template; + } + public void setJdbc_template(String jdbc_template) { + this.jdbc_template = jdbc_template; + } + public String getJdbc_driver_library() { + return jdbc_driver_library; + } + public void setJdbc_driver_library(String jdbc_driver_library) { + this.jdbc_driver_library = jdbc_driver_library; + } + public String getJdbc_driver_class() { + return jdbc_driver_class; + } + public void setJdbc_driver_class(String jdbc_driver_class) { + this.jdbc_driver_class = jdbc_driver_class; + } + public String getJdbc_connection_string() { + return jdbc_connection_string; + } + public void setJdbc_connection_string(String jdbc_connection_string) { + this.jdbc_connection_string = jdbc_connection_string; + } + public String getJdbc_user() { + return jdbc_user; + } + public void setJdbc_user(String jdbc_user) { + this.jdbc_user = jdbc_user; + } + public String getJdbc_password() { + return jdbc_password; + } + public void setJdbc_password(String jdbc_password) { + this.jdbc_password = jdbc_password; + } + + public boolean isEnabled() { + return enabled; + } + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + public Integer getJdbc_size() { + return jdbc_size; + } + public void setJdbc_size(Integer jdbc_size) { + this.jdbc_size = jdbc_size; + } + public String toString(){ + return JSON.toJSONString(this); + } + public void validation() throws IllegalArgumentException{ + if(enabled){ + InitConfig.requireNonNull(jdbc_driver_library, "jdbc_driver_library 不能为null"); + InitConfig.requireNonNull(jdbc_driver_class, "jdbc_driver_class不能为空"); + InitConfig.requireNonNull(jdbc_connection_string, "jdbc_connection_string不能为空"); + InitConfig.requireNonNull(jdbc_user, "jdbc_user不能为空"); + InitConfig.requireNonNull(jdbc_password, "jdbc_password不能为空"); + InitConfig.requireNonNull(jdbc_template, "jdbc_template不能为空"); + + //获得参数的标志位 + fieldMap = SqlParser.getConfigParent(jdbc_template); + jdbc_template=SqlParser.toLegalSql(jdbc_template); + //验证sql合法性 + if(!SqlParser.isInsertSql(jdbc_template)){ + throw new IllegalArgumentException("SQL jdbc_template 只支持insert和update"); + }; + tableName = SqlParser.getTableName(); + InitConfig.requireNonNull(tableName, "tableName不能为空"); + } + } + +} diff --git a/src/main/java/com/chenhj/config/KafkaConfig.java b/src/main/java/com/chenhj/config/KafkaConfig.java new file mode 100644 index 0000000..65c9fd1 --- /dev/null +++ b/src/main/java/com/chenhj/config/KafkaConfig.java @@ -0,0 +1,136 @@ +/** + * + */ +package com.chenhj.config; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.chenhj.constant.Constant; +import com.chenhj.init.InitConfig; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: KafkaConfig.java +* @Description: kafka配置文件 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午3:46:13 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public class KafkaConfig { + + private String hosts; + + private String topic; + + private String add_value; + + private String replace_key; + + private Integer write_thread_size =1; + + private Integer send_size = 1000; + + private Integer delay = 0; + + private JSONObject add_value_JSON; + + private Map replace_key_Map; + + + + public JSONObject getAdd_value_JSON() { + return add_value_JSON; + } + public void setAdd_value_JSON(JSONObject add_value_JSON) { + this.add_value_JSON = add_value_JSON; + } + + + public Map getReplace_key_Map() { + return replace_key_Map; + } + public void setReplace_key_Map(Map replace_key_Map) { + this.replace_key_Map = replace_key_Map; + } + public String getAdd_value() { + return add_value; + } + public void setAdd_value(String add_value) { + this.add_value = add_value; + } + public String getReplace_key() { + return replace_key; + } + public void setReplace_key(String replace_key) { + this.replace_key = replace_key; + } + public Integer getDelay() { + return delay; + } + public void setDelay(Integer delay) { + this.delay = delay; + } + public Integer getSend_size() { + return send_size; + } + public void setSend_size(Integer send_size) { + this.send_size = send_size; + } + private boolean enabled = false; + + public String getHosts() { + return hosts; + } + public void setHosts(String hosts) { + this.hosts = hosts; + } + public String getTopic() { + return topic; + } + public void setTopic(String topic) { + this.topic = topic; + } + public Integer getWrite_thread_size() { + return write_thread_size; + } + public void setWrite_thread_size(Integer write_thread_size) { + this.write_thread_size = write_thread_size; + } + public boolean isEnabled() { + return enabled; + } + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + public String toString(){ + return JSON.toJSONString(this); + } + public void validation(){ + InitConfig.requireNonNull(hosts, "kafka hosts 不能为空"); + InitConfig.requireNonNull(topic, "topic不能为空"); + if(StringUtils.isNotBlank(add_value)){ + add_value_JSON = JSON.parseObject(add_value); + } + if(StringUtils.isNotBlank(replace_key)){ + String fields[] = replace_key.split(Constant.COMMA_SIGN); + replace_key_Map = new HashMap(); + for(String field:fields){ + String old_new[] = field.split(Constant.COLON); + replace_key_Map.put(old_new[0], old_new[1]); + } + + } + } +} diff --git a/src/main/java/com/chenhj/config/QuartzConfig.java b/src/main/java/com/chenhj/config/QuartzConfig.java new file mode 100644 index 0000000..05b5232 --- /dev/null +++ b/src/main/java/com/chenhj/config/QuartzConfig.java @@ -0,0 +1,67 @@ +/** + * + */ +package com.chenhj.config; + +import org.quartz.CronExpression; + +import com.alibaba.fastjson.JSON; +import com.chenhj.init.InitConfig; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: QuartzConfig.java +* @Description:定时器配置 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午7:03:23 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public class QuartzConfig { + + private boolean enabled = false; + + private String schedule; + + + + public boolean isEnabled() { + return enabled; + } + + + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + + + public String getSchedule() { + return schedule; + } + + + + public void setSchedule(String schedule) { + this.schedule = schedule; + } + + public void validation() throws IllegalArgumentException{ + if(enabled){ + InitConfig.requireNonNull(schedule, "schedule 不能为null"); + if(!CronExpression.isValidExpression(schedule)){ + throw new IllegalArgumentException("schedule config error(schedule配置参数值错误)"); + } + } + } + public String toString(){ + return JSON.toJSONString(this); + } +} diff --git a/src/main/java/com/chenhj/constant/ApplicationConfig.java b/src/main/java/com/chenhj/constant/ApplicationConfig.java deleted file mode 100644 index 1c7c5ec..0000000 --- a/src/main/java/com/chenhj/constant/ApplicationConfig.java +++ /dev/null @@ -1,246 +0,0 @@ -/** - * - */ -package com.chenhj.constant; - -import org.apache.commons.lang3.StringUtils; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.chenhj.job.EsInfoJob; - -/** -* Copyright: Copyright (c) 2018 Montnets -* -* @ClassName: ApplicationConfig.java -* @Description: 该类的功能描述 -* -* @version: v1.0.0 -* @author: chenhj -* @date: 2018年12月5日 上午10:15:55 -* -* Modification History: -* Date Author Version Description -*---------------------------------------------------------* -* 2018年12月5日 chenhj v1.0.0 修改原因 -*/ -public class ApplicationConfig{ - private static String index; - private static String type; - private static String query; - private static String esserver; - private static String esusername; - private static String espassword; - private static Boolean isLineFeed=true; - private static String dataLayout; - private static String filePath; - private static String fileName; - private static String fileSize; - private static String customFieldName; - private static String fieldSplit=Constant.COMMA_SIGN; - private static String fieldSort; - private static Boolean needFieldName=false; - private static String sqlFormat; - private static String includes; - private static Integer threadSize=1; - //SSL设置 - private static String SSL_type; - private static String SSL_keyStorePath; - private static String SSL_keyStorePass; - //数据库 - private static String jdbcUrl; - private static String jdbcDriver; - private static String jdbcUsername; - private static String jdbcPassword; - private static String jdbcInsertSql; - private static Integer jdbcInsertSize=10000; - - - - public static String getJdbcUrl() { - return jdbcUrl; - } - public static void setJdbcUrl(String jdbcUrl) { - ApplicationConfig.jdbcUrl = jdbcUrl; - } - public static String getJdbcDriver() { - return jdbcDriver; - } - public static void setJdbcDriver(String jdbcDriver) { - ApplicationConfig.jdbcDriver = jdbcDriver; - } - public static String getJdbcUsername() { - return jdbcUsername; - } - public static void setJdbcUsername(String jdbcUsername) { - ApplicationConfig.jdbcUsername = jdbcUsername; - } - public static String getJdbcPassword() { - return jdbcPassword; - } - public static void setJdbcPassword(String jdbcPassword) { - ApplicationConfig.jdbcPassword = jdbcPassword; - } - public static String getJdbcInsertSql() { - return jdbcInsertSql; - } - public static void setJdbcInsertSql(String jdbcInsertSql) { - ApplicationConfig.jdbcInsertSql = jdbcInsertSql; - } - public static Integer getJdbcInsertSize() { - return jdbcInsertSize; - } - public static void setJdbcInsertSize(Integer jdbcInsertSize) { - ApplicationConfig.jdbcInsertSize = jdbcInsertSize; - } - public static String getSSL_type() { - return SSL_type; - } - public static String getSSL_keyStorePath() { - return SSL_keyStorePath; - } - public static String getSSL_keyStorePass() { - return SSL_keyStorePass; - } - public static int getThreadSize() { - return threadSize; - } - public static String getIndex() { - return index; - } - public static String getType() { - return type; - } - public static String getQuery() { - if(StringUtils.isBlank(query)){ - query="{\"size\":1000,\"query\":{\"match_all\":{}}}"; - } - return query; - } - public static String getEsserver() { - return esserver; - } - public static String getEsusername() { - return esusername; - } - public static String getEspassword() { - return espassword; - } - public static boolean isLineFeed() { - return isLineFeed; - } - public static String getDataLayout() { - return dataLayout; - } - public static String getFilePath() { - return filePath; - } - public static String getFileName() { - return fileName; - } - public static String getFileSize() { - return fileSize; - } - public static String getCustomFieldName() { - return customFieldName; - } - public static String getFieldSplit() { - return fieldSplit; - } - public static String getFieldSort() { - return fieldSort; - } - public static boolean isNeedFieldName() { - return needFieldName; - } - public static String getSqlFormat() { - return sqlFormat; - } - public static String getIncludes() { - return includes; - } - public static String getScrollQuery(Integer nowid,Integer maxid) { - - String query = getQuery(); - String includes =ApplicationConfig.getIncludes(); - JSONObject params = new JSONObject(); - if(StringUtils.isNoneEmpty(query)){ - params = JSON.parseObject(query); - } - if(nowid!=null&&maxid!=null&&maxid>1){ -// if(maxid<=1){ -// throw new IllegalArgumentException("max must be greater than 1"); -// } - if(maxid<=nowid){ - throw new IllegalArgumentException("max must be greater than id"); - } - JSONObject slice = new JSONObject(); - slice.put("id",nowid); - slice.put("max",maxid); - params.put("slice", slice); - } - if(StringUtils.isNoneEmpty(includes)){ - JSONObject inc = new JSONObject(); - String field[] = includes.split(","); - inc.put("includes", field); - params.put("_source",inc); - } - if(StringUtils.isBlank(params.getString("sort"))){ - String sort[] ={"_doc"}; - params.put("sort", sort); - } - return params.toJSONString(); - } - public static int getRunThreadSize() throws Exception{ - EsInfoJob esInfo = new EsInfoJob(); - //索引分片数 - int share = esInfo.getIndexShards(ApplicationConfig.getIndex());//优先级2 - //配置最大线程 - int threadSize = ApplicationConfig.getThreadSize(); //优先级1 - //当前机器CPU数 - int nowCpu = Runtime.getRuntime().availableProcessors(); //优先级3 - //如果分区数小于最大线程数,则线程数取分区的数量 - if(share map; - public DbDaoImpl() { - map = SqlParser.parserInsert(ApplicationConfig.getJdbcInsertSql()); - sql =String.format(sql,SqlParser.getTableName(),StringUtils.join(SqlParser.getColumnList(), ","),StringUtils.join(SqlParser.getValueList(), ",")); - } + private static ConnectionManager dbp = ConnectionManager.getInstance(); + private String sql = Config.JDBC_CONFIG.getJdbc_template(); + private Integer insertSize = Config.JDBC_CONFIG.getJdbc_size(); + private Map fieldMap = Config.JDBC_CONFIG.getFieldMap(); @Override public synchronized void insert(List list) throws SQLException { //该sql语句是如果库中存在直接覆盖 @@ -58,48 +51,42 @@ public synchronized void insert(List list) throws SQLException { conn.setAutoCommit(false); //判断条数 int size = list.size(); - - if(size>insertSize){ - list.subList(0, insertSize-1); + List listTemp = null; + //分批存入和写入DB数据,单批insertSize条 + for(int i=0;i<=size;i+=insertSize){ + if(i+insertSize>size){ + insertSize=size-i; //作用为insertSize最后没有100条数据则剩余几条listTemp中就装几条 + } + listTemp = list.subList(i,i+insertSize); + if(listTemp==null||listTemp.isEmpty()){ + break; + } + for(JSONObject msg:listTemp){ + try { + dataFormat(statement,msg); + } catch (Exception e) { + logger.error("批量插入单条出错抛弃,数据:{},异常:{}",msg.toString(),e); + continue; + } + //完成一条语句的赋值 + statement.addBatch(); + } + //执行批量操作 + statement.executeBatch(); + //提交事务 + conn.commit(); } - - for(JSONObject msg:list){ - try { - dataFormat(statement,msg); - } catch (Exception e) { - logger.error("批量插入单条出错抛弃,数据:{},异常:{}",msg.toString(),e); - continue; - } - //完成一条语句的赋值 - statement.addBatch(); - } - //执行批量操作 - statement.executeBatch(); - //提交事务 - conn.commit(); } catch (SQLException e) { throw e; }finally{ dbp.close(null, statement, conn); } - } private void dataFormat(PreparedStatement statement,JSONObject json) throws SQLException{ - int i = 1; - for (Entry entry : map.entrySet()) { - Object obj = entry.getValue(); - if(obj instanceof String){ - String value = String.valueOf(obj); - if(value.startsWith("##param")){ - statement.setObject(i, json.get(value)); - } - } - i++; + for (Entry entry : fieldMap.entrySet()) { + Integer index = entry.getValue(); + String key = entry.getKey(); + statement.setObject(index,json.get(key)); } } - - public static void main(String[] args) { - - } - } diff --git a/src/main/java/com/chenhj/init/Rest.java b/src/main/java/com/chenhj/es/Rest.java similarity index 90% rename from src/main/java/com/chenhj/init/Rest.java rename to src/main/java/com/chenhj/es/Rest.java index d2921da..699fe85 100644 --- a/src/main/java/com/chenhj/init/Rest.java +++ b/src/main/java/com/chenhj/es/Rest.java @@ -1,7 +1,7 @@ /** * */ -package com.chenhj.init; +package com.chenhj.es; import java.io.IOException; import java.io.InputStream; @@ -17,7 +17,6 @@ import javax.net.ssl.SSLContext; -import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHost; import org.apache.http.HttpStatus; @@ -39,7 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.chenhj.constant.ApplicationConfig; +import com.chenhj.config.Config; import com.chenhj.constant.Constant; /** @@ -167,9 +166,9 @@ public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpCli }); } private void setEsSSL() throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException, KeyManagementException{ - String SSL_type = ApplicationConfig.getSSL_type(); - String SSL_keyStorePath = ApplicationConfig.getSSL_keyStorePath(); - String SSL_keyStorePass= ApplicationConfig.getSSL_keyStorePass(); + String SSL_type = Config.ES_CONFIG.getSsl_type(); + String SSL_keyStorePath = Config.ES_CONFIG.getSsl_keystorepath(); + String SSL_keyStorePass= Config.ES_CONFIG.getSsl_keystorepass(); if(StringUtils.isNotBlank(SSL_type)&&StringUtils.isNotBlank(SSL_keyStorePath)&&StringUtils.isNotBlank(SSL_keyStorePass)){ KeyStore truststore = KeyStore.getInstance(SSL_type); Path path= Paths.get(SSL_keyStorePath); @@ -218,14 +217,4 @@ private boolean validation(){ } return flag; } - public static void main(String[] args) throws Exception { - RestClient client = Rest.Client.getRestClient(); - - Response response =client.performRequest(null); - InputStream in=response.getEntity().getContent(); - IOUtils.toString(in,"UTF-8"); - //释放该连接 - in.close(); - - } } diff --git a/src/main/java/com/chenhj/init/InitConfig.java b/src/main/java/com/chenhj/init/InitConfig.java new file mode 100644 index 0000000..7778dd4 --- /dev/null +++ b/src/main/java/com/chenhj/init/InitConfig.java @@ -0,0 +1,174 @@ +/** + * + */ +package com.chenhj.init; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; + +import com.chenhj.config.CommonConfig; +import com.chenhj.config.Config; +import com.chenhj.config.EsConfig; +import com.chenhj.config.FileConfig; +import com.chenhj.config.JdbcConfig; +import com.chenhj.config.KafkaConfig; +import com.chenhj.config.QuartzConfig; +import com.chenhj.constant.Constant; +import com.chenhj.util.PropertiesUtil; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: InitConfig.java +* @Description: 初始化配置 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午4:34:38 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public class InitConfig { + private static Map map=null; + + public static void init() throws Exception { + map = new PropertiesUtil(Constant.CONFIG_NAME).loadProperties(); + if(map!=null){ + String value; + String prefix; + String field; + EsConfig esConfig = new EsConfig(); + FileConfig fileConfig = new FileConfig(); + JdbcConfig jdbcConfig = new JdbcConfig(); + KafkaConfig kafkaConfig = new KafkaConfig(); + CommonConfig commonConfig = new CommonConfig(); + QuartzConfig quartzConfig = new QuartzConfig(); + //加载配置文件的参数 + for(Map.Entry entry : map.entrySet()){ + value = entry.getValue(); + String keys[] = entry.getKey().split(Constant.DOT); + //点号隔开 + prefix = keys[0]; + if(keys.length!=2){ + continue; + } + field = keys[1]; + switch (prefix) { + case Constant.ELASTICSEARCH: + setConfig(esConfig,field,value); + break; + case Constant.FILE: + setConfig(fileConfig,field,value); + break; + case Constant.DB: + setConfig(jdbcConfig,field,value); + break; + case Constant.COMMON: + setConfig(commonConfig,field,value); + break; + case Constant.KAFKA: + setConfig(kafkaConfig,field,value); + break; + case Constant.QUARTZ: + setConfig(quartzConfig,field,value); + break; + default: + break; + } + } + //验证配置文件的参数 + esConfig.validation(); + fileConfig.validation(); + jdbcConfig.validation(); + kafkaConfig.validation(); + quartzConfig.validation(); + //赋值全局变量 + Config.ES_CONFIG = esConfig; + Config.FILE_CONFIG = fileConfig; + Config.JDBC_CONFIG = jdbcConfig; + Config.COMMON_CONFIG = commonConfig; + Config.Kafka_CONFIG = kafkaConfig; + Config.QUARTZ_CONFIG = quartzConfig; + } + } + private static void setConfig(Object obj,String key,String value) throws Exception{ + Field field; + if (null == (field = getField(obj.getClass(), key))) { + return; + } + //不为静态不设置 + if (Modifier.isStatic(field.getModifiers())) { + return; + } + if (Modifier.isFinal(field.getModifiers())) { + return; + } + field.setAccessible(true); + setField(obj,field, value); + } + /** + * 通过反射获取待转类clazz中指定字段名的字段,如果字段不存在则返回null + * + * @param fieldName 去查找待转类中的指定字段 + * @return 返回指定的字段 + */ + private static Field getField(Class clazz,String fieldName) { + try { + return clazz.getDeclaredField(fieldName); + } catch (Exception ignored) { + } + return null; + } + /** + * 对指定的字段进行设置值,目前仅支持字段类型: + * String,boolean,byte,char,short,int,long,float,double + * + * @param field 指定的字段 + * @param value 设置值 + * @throws IllegalAccessException + * @throws IllegalArgumentException + */ + private static void setField(Object object,Field field, String value) throws Exception { + Class type = field.getType(); + Object par = null; + if (String.class.equals(field.getType())) { + par = value; + } else if (int.class.equals(type) || Integer.class.equals(type)) { + par = Integer.valueOf(value); + } else if (boolean.class.equals(type) || Boolean.class.equals(type)) { + par = Boolean.valueOf(value); + } else if (long.class.equals(type) || Long.class.equals(type)) { + par = Long.valueOf(value); + } else if (double.class.equals(type) || Double.class.equals(type)) { + par = Double.valueOf(value); + } else if (float.class.equals(type) || Float.class.equals(type)) { + par = Float.valueOf(value); + } else if (short.class.equals(type) || Short.class.equals(type)) { + par = Short.valueOf(value); + } else if (byte.class.equals(type) || Byte.class.equals(type)) { + par = Byte.valueOf(value); + } else if (char.class.equals(type)) { + par = value.charAt(0); + } + field.set(object, par); + } + public static void requireNonNull(Object obj,String msg){ + if(obj==null){ + throw new NullPointerException(msg); + } + if(obj instanceof String){ + String mss = String.valueOf(obj); + if(StringUtils.isBlank(mss)){ + throw new NullPointerException(msg); + } + } + + } + +} diff --git a/src/main/java/com/chenhj/init/InitConnection.java b/src/main/java/com/chenhj/init/InitConnection.java new file mode 100644 index 0000000..eadf4a7 --- /dev/null +++ b/src/main/java/com/chenhj/init/InitConnection.java @@ -0,0 +1,75 @@ +/** + * + */ +package com.chenhj.init; + +import org.apache.commons.lang3.StringUtils; + +import com.chenhj.config.Config; +import com.chenhj.dao.ConnectionManager; +import com.chenhj.es.Rest; +import com.chenhj.util.kafka.KafkaUtil; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: InitConnection.java +* @Description: 测试连接是否正常 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午7:37:16 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public class InitConnection { + private static boolean jdbcEnabled; + private static boolean kafkaEnabled; + public static void init() throws Exception{ + initEs(); + //如果启用了DB,检查DB连接 + jdbcEnabled=Config.JDBC_CONFIG.isEnabled(); + kafkaEnabled = Config.Kafka_CONFIG.isEnabled(); + if(jdbcEnabled){ + ConnectionManager dbp =ConnectionManager.getInstance(); + boolean flag = dbp.isValid(); + if(!flag){ + throw new Exception("DB 连接失败"); + } + String tableName = Config.JDBC_CONFIG.getTableName(); + boolean existTable = dbp.validateTableNameExist(tableName); + if(!existTable){ + throw new IllegalAccessException(tableName+"表不存在,请检查jdbc_template是否书写正确!!!"); + } + dbp = null; + } + if(kafkaEnabled){ + initKafka(); + } + } + /** + * 初始化ES的连接 + * @throws IllegalAccessException + */ + private static void initEs() throws Exception{ + String ips = Config.ES_CONFIG.getHosts(); + String username = Config.ES_CONFIG.getUsername(); + String password = Config.ES_CONFIG.getPassword(); + //ES初始化 + Rest rest = Rest.Client.setHttpHosts(ips.split(",")); + if(StringUtils.isNotBlank(username)&&StringUtils.isNotBlank(password)){ + rest.validation(username, password); + } + rest.build(); + } + /** + * 初始化Kafka的连接 + * @throws IllegalAccessException + */ + private static void initKafka() throws Exception{ + KafkaUtil.validation(); + } +} diff --git a/src/main/java/com/chenhj/init/LogBack.java b/src/main/java/com/chenhj/init/InitLogBack.java similarity index 94% rename from src/main/java/com/chenhj/init/LogBack.java rename to src/main/java/com/chenhj/init/InitLogBack.java index 0b53d9d..eae8d76 100644 --- a/src/main/java/com/chenhj/init/LogBack.java +++ b/src/main/java/com/chenhj/init/InitLogBack.java @@ -28,7 +28,7 @@ *---------------------------------------------------------* * 2018年12月7日 chenhj v1.0.0 修改原因 */ -public class LogBack { +public class InitLogBack { public static void init() throws Exception{ //String configFilepathName = FilePathHelper.getFilePathWithJar("logback.xml"); diff --git a/src/main/java/com/chenhj/init/InitThreadPool.java b/src/main/java/com/chenhj/init/InitThreadPool.java new file mode 100644 index 0000000..252cdaf --- /dev/null +++ b/src/main/java/com/chenhj/init/InitThreadPool.java @@ -0,0 +1,45 @@ +/** + * + */ +package com.chenhj.init; + +import java.util.concurrent.Executors; + +import com.chenhj.config.Config; +import com.chenhj.constant.Pool; +import com.chenhj.thread.ThreadPoolManager; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: ThreadPool.java +* @Description: 线程池初始化 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月7日 下午2:20:13 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月7日 chenhj v1.0.0 修改原因 +*/ +public class InitThreadPool { + + public static void init() throws Exception{ + //拉取数据线程池 + Pool.EXECPool = Executors.newFixedThreadPool(Config.COMMON_CONFIG.getThread_size()); + //写文件线程池 + if(Config.FILE_CONFIG.isEnabled()){ + Pool.WRITE_FILE_POOL = ThreadPoolManager.newInstance(1).build(); + } + //写DB线程池 + if(Config.JDBC_CONFIG.isEnabled()){ + Pool.WRITE_DB_POOL = ThreadPoolManager.newInstance(Config.JDBC_CONFIG.getJdbc_write_thread_size()).build(); + } + //写Kafka线程池 + if(Config.Kafka_CONFIG.isEnabled()){ + Pool.WRITE_KAFKA_POOL = ThreadPoolManager.newInstance(Config.Kafka_CONFIG.getWrite_thread_size()).build(); + } + } +} diff --git a/src/main/java/com/chenhj/init/ThreadPool.java b/src/main/java/com/chenhj/init/ThreadPool.java deleted file mode 100644 index b0dbac4..0000000 --- a/src/main/java/com/chenhj/init/ThreadPool.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * - */ -package com.chenhj.init; - -import java.util.concurrent.Executors; - -import com.chenhj.constant.ApplicationConfig; -import com.chenhj.constant.Pool; -import com.chenhj.thread.ThreadPoolManager; - -/** -* Copyright: Copyright (c) 2018 Montnets -* -* @ClassName: ThreadPool.java -* @Description: 线程池初始化 -* -* @version: v1.0.0 -* @author: chenhj -* @date: 2018年12月7日 下午2:20:13 -* -* Modification History: -* Date Author Version Description -*---------------------------------------------------------* -* 2018年12月7日 chenhj v1.0.0 修改原因 -*/ -public class ThreadPool { - - public static void init() throws Exception{ - int threadSize = ApplicationConfig.getRunThreadSize(); - //写文件线程池 - Pool.WRITE_FILE_POOL = ThreadPoolManager.newInstance(1).build(); - //拉取数据线程池 - Pool.EXECPool = Executors.newFixedThreadPool(threadSize); - //计数器 - // Pool.LATCH = new CountDownLatch(threadSize); - } -} diff --git a/src/main/java/com/chenhj/job/EsInfoJob.java b/src/main/java/com/chenhj/job/EsInfoJob.java index 1301e6e..c8bd0a0 100644 --- a/src/main/java/com/chenhj/job/EsInfoJob.java +++ b/src/main/java/com/chenhj/job/EsInfoJob.java @@ -14,7 +14,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.chenhj.constant.Constant; -import com.chenhj.init.Rest; +import com.chenhj.es.Rest; /** * Copyright: Copyright (c) 2018 Montnets diff --git a/src/main/java/com/chenhj/job/ThreadUtil.java b/src/main/java/com/chenhj/job/ThreadUtil.java new file mode 100644 index 0000000..6c73d81 --- /dev/null +++ b/src/main/java/com/chenhj/job/ThreadUtil.java @@ -0,0 +1,144 @@ +package com.chenhj.job; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.chenhj.config.Config; +import com.chenhj.config.QuartzConfig; +import com.chenhj.constant.Constant; +import com.chenhj.constant.Pool; +import com.chenhj.task.ExportDataMasterTask; +import com.chenhj.task.MonitorTask; + + +public class ThreadUtil { + + private static final Logger LOG = LoggerFactory.getLogger(ThreadUtil.class); + static int nThreads = 1; + public ThreadUtil() throws Exception { + nThreads = Config.COMMON_CONFIG.getThread_size(); + } + /** + * 启动所有线程池 + * @throws Exception + */ + public void startConsume() throws Exception{ + try { + if(!Config.FILE_CONFIG.isEnabled()&&!Config.JDBC_CONFIG.isEnabled()&&!Config.Kafka_CONFIG.isEnabled()){ + LOG.info("文件也不写,DB也不入,kafka也不写,你拉数据下来干啥....程序退出!!"); + System.exit(-1); + } + exportDataTask(); + + //下面这里判断是否需要定时 + if(Config.QUARTZ_CONFIG.isEnabled()){ + + }; + + + } catch (Exception e) { + throw e; + } + } + /** + * 导出数据线程 + * @throws Exception + */ + private static void exportDataTask() throws Exception{ + try { + nThreads = getRunThreadSize(); + //写文件的线程,单线程操作 + LOG.info("ThreadPool Size:"+nThreads); + String scrollId; + //执行任务 + for(int i=0;i list = sJob.executeJob(getScrollQuery(i,nThreads)); + scrollId = sJob.getSrcollId(); + ExportDataMasterTask task = new ExportDataMasterTask(scrollId,list); + Pool.EXECPool.execute(task); + } + //开启监控线程池线程 + monitor(); + } catch (Exception e) { + //关闭线程池 + if(Pool.EXECPool!=null){ + Pool.EXECPool.shutdown(); + } + throw e; + } + } + /** + * 监控线程池(只有在拉取数据的线程结束后才开启这个监控线程) + * @throws InterruptedException + * @throws ExecutionException + */ + public static void monitor() throws InterruptedException, ExecutionException{ + MonitorTask monitorTask = new MonitorTask(); + //1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。 + FutureTask result = new FutureTask<>(monitorTask); + new Thread(result).start(); + Byte status = result.get(); + if(status==Constant.SUCCESS){ + LOG.info("Process Exits..."); + System.exit(-1); + } + } + + public static String getScrollQuery(Integer nowid,Integer maxid) { + + String query = Config.ES_CONFIG.getQuery(); + String includes =Config.ES_CONFIG.getIncludes(); + JSONObject params = new JSONObject(); + if(StringUtils.isNoneEmpty(query)){ + params = JSON.parseObject(query); + } + if(nowid!=null&&maxid!=null&&maxid>1){ +// if(maxid<=1){ +// throw new IllegalArgumentException("max must be greater than 1"); +// } + if(maxid<=nowid){ + throw new IllegalArgumentException("max must be greater than id"); + } + JSONObject slice = new JSONObject(); + slice.put("id",nowid); + slice.put("max",maxid); + params.put("slice", slice); + } + if(StringUtils.isNoneEmpty(includes)){ + JSONObject inc = new JSONObject(); + String field[] = includes.split(","); + inc.put("includes", field); + params.put("_source",inc); + } + if(StringUtils.isBlank(params.getString("sort"))){ + String sort[] ={"_doc"}; + params.put("sort", sort); + } + return params.toJSONString(); + } + public static int getRunThreadSize() throws Exception{ + EsInfoJob esInfo = new EsInfoJob(); + //索引分片数 + int share = esInfo.getIndexShards(Config.ES_CONFIG.getIndex());//优先级2 + //配置最大线程 + int threadSize = nThreads; //优先级1 + //当前机器CPU数 + int nowCpu = Runtime.getRuntime().availableProcessors(); //优先级3 + //如果分区数小于最大线程数,则线程数取分区的数量 + if(share list) throws Exception; +} diff --git a/src/main/java/com/chenhj/service/IWriteKafkaService.java b/src/main/java/com/chenhj/service/IWriteKafkaService.java new file mode 100644 index 0000000..58831f6 --- /dev/null +++ b/src/main/java/com/chenhj/service/IWriteKafkaService.java @@ -0,0 +1,28 @@ +/** + * + */ +package com.chenhj.service; + +import java.util.List; + +import com.alibaba.fastjson.JSONObject; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: IWriteKafkaService.java +* @Description: 该类的功能描述 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年11月17日 下午5:11:43 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年11月17日 chenhj v1.0.0 修改原因 +*/ +public interface IWriteKafkaService { + + public void write2Kafka(List list) throws Exception; +} diff --git a/src/main/java/com/chenhj/service/impl/EsActionServiceImpl.java b/src/main/java/com/chenhj/service/impl/EsActionServiceImpl.java index 208df33..a4f1112 100644 --- a/src/main/java/com/chenhj/service/impl/EsActionServiceImpl.java +++ b/src/main/java/com/chenhj/service/impl/EsActionServiceImpl.java @@ -22,9 +22,9 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; -import com.chenhj.constant.ApplicationConfig; +import com.chenhj.config.Config; import com.chenhj.constant.Constant; -import com.chenhj.init.Rest; +import com.chenhj.es.Rest; import com.chenhj.service.IEsActionService; /** @@ -53,8 +53,8 @@ public class EsActionServiceImpl implements IEsActionService{ private String endPoint2 ; private JSONObject params; public EsActionServiceImpl() throws Exception { - index = ApplicationConfig.getIndex(); - type = ApplicationConfig.getType(); + index = Config.ES_CONFIG.getIndex(); + type = Config.ES_CONFIG.getDocument_type(); this.endPoint1 = index+"/_search?scroll=1h"; if(StringUtils.isNotBlank(type)){ this.endPoint1 = index+"/"+type+"/_search?scroll=1h"; diff --git a/src/main/java/com/chenhj/service/impl/WriteData2File.java b/src/main/java/com/chenhj/service/impl/WriteData2File.java index 937a2c3..3363db4 100644 --- a/src/main/java/com/chenhj/service/impl/WriteData2File.java +++ b/src/main/java/com/chenhj/service/impl/WriteData2File.java @@ -13,9 +13,10 @@ import org.apache.commons.lang3.StringUtils; import com.alibaba.fastjson.JSONObject; -import com.chenhj.constant.ApplicationConfig; +import com.chenhj.config.Config; import com.chenhj.constant.Constant; import com.chenhj.util.FileUtil; +import com.chenhj.util.SqlParser; /** * Copyright: Copyright (c) 2018 Montnets @@ -33,12 +34,21 @@ * 2018年12月11日 chenhj v1.0.0 修改原因 */ public class WriteData2File { - static String customFieldName= ApplicationConfig.getCustomFieldName(); - static boolean isLineFeed = ApplicationConfig.isLineFeed(); - static String fieldSplit= ApplicationConfig.getFieldSplit(); - static String fieldSort = ApplicationConfig.getFieldSort(); - static boolean needFieldName =ApplicationConfig.isNeedFieldName(); - + private String customFieldName; + private boolean isLineFeed; + private String fieldSplit; + private String fieldSort; + private boolean needFieldName; + private String sql_format; + + public WriteData2File(){ + customFieldName= Config.FILE_CONFIG.getCustom_field_name(); + isLineFeed = Config.FILE_CONFIG.getLinefeed(); + fieldSplit= Config.FILE_CONFIG.getField_split(); + fieldSort = Config.FILE_CONFIG.getField_sort(); + needFieldName =Config.FILE_CONFIG.getNeed_field_name(); + sql_format = Config.FILE_CONFIG.getSql_format(); + } /** *写文件工具类 * @param list 数据list @@ -46,33 +56,30 @@ public class WriteData2File { * @param fileType 文件类型 * @throws IOException */ - public static void toWrite(List list,String filePath,String fileType) throws IOException{ - //判断文件和行数 - //boolean flag = FileUtil.isExist(filePath); + public void toWrite(List list,String filePath,String fileType) throws IOException{ //获取数据字符串集合 String str =getJsonStr(list,fileType); - FileUtil.writeFile(filePath,str); } - private static String getJsonStr(List dataList,String fileType) { + private String getJsonStr(List dataList,String fileType) { StringBuilder sb = new StringBuilder(); try { for (JSONObject data : dataList) { switch (fileType) { case Constant.JSON: sb.append(data.toJSONString()); - if(isLineFeed)sb.append("\r\n"); + if(isLineFeed){ + sb.append("\r\n"); + } break; case Constant.TXT: sb.append(txtHandler(data,fieldSplit)); - if(isLineFeed)sb.append("\r\n"); - break; - case Constant.EXCEL: + if(isLineFeed){ + sb.append("\r\n"); + } break; case Constant.SQL: - break; - case Constant.CSV: - csvHandler(data); + sb.append(sqlHandler(data)); sb.append("\r\n"); break; default: @@ -84,17 +91,11 @@ private static String getJsonStr(List dataList,String fileType) { } return sb.toString(); } - private static String excelHandler(JSONObject json){ - - return ""; - } - private static String sqlHandler(){ - return ""; + private String sqlHandler(JSONObject json){ + String sql = SqlParser.replaceToValue(sql_format, json); + return sql; } - private static String csvHandler(JSONObject json){ - return txtHandler(json,Constant.COMMA_SIGN); - } - private static String txtHandler(JSONObject json,String split){ + private String txtHandler(JSONObject json,String split){ List list = new ArrayList<>(); List> listMap = fieldSort(json); @@ -129,7 +130,7 @@ private static String txtHandler(JSONObject json,String split){ } return StringUtils.join(list,split); } - private static List> fieldSort(JSONObject json){ + private List> fieldSort(JSONObject json){ List> listMap = new ArrayList<>(); //字段有序读出 if(StringUtils.isNoneEmpty(fieldSort)){ @@ -148,7 +149,7 @@ private static List> fieldSort(JSONObject json){ * @param oldkey * @return */ - private static String replaceKey(String oldkey){ + private String replaceKey(String oldkey){ String keySet[] = customFieldName.split(","); for(String key:keySet){ String keys[] = key.split(":"); @@ -159,31 +160,4 @@ private static String replaceKey(String oldkey){ } return oldkey; } - /** - * 验证链接后缀名 - * @param filePath - * @param fileType - * @return - */ - public static String validationFileName(String fileName,String fileType){ - String split = "."; - String name[] = fileName.split(split); - switch (fileType) { - case Constant.JSON: - - break; - case Constant.TXT: - break; - case Constant.EXCEL: - break; - case Constant.SQL: - break; - case Constant.CSV: - - break; - default: - break; - } - return fileName; - } } diff --git a/src/main/java/com/chenhj/service/impl/WriteDbServiceImpl.java b/src/main/java/com/chenhj/service/impl/WriteDbServiceImpl.java new file mode 100644 index 0000000..b8d7d58 --- /dev/null +++ b/src/main/java/com/chenhj/service/impl/WriteDbServiceImpl.java @@ -0,0 +1,40 @@ +/** + * + */ +package com.chenhj.service.impl; +import java.util.List; +import com.alibaba.fastjson.JSONObject; +import com.chenhj.dao.DbDao; +import com.chenhj.dao.impl.DbDaoImpl; +import com.chenhj.service.IWriteDbService; +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: DataToFileServiceImpl.java +* @Description: 该类的功能描述 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年11月17日 下午5:13:01 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年11月17日 chenhj v1.0.0 修改原因 +*/ +public class WriteDbServiceImpl implements IWriteDbService{ + private DbDao dao; + + public WriteDbServiceImpl() { + dao = new DbDaoImpl(); + } + @Override + public void write2Db(List list) throws Exception { + try { + dao.insert(list); + } catch (Exception e) { + throw e; + } + } + +} diff --git a/src/main/java/com/chenhj/service/impl/WriteFileServiceImpl.java b/src/main/java/com/chenhj/service/impl/WriteFileServiceImpl.java index a8698cc..7b054b5 100644 --- a/src/main/java/com/chenhj/service/impl/WriteFileServiceImpl.java +++ b/src/main/java/com/chenhj/service/impl/WriteFileServiceImpl.java @@ -4,15 +4,12 @@ package com.chenhj.service.impl; import java.io.File; -import java.io.FileNotFoundException; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.StringUtils; - import com.alibaba.fastjson.JSONObject; -import com.chenhj.constant.ApplicationConfig; +import com.chenhj.config.Config; import com.chenhj.constant.Constant; import com.chenhj.service.IWriteFileService; import com.chenhj.util.EncryUtil; @@ -35,103 +32,129 @@ */ public class WriteFileServiceImpl implements IWriteFileService{ // 分文件导出 - private static Map jsonIndex = new HashMap(); - private static boolean firstRun = true; - private static int index = 0; // 起始文件下标 - String basePath = ApplicationConfig.getFilePath(); - String fileName = ApplicationConfig.getFileName(); - String fileSize = ApplicationConfig.getFileSize(); - - String flagFileName = ".es_data_export"; - String query = ApplicationConfig.getQuery(); - String dataLayout= ApplicationConfig.getDataLayout(); - + private volatile static Integer index=null; // 起始文件下标 + // 分文件导出 + private static volatile Map jsonIndex = new HashMap(); + private String basePath; + private String fileName; + private String max_length_file; + private String query; + private String dataLayout; + private String split_method; + private boolean need_split_file; + private WriteData2File writeData2File; + public WriteFileServiceImpl() throws Exception{ + this.writeData2File = new WriteData2File(); + this.basePath = Config.FILE_CONFIG.getFilepath(); + this.fileName = Config.FILE_CONFIG.getFilename(); + this.max_length_file = Config.FILE_CONFIG.getMax_length_file(); + this.query = Config.ES_CONFIG.getQuery(); + this.dataLayout= Config.FILE_CONFIG.getDatalayout(); + this.query = EncryUtil.encry(query, "MD5"); + this.need_split_file = Config.FILE_CONFIG.getNeed_split_file(); + this.split_method = Config.FILE_CONFIG.getSplit_method(); + } @Override - public void write2File(List list) throws Exception { + public synchronized void write2File(List list) throws Exception { try { String filePath = ""; /*******************此处选出标记的文件*************************/ //判断是否需要分割 - if(StringUtils.isNoneEmpty(fileSize)){ - int dataSize = list.size(); - filePath = splitFile(dataSize); - }else{ - filePath = basePath +File.separator+fileName; + if(need_split_file){ + switch (split_method) { + case "disk": + fileName = splitFile(); + break; + case "amount": + int size = list.size(); + fileName =fileName+"_"+sedAndGetIndex(size); + break; + default: + break; + } } + fileName = parserFileName(fileName, dataLayout); + filePath = basePath +File.separator+fileName; /*********************************************/ - WriteData2File.toWrite(list, filePath,dataLayout); + writeData2File.toWrite(list, filePath,dataLayout); } catch (Exception e) { throw e; } } - /** - * 文件切割算法 - * @throws Exception - */ - public String splitFile(int dataSize) throws Exception{ - String filePath = ""; - String flagStr = ""; - String flagFilePath = basePath +File.separator+flagFileName; - int num = Integer.valueOf(fileSize); - query = EncryUtil.encry(query, "MD5"); - if(firstRun){ - try { - String flag = FileUtil.fileRead(flagFilePath); - //查看是否是第一批数据 - if(StringUtils.isNoneEmpty(flag)){ - String flags[] = flag.split(","); - index = Integer.valueOf(flags[0]); - // String queryFlag = flags[2]; - //判断再次启动查询条件有没有修改,如果已经修改,则从头写起 - // if(query.equals(queryFlag)){ - Integer count = Integer.valueOf(flags[1].trim()); - if (count >= num) { - jsonIndex.put(++index, dataSize); - } else { - jsonIndex.put(index, count + dataSize); - } - // }else{ - // index = 0; - // jsonIndex.put(0, dataSize); - // } - }else{ - index = 0; - jsonIndex.put(0, dataSize); - } - } catch (FileNotFoundException e) { - index = 0; - jsonIndex.put(0, dataSize); - } - firstRun = false; - }else if(!firstRun){ - index = sedAndGetIndex(dataSize,num); - } - filePath = basePath +File.separator+fileName+"_"+index; - flagStr = index+Constant.COMMA_SIGN+jsonIndex.get(index)+Constant.COMMA_SIGN+query; - //将标记写入日志中,该方法用于多文件切割时候用到 - FileUtil.clearInfoForFile(flagFilePath); - FileUtil.writeFile(flagFilePath,flagStr); - return filePath; - } - /** - * 获取要写入的文件下标 - * @param size 数据长度 - * @param num 单个文件最大长度 - * @return - */ - public int sedAndGetIndex(int size,int num) { - + //以文件条数分割算法 + // 获取要写入的文件下标 + public int sedAndGetIndex(int size) { if (jsonIndex.size() == 0) { jsonIndex.put(0, size); return 0; } + if(index==null){ + index = 0; + } int count = jsonIndex.get(index); - if (count >= num) { + + if (count >= Integer.valueOf(max_length_file)) { jsonIndex.put(++index, size); } else { jsonIndex.put(index, count + size); } return index; } - + /** + * 以文件大小进行文件切割算法 + * @throws Exception + */ + public String splitFile() throws Exception{ + String fileName = ""; + //String flagStr = ""; + //KB转B + long max_size = Long.valueOf(max_length_file)*1024; + //String flagFilePath = basePath +File.separator+flagFileName; + long fileSize = FileUtil.getFileSize(""); + //3,114,3f5ea8e4e6cfb52f90310413623f25f9 + // String flag = FileUtil.fileRead(flagFilePath); + //查看是否是第一批数据 + if(index!=null){ + //String flags[] = flag.split(","); + //index = Integer.valueOf(flags[0]); + String filePath = basePath +File.separator+this.fileName+"_"+index; + //获取当前文件大小 + fileSize = FileUtil.getFileSize(filePath); + if (fileSize >= max_size) { + ++index; + }else if(fileSize == -1){ + index =0; + } + }else{ + index =0; + } + fileName = this.fileName+"_"+index; + //flagStr = index+Constant.COMMA_SIGN+query; + //将标记写入日志中,该方法用于多文件切割时候用到 + //FileUtil.clearInfoForFile(flagFilePath); + //FileUtil.writeFile(flagFilePath,flagStr); + return fileName; + } + private String parserFileName(String fileName,String fileType) { + try { + switch (fileType) { + case Constant.SQL: + fileName= fileName+".sql"; + break; + case Constant.CSV: + fileName= fileName+".csv"; + break; + default: + break; + } + } catch (Exception e) { + throw (e); + } + return fileName; + } + public static void main(String[] args) { + int i = 0; + i++; + System.out.println(i); + } } diff --git a/src/main/java/com/chenhj/service/impl/WriteKafkaServiceImpl.java b/src/main/java/com/chenhj/service/impl/WriteKafkaServiceImpl.java new file mode 100644 index 0000000..1307812 --- /dev/null +++ b/src/main/java/com/chenhj/service/impl/WriteKafkaServiceImpl.java @@ -0,0 +1,113 @@ +/** + * + */ +package com.chenhj.service.impl; +import java.util.List; +import java.util.Map; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.chenhj.config.Config; +import com.chenhj.constant.Constant; +import com.chenhj.service.IWriteKafkaService; +import com.chenhj.util.MyTool; +import com.chenhj.util.kafka.KafkaUtil; +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: WriteKafkaServiceImpl.java +* @Description: 该类的功能描述 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年11月17日 下午5:13:01 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年11月17日 chenhj v1.0.0 修改原因 +*/ +public class WriteKafkaServiceImpl implements IWriteKafkaService{ + private int send_size; + private String topic; + private int delay; + private JSONObject add_value_JSON; + private Map replace_key_Map; + + public WriteKafkaServiceImpl() { + this.send_size = Config.Kafka_CONFIG.getSend_size(); + this.topic = Config.Kafka_CONFIG.getTopic(); + this.delay = Config.Kafka_CONFIG.getDelay(); + this.add_value_JSON = Config.Kafka_CONFIG.getAdd_value_JSON(); + this.replace_key_Map = Config.Kafka_CONFIG.getReplace_key_Map(); + } + + @Override + public void write2Kafka(List list) throws Exception { + try { + //是否需要处理数据 + if(add_value_JSON!=null){ + addValue(list); + } + if(replace_key_Map!=null){ + replaceKey(list); + } + int size = list.size(); + /*******************数据分批次写入kafka*********************/ + int toIndex=send_size; + long startTime = System.currentTimeMillis(); + List newList = null; + //分批数据,单批100条 + for(int i=0;i<=size;i+=send_size){ + if(i+send_size>size){ + toIndex=size-i; //作用为toIndex最后没有100条数据则剩余几条newList中就装几条 + } + newList = list.subList(i,i+toIndex); + String msg = JSON.toJSONString(newList); + KafkaUtil.sendMessage(msg,topic); + //查看是否启用延迟写入,启用的话在这里造成阻塞 + if(delay>0){ + long endTime = System.currentTimeMillis(); + long differ = endTime-startTime; + //查看是否达到延迟时间,未到达则延迟 + if(differ list){ + + for(String str:add_value_JSON.keySet()){ + String value = add_value_JSON.get(str)+""; + if(value.startsWith("#{now")){ + String ss[] = MyTool.getConfigParent(value).split(Constant.COMMA_SIGN); + if(ss.length==1){ + add_value_JSON.put(str,System.currentTimeMillis()/1000); + }else if(ss.length==2){ + add_value_JSON.put(str, MyTool.getNowTime(ss[1])); + } + } + } + for(JSONObject json:list){ + json.putAll(add_value_JSON); + } + } + private void replaceKey(List list){ + for(JSONObject json:list){ + for (Map.Entry m : replace_key_Map.entrySet()) { + String oldKey = m.getKey(); + String newKey = m.getValue(); + Object value = json.get(oldKey); + //不为null说明有值 + if(value!=null){ + json.remove(oldKey); + json.put(newKey, value); + } + } + } + } + +} diff --git a/src/main/java/com/chenhj/task/ExportDataTask.java b/src/main/java/com/chenhj/task/ExportDataMasterTask.java similarity index 65% rename from src/main/java/com/chenhj/task/ExportDataTask.java rename to src/main/java/com/chenhj/task/ExportDataMasterTask.java index 0569a25..d611da1 100644 --- a/src/main/java/com/chenhj/task/ExportDataTask.java +++ b/src/main/java/com/chenhj/task/ExportDataMasterTask.java @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSONObject; +import com.chenhj.config.Config; import com.chenhj.constant.Pool; import com.chenhj.service.IEsActionService; import com.chenhj.service.impl.EsActionServiceImpl; @@ -27,21 +28,28 @@ *---------------------------------------------------------* * 2018年8月6日 chenhj v1.0.0 修改原因 */ -public class ExportDataTask implements Runnable{ - private static final Logger logger = LoggerFactory.getLogger(ExportDataTask.class); +public class ExportDataMasterTask implements Runnable{ + private static final Logger logger = LoggerFactory.getLogger(ExportDataMasterTask.class); private IEsActionService esActionService; private String srcollId; private List list = null; + //启用标志 + private boolean jdbcEnabled; + private boolean fileEnabled; + private boolean kafkaEnabled; /** * @param scroll_id * @param list * @throws Exception */ - public ExportDataTask(String scroll_id,List list) throws Exception{ + public ExportDataMasterTask(String scroll_id,List list) throws Exception{ esActionService = new EsActionServiceImpl(); this.srcollId = scroll_id; this.list= list; + this.jdbcEnabled=Config.JDBC_CONFIG.isEnabled(); + this.fileEnabled =Config.FILE_CONFIG.isEnabled(); + this.kafkaEnabled = Config.Kafka_CONFIG.isEnabled(); } @Override public void run() { @@ -54,7 +62,18 @@ public void run() { try { if(list!=null&&!list.isEmpty()){ count = count+list.size(); - Pool.WRITE_FILE_POOL.addExecuteTask(new Write2FileTask(list)); + //写文件 + if(fileEnabled){ + Pool.WRITE_FILE_POOL.addExecuteTask(new Write2FileTask(list)); + } + //写DB + if(jdbcEnabled){ + Pool.WRITE_DB_POOL.addExecuteTask(new Write2DbTask(list)); + } + //写kafka + if(kafkaEnabled){ + Pool.WRITE_KAFKA_POOL.addExecuteTask(new Write2KafkaTask(list)); + } }else{ esActionService.clearSrcoll(srcollId); logger.info(Thread.currentThread().getName()+"线程拉取完成.数据条数:"+count); diff --git a/src/main/java/com/chenhj/task/MonitorTask.java b/src/main/java/com/chenhj/task/MonitorTask.java index d3b01d5..ee11bc8 100644 --- a/src/main/java/com/chenhj/task/MonitorTask.java +++ b/src/main/java/com/chenhj/task/MonitorTask.java @@ -12,7 +12,7 @@ import com.chenhj.constant.Constant; import com.chenhj.constant.Pool; -import com.chenhj.init.Rest; +import com.chenhj.es.Rest; import com.chenhj.thread.ThreadPoolManager; /** @@ -32,26 +32,53 @@ */ public class MonitorTask implements Callable{ private static final Logger logger = LoggerFactory.getLogger("Monitor"); + private static final Logger log = LoggerFactory.getLogger(MonitorTask.class); private ThreadPoolExecutor tpe = ((ThreadPoolExecutor) Pool.EXECPool); private ThreadPoolManager filePool = Pool.WRITE_FILE_POOL; + private ThreadPoolManager DbPool = Pool.WRITE_DB_POOL; + private ThreadPoolManager KafkaPool = Pool.WRITE_KAFKA_POOL; @Override public Byte call() throws Exception { //监控拉取数据的线程池 while (true) { boolean flag = getDataPool(); if(flag){ + log.info("ES is finished scroll, Stop the scroll thread pool..."); break; } } //关闭ES连接 Rest.Client.getRestClient().close(); //监控写文件线程是否已经结束 - while (true) { - boolean flag = writeFilePool(); - if(flag){ - break; - } - } + if(filePool!=null){ + while (true) { + boolean flag = writeFilePool(); + if(flag){ + log.info("File is finished writing, Stop the writing thread pool..."); + break; + } + } + } + //监控写DB线程是否已经结束 + if(DbPool!=null){ + while (true) { + boolean flag = writeDbPool(); + if(flag){ + log.info("DB is finished writing, Stop the writing thread pool..."); + break; + } + } + } + //监控写kafka线程是否已经结束 + if(KafkaPool!=null){ + while (true) { + boolean flag = writeKafkaPool(); + if(flag){ + log.info("Kafka is finished writing, Stop the writing thread pool..."); + break; + } + } + } return Constant.SUCCESS; } public boolean getDataPool() throws InterruptedException{ @@ -88,4 +115,36 @@ public boolean writeFilePool() throws InterruptedException{ TimeUnit.SECONDS.sleep(3); return flag; } + public boolean writeDbPool() throws InterruptedException{ + boolean flag = false; + boolean hasMoreAcquire = DbPool.hasMoreAcquire(); + boolean isTaskEnd = DbPool.isTaskEnd(); + int queueSize = DbPool.getNumQueue(); + logger.info("WRITE_Db_POOL>>Queue:" + queueSize); + int activeCount = DbPool.getNumActive(); + logger.info("WRITE_Db_POOL>>Active:" + activeCount); + if(!hasMoreAcquire&&isTaskEnd&&activeCount==0){ + DbPool.shutdown(); + logger.info(">>>>>WRITE_Db_POOL Shutdown..."); + flag = true; + } + TimeUnit.SECONDS.sleep(3); + return flag; + } + public boolean writeKafkaPool() throws InterruptedException{ + boolean flag = false; + boolean hasMoreAcquire = KafkaPool.hasMoreAcquire(); + boolean isTaskEnd = KafkaPool.isTaskEnd(); + int queueSize = KafkaPool.getNumQueue(); + logger.info("WRITE_Kafka_POOL>>Queue:" + queueSize); + int activeCount = KafkaPool.getNumActive(); + logger.info("WRITE_Kafka_POOL>>Active:" + activeCount); + if(!hasMoreAcquire&&isTaskEnd&&activeCount==0){ + KafkaPool.shutdown(); + logger.info(">>>>>WRITE_Kafka_POOL Shutdown..."); + flag = true; + } + TimeUnit.SECONDS.sleep(3); + return flag; + } } diff --git a/src/main/java/com/chenhj/task/Write2DbTask.java b/src/main/java/com/chenhj/task/Write2DbTask.java new file mode 100644 index 0000000..861b667 --- /dev/null +++ b/src/main/java/com/chenhj/task/Write2DbTask.java @@ -0,0 +1,46 @@ +/** + * + */ +package com.chenhj.task; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.fastjson.JSONObject; +import com.chenhj.dao.DbDao; +import com.chenhj.dao.impl.DbDaoImpl; +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: Write2FileTask.java +* @Description: 写DB任务类 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月6日 上午10:36:56 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月6日 chenhj v1.0.0 修改原因 +*/ +public class Write2DbTask implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(Write2DbTask.class); + private DbDao dbDao; + private List list = null; + public Write2DbTask(List list) { + this.list = list; + dbDao = new DbDaoImpl(); + } + @Override + public void run() { + try { + dbDao.insert(list); + return; + } catch (Exception e) { + logger.error("Write DB fail:",e); + } + } +} diff --git a/src/main/java/com/chenhj/task/Write2FileTask.java b/src/main/java/com/chenhj/task/Write2FileTask.java index e923780..331e2b8 100644 --- a/src/main/java/com/chenhj/task/Write2FileTask.java +++ b/src/main/java/com/chenhj/task/Write2FileTask.java @@ -16,7 +16,7 @@ * Copyright: Copyright (c) 2018 Montnets * * @ClassName: Write2FileTask.java -* @Description: 写数据任务类 +* @Description: 写文件任务类 * * @version: v1.0.0 * @author: chenhj @@ -31,7 +31,7 @@ public class Write2FileTask implements Runnable { private static final Logger logger = LoggerFactory.getLogger(Write2FileTask.class); private IWriteFileService dataToFileService; private List list = null; - public Write2FileTask(List list) { + public Write2FileTask(List list) throws Exception { this.list = list; dataToFileService = new WriteFileServiceImpl(); } diff --git a/src/main/java/com/chenhj/task/Write2KafkaTask.java b/src/main/java/com/chenhj/task/Write2KafkaTask.java new file mode 100644 index 0000000..48131e1 --- /dev/null +++ b/src/main/java/com/chenhj/task/Write2KafkaTask.java @@ -0,0 +1,46 @@ +/** + * + */ +package com.chenhj.task; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.fastjson.JSONObject; +import com.chenhj.service.IWriteKafkaService; +import com.chenhj.service.impl.WriteKafkaServiceImpl; + +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: Write2FileTask.java +* @Description: 写文件任务类 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月6日 上午10:36:56 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月6日 chenhj v1.0.0 修改原因 +*/ +public class Write2KafkaTask implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(Write2KafkaTask.class); + private List list = null; + private IWriteKafkaService kafkaService; + public Write2KafkaTask(List list) throws Exception { + this.list = list; + kafkaService = new WriteKafkaServiceImpl(); + } + @Override + public void run() { + try { + kafkaService.write2Kafka(list); + } catch (Exception e) { + logger.error("Write Kafka fail:",e); + } + } +} diff --git a/src/main/java/com/chenhj/thread/ThreadPoolManager.java b/src/main/java/com/chenhj/thread/ThreadPoolManager.java index 288d4f9..f7bd4b6 100644 --- a/src/main/java/com/chenhj/thread/ThreadPoolManager.java +++ b/src/main/java/com/chenhj/thread/ThreadPoolManager.java @@ -38,9 +38,9 @@ public class ThreadPoolManager { /**线程池维护线程所允许的空闲时间,超时时间为0,线程运行完后就关闭,而不会再等待超时时间 单位:秒**/ private static final long TIME_KEEP_ALIVE = 0L; /**线程池所使用的缓冲队列大小**/ - private static final int SIZE_WORK_QUEUE = 30; + private static final int SIZE_WORK_QUEUE = 1; /**任务调度周期**/ - private static final int PERIOD_TASK_QOS = 1000; + private static final int PERIOD_TASK_QOS = 2000; /**任务缓冲队列**/ public static BlockingQueue mTaskQueue = null; /** @@ -90,14 +90,14 @@ public ThreadPoolManager build() { /**通过调度线程周期性的执行缓冲队列中任务*/ ScheduledFuture mTaskHandler = null; /**线程池*/ - private ThreadPoolExecutor mThreadPool = null; + private MyThreadPool mThreadPool = null; private ThreadPoolManager build(ThreadPoolManager builder) { namedThreadFactory = new CustomThreadFactory(); scheduler = new ScheduledThreadPoolExecutor(SIZE_CORE_POOL,namedThreadFactory,mHandler); mTaskHandler = scheduler.scheduleAtFixedRate(mAccessBufferThread, 0, PERIOD_TASK_QOS,TimeUnit.MILLISECONDS); - mThreadPool =new ThreadPoolExecutor(SIZE_CORE_POOL, SIZE_MAX_POOL, + mThreadPool =new MyThreadPool(SIZE_CORE_POOL, SIZE_MAX_POOL, TIME_KEEP_ALIVE,TimeUnit.SECONDS,mTaskQueue,namedThreadFactory,mHandler); return this; } @@ -116,6 +116,8 @@ public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) { // 核心改造点,由blockingqueue的offer改成put阻塞方法 // mTaskQueue.offer(task); } + + }; /** * 将缓冲队列中的任务重新加载到线程池 @@ -127,6 +129,7 @@ public void run() { mThreadPool.execute(mTaskQueue.poll()); } } + }; @@ -209,9 +212,42 @@ private class CustomThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); - String threadName = "write2file" + count.addAndGet(1); + String threadName = "es_data_export" + count.addAndGet(1); t.setName(threadName); return t; } } + /** + * @Description: 自定义线程名 + */ + private class MyThreadPool extends ThreadPoolExecutor { + + /** + * @param corePoolSize + * @param maximumPoolSize + * @param keepAliveTime + * @param unit + * @param workQueue + * @param threadFactory + * @param handler + */ + public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + //任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间 + @Override + public void beforeExecute(Thread t, Runnable r){ + + } + @Override + public void afterExecute(Runnable r, Throwable t) { + + } + @Override + public void terminated(){ + + } + + } } diff --git a/src/main/java/com/chenhj/util/ByteSizeUnit.java b/src/main/java/com/chenhj/util/ByteSizeUnit.java new file mode 100644 index 0000000..6b982a1 --- /dev/null +++ b/src/main/java/com/chenhj/util/ByteSizeUnit.java @@ -0,0 +1,262 @@ +package com.chenhj.util; + + +public enum ByteSizeUnit{ + BYTES { + @Override + public long toBytes(long size) { + return size; + } + + @Override + public long toKB(long size) { + return size / (C1 / C0); + } + + @Override + public long toMB(long size) { + return size / (C2 / C0); + } + + @Override + public long toGB(long size) { + return size / (C3 / C0); + } + + @Override + public long toTB(long size) { + return size / (C4 / C0); + } + + @Override + public long toPB(long size) { + return size / (C5 / C0); + } + + @Override + public String getSuffix() { + return "b"; + } + }, + KB { + @Override + public long toBytes(long size) { + return x(size, C1 / C0, MAX / (C1 / C0)); + } + + @Override + public long toKB(long size) { + return size; + } + + @Override + public long toMB(long size) { + return size / (C2 / C1); + } + + @Override + public long toGB(long size) { + return size / (C3 / C1); + } + + @Override + public long toTB(long size) { + return size / (C4 / C1); + } + + @Override + public long toPB(long size) { + return size / (C5 / C1); + } + + @Override + public String getSuffix() { + return "kb"; + } + }, + MB { + @Override + public long toBytes(long size) { + return x(size, C2 / C0, MAX / (C2 / C0)); + } + + @Override + public long toKB(long size) { + return x(size, C2 / C1, MAX / (C2 / C1)); + } + + @Override + public long toMB(long size) { + return size; + } + + @Override + public long toGB(long size) { + return size / (C3 / C2); + } + + @Override + public long toTB(long size) { + return size / (C4 / C2); + } + + @Override + public long toPB(long size) { + return size / (C5 / C2); + } + + @Override + public String getSuffix() { + return "mb"; + } + }, + GB { + @Override + public long toBytes(long size) { + return x(size, C3 / C0, MAX / (C3 / C0)); + } + + @Override + public long toKB(long size) { + return x(size, C3 / C1, MAX / (C3 / C1)); + } + + @Override + public long toMB(long size) { + return x(size, C3 / C2, MAX / (C3 / C2)); + } + + @Override + public long toGB(long size) { + return size; + } + + @Override + public long toTB(long size) { + return size / (C4 / C3); + } + + @Override + public long toPB(long size) { + return size / (C5 / C3); + } + + @Override + public String getSuffix() { + return "gb"; + } + }, + TB { + @Override + public long toBytes(long size) { + return x(size, C4 / C0, MAX / (C4 / C0)); + } + + @Override + public long toKB(long size) { + return x(size, C4 / C1, MAX / (C4 / C1)); + } + + @Override + public long toMB(long size) { + return x(size, C4 / C2, MAX / (C4 / C2)); + } + + @Override + public long toGB(long size) { + return x(size, C4 / C3, MAX / (C4 / C3)); + } + + @Override + public long toTB(long size) { + return size; + } + + @Override + public long toPB(long size) { + return size / (C5 / C4); + } + + @Override + public String getSuffix() { + return "tb"; + } + }, + PB { + @Override + public long toBytes(long size) { + return x(size, C5 / C0, MAX / (C5 / C0)); + } + + @Override + public long toKB(long size) { + return x(size, C5 / C1, MAX / (C5 / C1)); + } + + @Override + public long toMB(long size) { + return x(size, C5 / C2, MAX / (C5 / C2)); + } + + @Override + public long toGB(long size) { + return x(size, C5 / C3, MAX / (C5 / C3)); + } + + @Override + public long toTB(long size) { + return x(size, C5 / C4, MAX / (C5 / C4)); + } + + @Override + public long toPB(long size) { + return size; + } + + @Override + public String getSuffix() { + return "pb"; + } + }; + + static final long C0 = 1L; + static final long C1 = C0 * 1024L; + static final long C2 = C1 * 1024L; + static final long C3 = C2 * 1024L; + static final long C4 = C3 * 1024L; + static final long C5 = C4 * 1024L; + + static final long MAX = Long.MAX_VALUE; + + public static ByteSizeUnit fromId(int id) { + if (id < 0 || id >= values().length) { + throw new IllegalArgumentException("No byte size unit found for id [" + id + "]"); + } + return values()[id]; + } + + /** + * Scale d by m, checking for overflow. + * This has a short name to make above code more readable. + */ + static long x(long d, long m, long over) { + if (d > over) return Long.MAX_VALUE; + if (d < -over) return Long.MIN_VALUE; + return d * m; + } + + public abstract long toBytes(long size); + + public abstract long toKB(long size); + + public abstract long toMB(long size); + + public abstract long toGB(long size); + + public abstract long toTB(long size); + + public abstract long toPB(long size); + + public abstract String getSuffix(); + +} diff --git a/src/main/java/com/chenhj/util/DriverLoader.java b/src/main/java/com/chenhj/util/DriverLoader.java new file mode 100644 index 0000000..287f547 --- /dev/null +++ b/src/main/java/com/chenhj/util/DriverLoader.java @@ -0,0 +1,73 @@ +/** + * + */ +package com.chenhj.util; + +import java.io.File; +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Driver; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.alibaba.druid.util.JdbcConstants; +/** +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: DriverLoader.java +* @Description: 动态加载驱动jar包 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年12月28日 下午4:21:35 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年12月28日 chenhj v1.0.0 修改原因 +*/ +public class DriverLoader { + private static final Logger logger = LoggerFactory.getLogger(DriverLoader.class); + private static URLClassLoader loader; + private static String driverClass; + /** + * 加载对应路径jar包里的对应驱动 + * @param fname 对应路径 如: lib4/ojdbc14.jar + * @param dname 驱动名 如: oracle.jdbc.driver.OracleDriver + * @return 加载到的驱动 java.sql.Driver + * @throws Exception + * @author tangxr + */ + public static Driver getDriverLoaderByName (String fname,String driver)throws Exception { + setDriverClassName(driver); + if(StringUtils.isBlank(fname)){ + logger.error("对应的驱动路径不存在,请确认."); + return null; + } + if(StringUtils.isBlank(driverClass)){ + logger.error("对应的驱动类的名字不存在."); + return null; + } + File file = new File(fname); + if(!file.exists()){ + logger.error("对应的驱动jar不存在."); + return null; + } + loader = new URLClassLoader(new URL[] { file.toURI().toURL() }); + loader.clearAssertionStatus(); + return (Driver) loader.loadClass(driverClass).newInstance(); + } + public static void setDriverClassName(String driver) { + if (driver != null && driver.length() > 256) { + throw new IllegalArgumentException("driverClassName length > 256."); + } + + if (JdbcConstants.ORACLE_DRIVER2.equalsIgnoreCase(driver)) { + driver = "oracle.jdbc.OracleDriver"; + logger.warn("oracle.jdbc.driver.OracleDriver is deprecated.Having use oracle.jdbc.OracleDriver."); + } + driverClass = driver; + } +} diff --git a/src/main/java/com/chenhj/util/FileUtil.java b/src/main/java/com/chenhj/util/FileUtil.java index 7f33d72..1b25e45 100644 --- a/src/main/java/com/chenhj/util/FileUtil.java +++ b/src/main/java/com/chenhj/util/FileUtil.java @@ -6,10 +6,13 @@ import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.io.LineNumberReader; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -93,6 +96,37 @@ public static int getFileLine(File file) throws IOException{ } return line; } + /** + * 获取文件长度,单位B + * @param file + * @throws IOException + */ + public static long getFileSize(String filePath) throws Exception { + long fileSize = 0L; + File file = null; + FileChannel fc= null; + FileInputStream fis = null; + try { + file = new File(filePath); + if (file.exists() && file.isFile()){ + fis = new FileInputStream(file); + fc= fis.getChannel(); + fileSize = fc.size(); + }else{ + fileSize = -1L; + } + } catch (FileNotFoundException e) { + throw e; + } finally { + if (null!=fc){ + fc.close(); + } + if(fis!=null){ + fis.close(); + } + } + return fileSize; + } public static String fileRead(String filePath) throws Exception { BufferedReader bReader = null; try { @@ -106,8 +140,8 @@ public static String fileRead(String filePath) throws Exception { } String str = sb.toString(); return str; - } catch (Exception e) { - throw e; + } catch (FileNotFoundException e) { + return null; }finally{ if(bReader!=null){ bReader.close(); diff --git a/src/main/java/com/chenhj/util/MyTool.java b/src/main/java/com/chenhj/util/MyTool.java new file mode 100644 index 0000000..64095ca --- /dev/null +++ b/src/main/java/com/chenhj/util/MyTool.java @@ -0,0 +1,48 @@ +/** + * + */ +package com.chenhj.util; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** +* Copyright: Copyright (c) 2019 Montnets +* +* @ClassName: MyTool.java +* @Description: 该类的功能描述 +* +* @version: v1.0.0 +* @author: chenhj +* @date: 2019年1月16日 下午3:53:27 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2019年1月16日 chenhj v1.0.0 修改原因 +*/ +public class MyTool { + private static Pattern regex = Pattern.compile("\\#\\{([^}]*)\\}"); + /** + * 获取#param{}中的值 + * @param str + */ + public static String getConfigParent(String str){ + Matcher matcher = regex.matcher(str); + String map = null; + while(matcher.find()) { + map=matcher.group(1); + } + return map; + } + /** + * 获取当前String类型的的时间(自定义格式) + * @param format 时间格式 + * @return String + */ + public static String getNowTime(String format) { + return new SimpleDateFormat(format).format(new Date()); + } +} diff --git a/src/main/java/com/chenhj/util/PropertiesAutoSerialize.java b/src/main/java/com/chenhj/util/PropertiesAutoSerialize.java deleted file mode 100644 index be72050..0000000 --- a/src/main/java/com/chenhj/util/PropertiesAutoSerialize.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * - */ -package com.chenhj.util; - -/** -* Copyright: Copyright (c) 2018 Montnets -* -* @ClassName: PropertiesAutoSerialize.java -* @Description: 该类用于将Properties文件中的参数,自动的设置到类同名字段中,前提是字段为静态的. -* -* @version: v1.0.0 -* @author: chenhj -* @date: 2018年12月5日 下午5:47:14 -* -* Modification History: -* Date Author Version Description -*---------------------------------------------------------* -* 2018年12月5日 chenhj v1.0.0 修改原因 -*/ -import java.io.*; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.Iterator; -import java.util.Properties; - -import org.apache.commons.lang3.StringUtils; - -public class PropertiesAutoSerialize implements Serializable { - /** - *@Fields serialVersionUID : TODO - */ - private static final long serialVersionUID = 1L; - - /** - * 待反序列化的类 - */ - private Class clazz; - - /** - * 待反序列化的Properties文件输入流 - */ - private InputStream propertiesFile; - - /** - * Properties操作对象 - */ - private Properties p = new Properties(); - - /** - * 私有的构造方法 - * 用于获取Properties文件流和设置待转对象 - * - * @param path 用于指定Properties文件路径,例如"/config.properties" - * @param clazz 待反序列化的类 - * @throws FileNotFoundException - */ - private PropertiesAutoSerialize(String path, Class clazz) throws FileNotFoundException { - // this.propertiesFile = PropertiesAutoSerialize.class.getResourceAsStream(path); - //获取resource中的配置 - File file =new File(path); - if(file.exists()){ - this.propertiesFile=new FileInputStream(file); - }else{ - this.propertiesFile=PropertiesAutoSerialize.class.getClassLoader().getResourceAsStream(path); - } - InputStreamReader isr = null; - try { - isr = new InputStreamReader(propertiesFile, "UTF-8"); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - this.clazz = clazz; - try { - p.load(isr); - } catch (IOException e) { - e.printStackTrace(); - } - } - - /** - * 获取指定key的value值 - * - * @param key 用于指定获取Properties中的key - * @return 用于返回Properties中指定key的value - */ - private String readProperty(String key) { - if (null == key || "".equals(key.trim())) return null; - return p.getProperty(key); - } - - /** - * PropertiesAutoSerialize.init("/config.properties",Const.class); - * - * @param path properties路径名 - * @param clazz 需要反序列化的类 - * @throws FileNotFoundException - */ - public static void init(String path, Class clazz) throws FileNotFoundException { - new PropertiesAutoSerialize(path, clazz).serializeProperties(); - } - - /** - * 转换实现原理: - * 获取Properties中所有的key,并遍历 - * 获取该key对应的value,如果value为空字符串,则跳过 - * 去取待转类中的同名字段,如果没有则跳过 - * 判断这个字段是否是静态字段,如果不是则跳过 - * 判断这个字段是否是final,如果是则跳过 - * 设置该字段为可见 - * 获取Properties中指定的value并trim - * 执行setField方法,对指定的字段进行设置值 - */ - private void serializeProperties() { - try { - Iterator iterator = p.keySet().iterator(); - while (iterator.hasNext()) { - Object obj = iterator.next(); - if (!(obj instanceof String)) { - continue; - } - String key = ((String) obj).trim(); - String value = readProperty(key).trim(); - if (StringUtils.isEmpty(value)) { - continue; - } - Field field; - if (null == (field = getField(key))) { - continue; - } - //不为静态不设置 - if (!Modifier.isStatic(field.getModifiers())) { - continue; - } - if (Modifier.isFinal(field.getModifiers())) { - continue; - } - field.setAccessible(true); - this.setField(field, value); - } - } finally { - try { - if (null != propertiesFile) - propertiesFile.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - - - /** - * 通过反射获取待转类clazz中指定字段名的字段,如果字段不存在则返回null - * - * @param fieldName 去查找待转类中的指定字段 - * @return 返回指定的字段 - */ - private Field getField(String fieldName) { - try { - return clazz.getDeclaredField(fieldName); - } catch (Exception ignored) { - } - return null; - } - - /** - * 对指定的字段进行设置值,目前仅支持字段类型: - * String,boolean,byte,char,short,int,long,float,double - * - * @param field 指定的字段 - * @param value 设置值 - */ - private void setField(Field field, String value) { - Class type = field.getType(); - Object par = null; - try { - if (String.class.equals(field.getType())) { - par = value; - } else if (int.class.equals(type) || Integer.class.equals(type)) { - par = Integer.valueOf(value); - } else if (boolean.class.equals(type) || Boolean.class.equals(type)) { - par = Boolean.valueOf(value); - } else if (long.class.equals(type) || Long.class.equals(type)) { - par = Long.valueOf(value); - } else if (double.class.equals(type) || Double.class.equals(type)) { - par = Double.valueOf(value); - } else if (float.class.equals(type) || Float.class.equals(type)) { - par = Float.valueOf(value); - } else if (short.class.equals(type) || Short.class.equals(type)) { - par = Short.valueOf(value); - } else if (byte.class.equals(type) || Byte.class.equals(type)) { - par = Byte.valueOf(value); - } else if (char.class.equals(type)) { - par = value.charAt(0); - } - if (null != par) { - field.set(null, par); - } else { - throw new RuntimeException("Properties转换异常:Class(字段类型不是'八大基本类型和String'):" + clazz.getName() + ",字段名:" + field.getName() + ",字段类型:" + field.getType() + ",value:" + value); - } - } catch (IllegalAccessException e) { - throw new RuntimeException("Properties转换异常:Class(IllegalAccessException):" + clazz.getName() + ",字段名:" + field.getName() + ",字段类型:" + field.getType() + ",value:" + value); - } catch (NumberFormatException e){ - throw new RuntimeException("Properties转换异常:Class(NumberFormatException):" + clazz.getName() + ",字段名:" + field.getName() + ",字段类型:" + field.getType() + ",value:" + value); - } - } - -} diff --git a/src/main/java/com/chenhj/util/PropertiesUtil.java b/src/main/java/com/chenhj/util/PropertiesUtil.java new file mode 100644 index 0000000..9380178 --- /dev/null +++ b/src/main/java/com/chenhj/util/PropertiesUtil.java @@ -0,0 +1,101 @@ +package com.chenhj.util; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * +* Copyright: Copyright (c) 2018 Montnets +* +* @ClassName: GetProperties.java +* @Description: 该类的功能描述 +*获取配置文件 +* @version: v1.0.0 +* @author: chenhj +* @date: 2018年6月9日 下午1:54:50 +* +* Modification History: +* Date Author Version Description +*---------------------------------------------------------* +* 2018年6月9日 chenhj v1.0.0 修改原因 + */ +public class PropertiesUtil { + private Map appSettings = new HashMap(); + private static final Logger LOG = LoggerFactory.getLogger(PropertiesUtil.class); + private String pathName; + /** + * 初始化系统默认参数 + */ + public PropertiesUtil(String pathName){ + this.pathName=pathName; + } + public PropertiesUtil(){ + } + private void init(){ + InputStream in = null; + try{ + //获取resource中的配置 + File file =new File(pathName); + if(file.exists()){ + in=new FileInputStream(file); + }else{ + in=PropertiesUtil.class.getClassLoader().getResourceAsStream(pathName); + } + //获取项目同级的配置 + // + Properties prop = new Properties(); + prop.load(new InputStreamReader(in, "utf-8")); + Set> buf = prop.entrySet(); + Iterator> it = buf.iterator(); + while(it.hasNext()){ + Entry t = it.next(); + appSettings.put((String)t.getKey(), (String)t.getValue()); + } + + }catch(IOException e){ + LOG.error("加载配置文件失败!",e); + }finally{ + if(null != in){ + try { + in.close(); + } catch (IOException e) { + LOG.error("加载系统参数失败!",e); + } + } + } + } + + /** + * 获取配置文件 + * @param name 配置文件名称 + * @return + */ + public synchronized Map loadProperties() { + if(null==pathName||"".equals(pathName)){ + throw new NullPointerException("Properties file path can not null"); + } + if(null == appSettings || appSettings.isEmpty()){ + init(); + } + return appSettings; + } + public synchronized Map loadProperties(String pathName) { + this.pathName=pathName; + if(null == appSettings || appSettings.isEmpty()){ + init(); + } + return appSettings; + } +} diff --git a/src/main/java/com/chenhj/util/SqlParser.java b/src/main/java/com/chenhj/util/SqlParser.java index b769d0b..5f5527d 100644 --- a/src/main/java/com/chenhj/util/SqlParser.java +++ b/src/main/java/com/chenhj/util/SqlParser.java @@ -2,20 +2,16 @@ * */ package com.chenhj.util; - -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; +import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.commons.lang3.StringUtils; - -import com.alibaba.druid.sql.ast.SQLExpr; import com.alibaba.druid.sql.ast.SQLStatement; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement; +import com.alibaba.druid.sql.ast.statement.SQLInsertStatement; +import com.alibaba.druid.sql.ast.statement.SQLUpdateStatement; import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser; +import com.alibaba.fastjson.JSONObject; /** * Copyright: Copyright (c) 2018 Montnets @@ -33,52 +29,83 @@ * 2018年12月21日 chenhj v1.0.0 修改原因 */ public class SqlParser { - private static String tableName; - private static List columnList; - private static List valueList; - public static Map parserInsert(String sql){ + public static String tableName; + public static boolean isInsertSql(String sql){ MySqlStatementParser parser = new MySqlStatementParser(sql); SQLStatement statement = parser.parseStatement(); - MySqlInsertStatement insert = (MySqlInsertStatement)statement; - Map map = new LinkedHashMap<>(); - List columns = insert.getColumns(); // 获得所有列名 - List valuse = insert.getValues().getValues(); - int size = columns.size(); - columnList = new ArrayList<>(); - valueList = new ArrayList<>(); - for(int i=0;i getColumnList() { - return columnList; + private static Pattern regex = Pattern.compile("\\#param\\{([^}]*)\\}"); + /** + * 获取#param{}中的值 + * @param str + */ + public static Map getConfigParent(String str){ + Matcher matcher = regex.matcher(str); + Map map = new HashMap<>(); + int i =1; + while(matcher.find()) { + map.put(matcher.group(1),i); + i++; + } + return map; } - public static List getValueList() { - return valueList; + /** + * 替换#param{}中的值变为? + * @param str + */ + public static String toLegalSql(String configSql){ + Matcher matcher = regex.matcher(configSql); + //把符合正则的数据替换成"?" + configSql=matcher.replaceAll("?"); + return configSql; } /** - * 获取${}中的值 + * 替换#param{}中的值变为JSON中对应的key的值 * @param str */ - public static String getConfigParent(String str){ - Pattern regex = Pattern.compile("\\#param\\{([^}]*)\\}"); - Matcher matcher = regex.matcher(str); + public static String replaceToValue(String configSql,JSONObject json){ + Matcher matcher = regex.matcher(configSql); + //configSql=matcher.replaceAll("?"); while(matcher.find()) { - return matcher.group(1); + String key = matcher.group(1); + Object value = json.get(key); + if(value instanceof String){ + configSql=configSql.replace("#param{"+key+"}","'"+value+"'"); + }else{ + configSql=configSql.replace("#param{"+key+"}",value+""); + } } - return null; + return configSql; } public static void main(String[] args) { - parserInsert("INSERT INTO table_name (phone,imid,aa) VALUES (?,?,'124');"); + //parserInsert("INSERT INTO table_name (phone,imid,aa) VALUES (#param{phone},?,'124');"); //System.out.println(tableName); //System.out.println(sqlFormat("INSERT INTO %s (%s) VALUES (%s);")); + JSONObject json = new JSONObject(); + json.put("phone",15302789406L); + json.put("imid","asdfg"); + String sql = "INSERT INTO table_name (phone,aa,imid,aa) VALUES (#param{phone},'nihao',#param{imid},'124');"; + //String sql ="UPDATE table_name SET field1=new-value1, field2=new-value2 WHERE ID = #param{phone}"; + //获得参数的标志位 +// System.out.println(getConfigParent(sql)); +// sql = toLegalSql(sql); +// //替换标志位的字符 +// System.out.println(sql); +// //验证sql合法性 +// System.out.println(isInsertSql(sql)); + System.out.println(replaceToValue(sql,json)); } } diff --git a/src/main/java/com/chenhj/util/kafka/KafkaUtil.java b/src/main/java/com/chenhj/util/kafka/KafkaUtil.java new file mode 100644 index 0000000..bd275d6 --- /dev/null +++ b/src/main/java/com/chenhj/util/kafka/KafkaUtil.java @@ -0,0 +1,144 @@ +package com.chenhj.util.kafka; + +import java.util.List; +import java.util.Objects; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.StringSerializer; + + +import com.chenhj.config.Config; + + +/** + * +* @Title: kafkaUtil +* @Description: +* kafka工具类 +* @Version:1.0.0 +* @author pancm +* @date 2018年4月2日 + */ +public final class KafkaUtil { + + private static Properties props = null; + /** + * + * 私有静态方法,创建Kafka生产者 + * + * @author IG + * @Date 2017年4月14日 上午10:32:32 + * @version 1.0.0 + * @return KafkaProducer + */ + private static KafkaProducer createProducer() { + if(null==props){ + init(); + } + KafkaProducer producer = null; + producer= new KafkaProducer(props); + return producer; +} + /** + * 向kafka发送单条消息 + * @param msg 发送的消息 + * @param url 发送的地址 + * @param topicName 消息名称 + * @return + * @throws Exception + */ + public static boolean sendMessage(String msg,String topicName) throws Exception{ + KafkaProducer producer = createProducer(); + boolean falg=false; + try{ + + producer.send(new ProducerRecord(topicName,msg)); + falg=true; + }catch(Exception e){ + throw new KafkaException("向kafka发送消息失败!"); + }finally { + if(producer!=null){ + producer.close(); + } + } + return falg; + } + public static boolean validation(){ + boolean flag = false; + KafkaProducer producer=null; + try{ + if(null==props){ + init(); + } + producer= createProducer(); + if(producer!=null){ + flag = true; + } + }catch(Exception e){ + throw new KafkaException("kafka连接异常!",e); + }finally{ + if(producer!=null){ + producer.close(); + } + } + return flag; + } + /** + * 向kafka发送批量消息 + * @param listMsg 发送的消息 + * @param url 发送的地址 + * @param topicName 消息名称 + * @return + * @throws Exception + */ + public static boolean sendMessage(List listMsg,String topicName) throws Exception{ + KafkaProducer producer=null; + boolean falg=false; + try{ + if(null==props){ + init(); + } + producer= new KafkaProducer(props); + for(String msg:listMsg){ + producer.send(new ProducerRecord(topicName,msg)); + } + falg=true; + }catch(Exception e){ + throw new Exception("向kafka发送消息失败!",e); + }finally{ + if(producer!=null){ + producer.close(); + } + } + return falg; + } + + /** + * 初始化配置 + * @param url kafka地址,多个地址则用‘,’隔开 + * @return + * @return + */ + public synchronized static void init(){ + Objects.requireNonNull(Config.Kafka_CONFIG,"配置文件为空"); + if(null == props){ + props = new Properties(); + props = new Properties(); + props.put("bootstrap.servers", Config.Kafka_CONFIG.getHosts()); + //acks=0:如果设置为0,生产者不会等待kafka的响应。 + //acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。 + //acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。 + props.put("acks", "all"); + //配置为大于0的值的话,客户端会在消息发送失败时重新发送。 + props.put("retries", 0); + //当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率 + props.put("batch.size", 16384); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + } + } + +} diff --git a/src/resources/banner b/src/resources/banner new file mode 100644 index 0000000..6b4cc44 --- /dev/null +++ b/src/resources/banner @@ -0,0 +1,5 @@ + _____ _ ______ __ _______ ______ ___ ____ _____ + | ____| / \ / ___\ \ / / | ____\ \/ / _ \ / _ \| _ \_ _| + | _| / _ \ \___ \\ V / | _| \ /| |_) | | | | |_) || | + | |___ / ___ \ ___) || | | |___ / \| __/| |_| | _ < | | + |_____/_/ \_\____/ |_| |_____/_/\_\_| \___/|_| \_\|_|