Skip to content

Commit a6251ff

Browse files
committed
v1.0 export debug
1 parent f30323d commit a6251ff

File tree

5 files changed

+169
-4
lines changed

5 files changed

+169
-4
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,6 @@
1010

1111
# Output of the go coverage tool, specifically when used with LiteIDE
1212
*.out
13+
14+
# vim
15+
*.swp

csv2es/cmd/export.go

+126
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright © 2018 NAME HERE <EMAIL ADDRESS>
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package cmd
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"os"
22+
"strings"
23+
24+
"github.com/ibbd-dev/go-csv"
25+
"github.com/spf13/cobra"
26+
"gopkg.in/olivere/elastic.v5"
27+
)
28+
29+
// exportCmd represents the export command
30+
var exportCmd = &cobra.Command{
31+
Use: "export",
32+
Short: "export data to csv from es",
33+
Long: `export data to csv from es",
34+
`,
35+
Example: `
36+
csv2es export --host=locahost --port=9200 --index=test --csv=source.csv
37+
`,
38+
Run: func(cmd *cobra.Command, args []string) {
39+
// 创建输出文件
40+
out, err := os.Create(cParams.CsvFilename)
41+
if err != nil {
42+
panic(err)
43+
}
44+
defer out.Close()
45+
writer := goCsv.NewMapWriterSimple(out)
46+
47+
// es
48+
conn, err := getESConnect()
49+
ctx := context.Background()
50+
exists, err := conn.IndexExists(cParams.IndexName).Do(ctx)
51+
if err != nil {
52+
panic(fmt.Errorf("check index exists error: %v", err.Error()))
53+
}
54+
55+
if !exists {
56+
panic(fmt.Errorf("index %s is not exists", cParams.IndexName))
57+
}
58+
if cParams.Size <= 0 {
59+
cParams.Size = 1000
60+
}
61+
62+
search := conn.Search(cParams.IndexName)
63+
if len(cParams.QueryField) > 0 {
64+
var query = elastic.NewTermQuery(cParams.QueryField, cParams.QueryValue)
65+
search.Query(query)
66+
}
67+
68+
searchResult, err := search.Size(cParams.Size).Do(ctx)
69+
if err != nil {
70+
panic(fmt.Errorf("search error: %v", err.Error()))
71+
}
72+
73+
var count int
74+
for i, hit := range searchResult.Hits.Hits {
75+
var row = make(map[string]interface{})
76+
err = json.Unmarshal(*hit.Source, &row)
77+
if err != nil {
78+
panic(fmt.Errorf("search %d: json unmarshal error: %v", i, err.Error()))
79+
}
80+
if count == 0 {
81+
// 首行
82+
var headers []string
83+
for k, _ := range row {
84+
headers = append(headers, k)
85+
}
86+
87+
writer.SetHeader(headers)
88+
if err = writer.WriteHeader(); err != nil {
89+
panic(fmt.Errorf("csv writer header error: %s", err.Error()))
90+
}
91+
fmt.Printf("Fieldnames: %s\n", strings.Join(headers, ", "))
92+
}
93+
94+
count += 1
95+
var strRow = make(map[string]string)
96+
for k, v := range row {
97+
if sv, err := json.Marshal(v); err != nil {
98+
panic(fmt.Errorf("csv writer header error: %s", err.Error()))
99+
} else {
100+
strRow[k] = string(sv)
101+
strRow[k] = strings.Trim(strRow[k], "\"")
102+
}
103+
}
104+
writer.WriteRow(strRow)
105+
}
106+
writer.Flush()
107+
108+
fmt.Printf("Total %d\n", count)
109+
},
110+
}
111+
112+
func init() {
113+
rootCmd.AddCommand(exportCmd)
114+
115+
// Here you will define your flags and configuration settings.
116+
117+
// Cobra supports Persistent Flags which will work for this command
118+
// and all subcommands, e.g.:
119+
// exportCmd.PersistentFlags().String("foo", "", "A help for foo")
120+
exportCmd.PersistentFlags().StringVar(&cParams.QueryField, "query-field", "", "过滤字段")
121+
exportCmd.PersistentFlags().StringVar(&cParams.QueryValue, "query-value", "", "过滤字段对应的值")
122+
123+
// Cobra supports local flags which will only run when this command
124+
// is called directly, e.g.:
125+
// exportCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
126+
}

