-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcsv.go
116 lines (98 loc) · 2.9 KB
/
csv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package harper
import (
"fmt"
"io"
"io/ioutil"
"strings"
)
const (
CSV_ACTION_INSERT = "insert"
CSV_ACTION_UPDATE = "update"
)
type JobResponse struct {
JobID string
}
// CSVDataLoad takes a Reader and executes the CSV Load Data operation
// if "update" is true, it will not insert but update existing records
// If successful, returns the Job ID
func (c *Client) CSVDataLoad(schema, table string, update bool, data io.Reader) (string, error) {
return c.csvDataLoad(schema, table, actionByBool(update), data)
}
func (c *Client) csvDataLoad(schema, table, action string, data io.Reader) (string, error) {
resp := MessageResponse{}
buff, err := ioutil.ReadAll(data)
if err != nil {
return "", err
}
err = c.opRequest(operation{
Operation: OP_CSV_DATA_LOAD,
Action: CSV_ACTION_INSERT,
Schema: schema,
Table: table,
Data: string(buff),
}, &resp)
if err != nil {
return "", err
}
if jobID, ok := extractJobIDFromMessage(resp.Message); ok {
return jobID, nil
}
return "", fmt.Errorf("did not get a job ID from harper instance: %w", ErrJobStatusUnknown)
}
// CSVFileLoad takes a path of a file which must exist on the server
// and executes the CSV Load Data operation
// if "update" is true, it will not insert but update existing records
// If successful, returns the Job ID
func (c *Client) CSVFileLoad(schema, table string, update bool, filePath string) (string, error) {
resp := MessageResponse{}
err := c.opRequest(operation{
Operation: OP_CSV_DATA_LOAD,
Action: CSV_ACTION_INSERT,
Schema: schema,
Table: table,
FilePath: filePath,
}, &resp)
if err != nil {
return "", err
}
if jobID, ok := extractJobIDFromMessage(resp.Message); ok {
return jobID, nil
}
return "", fmt.Errorf("did not get a job ID from harper instance: %w", ErrJobStatusUnknown)
}
// CSVURLLoad takes a public URL
// and executes the CSV Load Data operation
// if "update" is true, it will not insert but update existing records
// If successful, returns the Job ID
func (c *Client) CSVURLLoad(schema, table string, update bool, csvURL string) (string, error) {
resp := MessageResponse{}
err := c.opRequest(operation{
Operation: OP_CSV_DATA_LOAD,
Action: CSV_ACTION_INSERT,
Schema: schema,
Table: table,
CSVURL: csvURL,
}, &resp)
if err != nil {
return "", err
}
if jobID, ok := extractJobIDFromMessage(resp.Message); ok {
return jobID, nil
}
return "", fmt.Errorf("did not get a job ID from harper instance: %w", ErrJobStatusUnknown)
}
// extractJobIDFromMessage returns the Job ID in the "Starting job..." message
// TODO: Return the job ID in the JSON instead
func extractJobIDFromMessage(message string) (string, bool) {
jobID := strings.Replace(message, "Starting job with id ", "", 1)
if len(jobID) != 36 {
return "", false
}
return jobID, true
}
func actionByBool(update bool) string {
if update {
return CSV_ACTION_UPDATE
}
return CSV_ACTION_INSERT
}