Skip to content

Commit b507275

Browse files
committed
v1.2 fix export and 优化es操作
1 parent c229221 commit b507275

6 files changed

Lines changed: 96 additions & 177 deletions

File tree

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
# csv2es: the data import/export tool between es and csv
22
elasticsearch与csv之间的数据导入导出工具
33

4-
说明:该工具只是实现对数据的导入导出,并不包含相关的ETL过程。
4+
说明:
5+
6+
- 现在版本只支持`elasticsearch 5.*`版本
7+
- 该工具只是实现对数据的导入导出,并不包含相关的ETL过程。
58

69
## Install
710

csv2es/cmd/export.go

Lines changed: 50 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
package cmd
1616

1717
import (
18-
"context"
1918
"encoding/json"
2019
"fmt"
20+
"io"
2121
"os"
2222
"strings"
2323

2424
"github.com/ibbd-dev/go-csv"
25+
"github.com/ibbd-dev/go-tools/es"
2526
"github.com/spf13/cobra"
2627
"gopkg.in/olivere/elastic.v5"
2728
)
@@ -30,7 +31,7 @@ import (
3031
var exportCmd = &cobra.Command{
3132
Use: "export",
3233
Short: "export data to csv from es",
33-
Long: `export data to csv from es",
34+
Long: `从es导出数据到csv文件,
3435
`,
3536
Example: `
3637
csv2es export --host=locahost --port=9200 --index=test --csv=source.csv
@@ -45,119 +46,80 @@ csv2es export --host=locahost --port=9200 --index=test --csv=source.csv
4546
writer := goCsv.NewMapWriterSimple(out)
4647

4748
// es
48-
conn, err := getESConnect()
49-
ctx := context.Background()
50-
exists, err := conn.IndexExists(cParams.IndexName).Do(ctx)
49+
conn, err := es.NewClient(cParams.Host, cParams.Port, cParams.IndexName, cParams.DocType)
5150
if err != nil {
52-
panic(fmt.Errorf("check index exists error: %v", err.Error()))
51+
panic(fmt.Errorf("es newClient: %v", err.Error()))
5352
}
5453

55-
if !exists {
56-
panic(fmt.Errorf("index %s is not exists", cParams.IndexName))
54+
if cParams.BulkSize <= 0 {
55+
cParams.BulkSize = 1000
5756
}
58-
if cParams.Size <= 0 {
59-
cParams.Size = 1000
57+
conn.SetLimit(cParams.Limit)
58+
conn.SetBulkSize(cParams.BulkSize)
59+
conn.SetDebug(cParams.Debug)
60+
if cParams.Debug {
61+
fmt.Printf("client config: %+v\n", conn)
6062
}
6163

62-
search := conn.Search(cParams.IndexName)
64+
var query elastic.Query
6365
if len(cParams.QueryField) > 0 {
64-
var query = elastic.NewTermQuery(cParams.QueryField, cParams.QueryValue)
65-
search = search.Query(query)
66+
query = elastic.NewTermQuery(cParams.QueryField, cParams.QueryValue)
6667
}
67-
68-
searchResult, err := search.Size(cParams.Size).Do(ctx)
69-
if err != nil {
70-
panic(fmt.Errorf("search error: %v", err.Error()))
71-
}
72-
73-
resTotal := searchResult.Hits.TotalHits
74-
fmt.Printf("search research total: %d\n", resTotal)
75-
76-
if resTotal < 1 {
77-
fmt.Println("search result is empty!")
78-
return
68+
if err = conn.SearchInit(query); err != nil {
69+
panic(err)
7970
}
8071

81-
var page int // 记录当前页码
82-
var count int64 // 记录总的记录数
83-
for count < resTotal {
84-
for i, hit := range searchResult.Hits.Hits {
85-
if i == 0 && cParams.Debug {
86-
fmt.Printf("[debug]row[0] = %s\n", string(*hit.Source))
87-
}
72+
var count int // 记录总的记录数
73+
for {
74+
row, err := conn.Read()
75+
if err == io.EOF {
76+
fmt.Println("read over")
77+
break
78+
}
79+
if err != nil {
80+
panic(err)
81+
}
8882

89-
var row = make(map[string]interface{})
90-
err = json.Unmarshal(*hit.Source, &row)
91-
if err != nil {
92-
panic(fmt.Errorf("search %d: json unmarshal error: %v", i, err.Error()))
83+
if count == 0 {
84+
// 首行
85+
var headers []string
86+
for k, _ := range row {
87+
headers = append(headers, k)
9388
}
94-
if count == 0 {
95-
// 首行
96-
var headers []string
97-
for k, _ := range row {
98-
headers = append(headers, k)
99-
}
10089

101-
writer.SetHeader(headers)
102-
if err = writer.WriteHeader(); err != nil {
103-
panic(fmt.Errorf("csv writer header error: %s", err.Error()))
104-
}
105-
fmt.Printf("Fieldnames: %s\n", strings.Join(headers, ", "))
90+
writer.SetHeader(headers)
91+
if err = writer.WriteHeader(); err != nil {
92+
panic(fmt.Errorf("csv writer header error: %s", err.Error()))
10693
}
94+
fmt.Printf("Fieldnames: %s\n", strings.Join(headers, ", "))
95+
}
10796

108-
var strRow = make(map[string]string)
109-
for k, v := range row {
110-
if vv, ok := v.(string); ok {
111-
strRow[k] = vv
112-
} else if sv, err := json.Marshal(v); err != nil {
113-
panic(fmt.Errorf("csv writer header error: %s", err.Error()))
114-
} else {
115-
strRow[k] = string(sv)
116-
//strRow[k] = strings.Trim(strRow[k], "\"")
117-
}
97+
var strRow = make(map[string]string)
98+
for k, v := range row {
99+
if vv, ok := v.(string); ok {
100+
strRow[k] = vv
101+
} else if sv, err := json.Marshal(v); err != nil {
102+
panic(fmt.Errorf("csv writer header error: %s", err.Error()))
103+
} else {
104+
strRow[k] = string(sv)
118105
}
119-
120-
count += 1
121-
writer.WriteRow(strRow)
122106
}
123-
writer.Flush()
124107

125-
// 下一页
126-
searchResult, err = func(scrollId string) (*elastic.SearchResult, error) {
127-
page++
128-
fmt.Printf("search page: %d\n", page)
129-
return conn.Scroll(cParams.IndexName).ScrollId(scrollId).Do(ctx)
130-
}(searchResult.ScrollId)
131-
if err != nil {
132-
panic(fmt.Errorf("next scroll error: %s", err.Error()))
108+
count += 1
109+
writer.WriteRow(strRow)
110+
if count%cParams.BulkSize == 0 {
111+
writer.Flush()
133112
}
134-
} // end of count < resTotal
113+
} // end of for
135114

115+
writer.Flush()
136116
fmt.Printf("Total %d\n", count)
137117
},
138118
}
139119

140120
func init() {
141121
rootCmd.AddCommand(exportCmd)
142122

143-
// Here you will define your flags and configuration settings.
144-
145-
// Cobra supports Persistent Flags which will work for this command
146-
// and all subcommands, e.g.:
147-
// exportCmd.PersistentFlags().String("foo", "", "A help for foo")
148123
exportCmd.PersistentFlags().StringVar(&cParams.QueryField, "query-field", "", "过滤字段")
149124
exportCmd.PersistentFlags().StringVar(&cParams.QueryValue, "query-value", "", "过滤字段对应的值")
150-
151-
// Cobra supports local flags which will only run when this command
152-
// is called directly, e.g.:
153-
// exportCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
154-
}
155-
156-
func StringIn(str string, ss []string) bool {
157-
for _, s := range ss {
158-
if s == str {
159-
return true
160-
}
161-
}
162-
return false
163125
}

csv2es/cmd/import.go

Lines changed: 28 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package cmd
1616

1717
import (
18-
"context"
1918
"encoding/json"
2019
"fmt"
2120
"io"
@@ -24,15 +23,15 @@ import (
2423
"strings"
2524

2625
"github.com/ibbd-dev/go-csv"
26+
"github.com/ibbd-dev/go-tools/es"
2727
"github.com/spf13/cobra"
28-
"gopkg.in/olivere/elastic.v5"
2928
)
3029

3130
// importCmd represents the import command
3231
var importCmd = &cobra.Command{
3332
Use: "import",
3433
Short: "import data from csv to es",
35-
Long: `import data from csv to es
34+
Long: `从csv导入数据到es
3635
`,
3736
Example: `
3837
csv2es import --host=locahost --port=9200 --mapping=mapping_filename.json --index=test --type=test --csv=source.csv
@@ -53,50 +52,37 @@ csv2es import --host=locahost --port=9200 --mapping=mapping_filename.json --inde
5352
fmt.Printf("fieldnames: %s\n", strings.Join(fieldnames, ", "))
5453

5554
// es
56-
conn, err := getESConnect()
57-
ctx := context.Background()
58-
exists, err := conn.IndexExists(cParams.IndexName).Do(ctx)
55+
conn, err := es.NewClient(cParams.Host, cParams.Port, cParams.IndexName, cParams.DocType)
5956
if err != nil {
60-
panic(fmt.Errorf("check index exists error: %v", err.Error()))
57+
panic(fmt.Errorf("es newClient: %v", err.Error()))
6158
}
6259

63-
if exists {
64-
if cParams.DeleteIndex {
65-
// 删除同名索引
66-
fmt.Printf("begin to delete index: %s\n", cParams.IndexName)
67-
if _, err = conn.DeleteIndex(cParams.IndexName).Do(ctx); err != nil {
68-
panic(fmt.Errorf("delete index error: %s", cParams.IndexName))
69-
}
70-
}
60+
if cParams.BulkSize <= 0 {
61+
cParams.BulkSize = 1000
62+
}
63+
conn.SetLimit(cParams.Limit)
64+
conn.SetBulkSize(cParams.BulkSize)
65+
conn.SetDebug(cParams.Debug)
7166

72-
// 创建索引
73-
fmt.Printf("begin to create index: %s\n", cParams.IndexName)
74-
if _, err = conn.CreateIndex(cParams.IndexName).Do(ctx); err != nil {
67+
var mapping = make(map[string]interface{})
68+
if len(cParams.Mapping) > 0 {
69+
bytes, err := ioutil.ReadFile(cParams.Mapping)
70+
if err != nil {
71+
fmt.Println("Read mapping file: ", err.Error())
7572
panic(err)
7673
}
7774

78-
if len(cParams.Mapping) > 0 {
79-
bytes, err := ioutil.ReadFile(cParams.Mapping)
80-
if err != nil {
81-
fmt.Println("Read mapping file: ", err.Error())
82-
panic(err)
83-
}
84-
85-
var mapping = make(map[string]interface{})
86-
if err := json.Unmarshal(bytes, &mapping); err != nil {
87-
fmt.Println("Json Unmarshal: ", err.Error())
88-
panic(err)
89-
}
90-
91-
fmt.Printf("begin to put index mapping: %s\n", cParams.IndexName)
92-
if _, err := conn.PutMapping().Index(cParams.IndexName).Type(cParams.DocType).BodyJson(mapping).Do(ctx); err != nil {
93-
panic(err)
94-
}
75+
if err := json.Unmarshal(bytes, &mapping); err != nil {
76+
fmt.Println("Json Unmarshal: ", err.Error())
77+
panic(err)
9578
}
9679
}
9780

81+
if err = conn.ImportInit(cParams.DeleteIndex, mapping); err != nil {
82+
panic(err)
83+
}
84+
9885
var count int
99-
bulk := conn.Bulk()
10086
for {
10187
row, err := reader.Read()
10288
if err == io.EOF {
@@ -107,47 +93,26 @@ csv2es import --host=locahost --port=9200 --mapping=mapping_filename.json --inde
10793
panic(err)
10894
}
10995

110-
req := elastic.NewBulkIndexRequest().Index(cParams.IndexName).Type(cParams.DocType).Doc(row)
111-
bulk.Add(req)
96+
conn.BulkAdd(row)
11297

11398
count++
114-
if cParams.Size > 0 && count >= cParams.Size {
115-
fmt.Printf("导出限制数量:%d\n", cParams.Size)
99+
if cParams.Limit > 0 && count >= cParams.Limit {
100+
fmt.Printf("导出限制数量:%d\n", cParams.Limit)
116101
break
117102
}
118103
}
119104

120105
// 执行导入
121-
bulkResponse, err := bulk.Do(ctx)
122-
if err != nil {
123-
panic(fmt.Errorf("index %v 批量导入数据出错: %v", cParams.IndexName, err.Error()))
124-
}
125-
126-
// 统计写入状态
127-
var errCount int
128-
indexed := bulkResponse.Indexed()
129-
for i, res := range indexed {
130-
if res.Error != nil {
131-
fmt.Printf("ERROR: %d, %+v\n", i, res.Error)
132-
errCount++
133-
}
106+
if err = conn.BulkImport(); err != nil {
107+
panic(err)
134108
}
135-
fmt.Printf("向es写入的数据量:%d,异常:%d\n", count, errCount)
109+
fmt.Printf("向es写入的数据量:%d\n", count)
136110
},
137111
}
138112

139113
func init() {
140114
rootCmd.AddCommand(importCmd)
141115

142-
// Here you will define your flags and configuration settings.
143-
144-
// Cobra supports Persistent Flags which will work for this command
145-
// and all subcommands, e.g.:
146-
// importCmd.PersistentFlags().String("foo", "", "A help for foo")
147116
importCmd.PersistentFlags().BoolVar(&cParams.DeleteIndex, "delete-index", false, "delete the same index before")
148117
importCmd.PersistentFlags().StringVar(&cParams.Mapping, "mapping", "", "the mapping file name, json format")
149-
150-
// Cobra supports local flags which will only run when this command
151-
// is called directly, e.g.:
152-
// importCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
153118
}

0 commit comments

Comments
 (0)