csv2es/cmd/import.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ var importCmd = &cobra.Command{
3838
csv2es import --host=locahost --port=9200 --mapping=mapping_filename.json --index=test --type=test --csv=source.csv
3939
`,
4040
Run: func(cmd *cobra.Command, args []string) {
41-
// 创建输出文件
41+
// 创建输入文件
4242
in, err := os.Open(cParams.CsvFilename)
4343
if err != nil {
4444
panic(err)
@@ -88,6 +88,7 @@ csv2es import --host=locahost --port=9200 --mapping=mapping_filename.json --inde
8888
}
8989
}
9090

91+
var count int
9192
bulk := conn.Bulk()
9293
for {
9394
row, err := reader.Read()
@@ -101,6 +102,12 @@ csv2es import --host=locahost --port=9200 --mapping=mapping_filename.json --inde
101102

102103
req := elastic.NewBulkIndexRequest().Index(cParams.IndexName).Type(cParams.DocType).Doc(row)
103104
bulk.Add(req)
105+
106+
count++
107+
if cParams.Size > 0 && count >= cParams.Size {
108+
fmt.Printf("导出限制数量:%d\n", cParams.Size)
109+
break
110+
}
104111
}
105112

106113
// 执行导入
@@ -110,15 +117,15 @@ csv2es import --host=locahost --port=9200 --mapping=mapping_filename.json --inde
110117
}
111118

112119
// 统计写入状态
113-
var count int
120+
var errCount int
114121
indexed := bulkResponse.Indexed()
115122
for i, res := range indexed {
116123
if res.Error != nil {
117124
fmt.Printf("ERROR: %d, %+v\n", i, res.Error)
118-
count++
125+
errCount++
119126
}
120127
}
121-
fmt.Printf("向es写入的数据量:%d,异常:%d\n", len(indexed), count)
128+
fmt.Printf("向es写入的数据量:%d,异常:%d\n", count, errCount)
122129
},
123130
}
124131

csv2es/cmd/root.go

+12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
type CommonParams struct {
2727
Debug bool
28+
Size int // 导入导出的数量,0表示不限
2829

2930
// es config
3031
Host string
@@ -36,6 +37,10 @@ type CommonParams struct {
3637
DeleteIndex bool // 是否删除原索引
3738
Mapping string // mapping 文件
3839

40+
// export
41+
QueryField string
42+
QueryValue string
43+
3944
CsvFilename string
4045
}
4146

@@ -48,6 +53,11 @@ var rootCmd = &cobra.Command{
4853
Short: "import/export data beteen csv and es",
4954
Long: `import/export data between csv and es
5055
56+
实现功能:
57+
58+
- [x] import: 从csv文件导入数据到es
59+
- [ ] export: 从es导出数据到csv文件
60+
5161
Author: Alex Cai
5262
BuildAt: 20180621
5363
`,
@@ -78,6 +88,8 @@ func init() {
7888
rootCmd.PersistentFlags().StringVar(&cParams.DocType, "type", "", "es doc type")
7989
rootCmd.PersistentFlags().StringVar(&cParams.CsvFilename, "csv", "", "csv filename")
8090

91+
rootCmd.PersistentFlags().IntVar(&cParams.Size, "size", 0, "导入导出的数量,默认为0,表示导入不限量,导出限制为1000")
92+
8193
// Cobra also supports local flags, which will only run
8294
// when this action is called directly.
8395
//rootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")

examples/export.sh

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#!/bin/bash
#
# Data export
# Author: alex
# Created Time: 2018-06-21 (Thursday) 21:21:19

# Use the csv2es binary in the current directory when there is one,
# otherwise fall back to whatever "csv2es" resolves to on PATH.
if [ -f csv2es ]; then
    cmd="./csv2es"
else
    cmd="csv2es"
fi

# The index defaults to "test"; exactly one argument overrides it.
case $# in
    1) index="$1" ;;
    *) index=test ;;
esac

"$cmd" export --host=100.115.147.50 --port=9200 --index="$index" --csv=./output.csv --size=10000

0 commit comments

Comments
 (0)