Skip to content

Commit cc2f01d

Browse files
committed
init commit
0 parents  commit cc2f01d

File tree

580 files changed

+11270
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

580 files changed

+11270
-0
lines changed

.DS_Store

6 KB
Binary file not shown.

.gitignore

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
.vscode/
2+
3+
# Binaries for programs and plugins
4+
*.exe
5+
*.exe~
6+
*.dll
7+
*.so
8+
*.dylib
9+
10+
# Test binary, built with `go test -c`
11+
*.test
12+
13+
# Output of the go coverage tool, specifically when used with LiteIDE
14+
*.out
15+
16+
# Dependency directories (remove the comment below to include it)
17+
# vendor/
18+
19+
# Go workspace file
20+
go.work

README.md

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# Delta Go
2+
3+
## About
4+
5+
This repository contains a Go package that provides a connector for [Delta Lake](https://delta.io/) - an open-source storage layer that brings ACID transactions to Apache Spark big data workloads. A Go portal of the official Scala [delta standalone](https://github.com/delta-io/connectors).
6+
7+
What is it?
8+
9+
- It provides low level access to read and write Delta Lake **metadata** by implementing the Delta Lake transaction log protocol.
10+
11+
What is it not?
12+
13+
- It does not read, write or update the data for Delta Lake table directly, but the compute engine on top of it should do that.
14+
15+
## Supported backends
16+
17+
- Local file system (Done)
18+
- Azure Blob Storage (WIP)
19+
20+
## Status
21+
22+
- Currently only the local file system is fully supported and thoroughly tested against golden table data in the official Delta Standalone repo.
23+
- For other cloud storage, on the roadmap, any contribution is welcome!
24+
25+
## Usage
26+
27+
```go
28+
package examples
29+
30+
import (
31+
"log"
32+
"path/filepath"
33+
"testing"
34+
35+
delta "github.com/csimplestring/delta-go"
36+
)
37+
38+
func main() {
39+
path = "file://YOUR_DELTA_LOG_FOLDER/"
40+
41+
config := delta.Config{
42+
StorageConfig: delta.StorageConfig{
43+
Scheme: delta.Local,
44+
},
45+
}
46+
47+
table, err := delta.ForTable(path, config, &delta.SystemClock{})
48+
if err != nil {
49+
log.Fatal(err)
50+
}
51+
52+
// get the snapshot
53+
s, err := table.Snapshot()
54+
if err != nil {
55+
log.Fatal(err)
56+
}
57+
58+
// get the log version
59+
version := s.Version()
60+
log.Println(version)
61+
62+
// iterate all the log files
63+
files, err := s.AllFiles()
64+
if err != nil {
65+
log.Fatal(err)
66+
}
67+
for _, f := range files {
68+
log.Println(f.Path)
69+
}
70+
}
71+
72+
```
73+
74+
## Contributing
75+
76+
Contributions to this project are welcome. To contribute, please follow these steps:
77+
78+
1. Fork the repository
79+
2. Create a new branch for your feature/bugfix
80+
3. Make changes and commit them with clear commit messages
81+
4. Push your changes to your forked repository
82+
5. Create a pull request to the original repository
83+
84+
## License
85+
86+
This project is licensed under the [Apache-2.0 License](https://www.apache.org/licenses/LICENSE-2.0).

action/action.go

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package action
2+
3+
import (
4+
"encoding/json"
5+
"net/url"
6+
7+
"github.com/csimplestring/delta-go/errno"
8+
)
9+
10+
const ReaderVersion = 1
11+
const WriterVersion = 2
12+
const MinReaderVersionProp = "delta.minReaderVersion"
13+
const MinWriterVersionProp = "delta.minWriterVersion"
14+
15+
type Action interface {
16+
Wrap() *SingleAction
17+
Json() (string, error)
18+
}
19+
20+
type FileAction interface {
21+
Action
22+
PathAsUri() (*url.URL, error)
23+
IsDataChanged() bool
24+
}
25+
26+
func FromJson(s string) (Action, error) {
27+
action := &SingleAction{}
28+
if err := json.Unmarshal([]byte(s), action); err != nil {
29+
return nil, errno.JsonUnmarshalError(err)
30+
}
31+
32+
return action.Unwrap(), nil
33+
}
34+
35+
func jsonString(a Action) (string, error) {
36+
b, err := json.Marshal(a.Wrap())
37+
if err != nil {
38+
return "", errno.JsonMarshalError(err)
39+
}
40+
return string(b), nil
41+
}
42+
43+
func CheckMetadataProtocolProperties(metadata *Metadata, protocol *Protocol) error {
44+
if _, ok := metadata.Configuration[MinReaderVersionProp]; ok {
45+
return errno.AssertionError("should not have the protocol version MinReaderVersion as part of the table properties")
46+
}
47+
if _, ok := metadata.Configuration[MinWriterVersionProp]; ok {
48+
return errno.AssertionError("should not have the protocol version MinWriterVersion as part of the table properties")
49+
}
50+
return nil
51+
}
52+
53+
type SingleAction struct {
54+
Txn *SetTransaction `json:"txn,omitempty"`
55+
Add *AddFile `json:"add,omitempty"`
56+
Remove *RemoveFile `json:"remove,omitempty"`
57+
MetaData *Metadata `json:"metaData,omitempty"`
58+
Protocol *Protocol `json:"protocol,omitempty"`
59+
Cdc *AddCDCFile `json:"cdc,omitempty"`
60+
CommitInfo *CommitInfo `json:"commitInfo,omitempty"`
61+
}
62+
63+
func (s *SingleAction) Unwrap() Action {
64+
if s.Add != nil {
65+
return s.Add
66+
} else if s.Remove != nil {
67+
return s.Remove
68+
} else if s.MetaData != nil {
69+
return s.MetaData
70+
} else if s.Txn != nil {
71+
return s.Txn
72+
} else if s.Protocol != nil {
73+
return s.Protocol
74+
} else if s.Cdc != nil {
75+
return s.Cdc
76+
} else if s.CommitInfo != nil {
77+
return s.CommitInfo
78+
} else {
79+
return nil
80+
}
81+
}

action/actions_test.go

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package action
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestMetadata_Equal(t *testing.T) {
10+
t1 := int64(1)
11+
t2 := int64(1)
12+
13+
m1 := &Metadata{
14+
ID: "1",
15+
Name: "2",
16+
Description: "3",
17+
Format: Format{
18+
Proviver: "4",
19+
Options: map[string]string{
20+
"5": "5",
21+
},
22+
},
23+
SchemaString: "6",
24+
PartitionColumns: []string{"7"},
25+
Configuration: map[string]string{"8": "8"},
26+
CreatedTime: &t1,
27+
}
28+
29+
m2 := &Metadata{
30+
ID: "1",
31+
Name: "2",
32+
Description: "3",
33+
Format: Format{
34+
Proviver: "4",
35+
Options: map[string]string{
36+
"5": "5",
37+
},
38+
},
39+
SchemaString: "6",
40+
PartitionColumns: []string{"7"},
41+
Configuration: map[string]string{"8": "8"},
42+
CreatedTime: &t2,
43+
}
44+
45+
assert.True(t, m1.Equals(m2))
46+
}

action/add.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package action
2+
3+
import (
4+
"net/url"
5+
"time"
6+
7+
"github.com/ulule/deepcopier"
8+
)
9+
10+
type AddFile struct {
11+
Path string `json:"path,omitempty"`
12+
DataChange bool `json:"dataChange,omitempty"`
13+
PartitionValues map[string]string `json:"partitionValues,omitempty"`
14+
Size int64 `json:"size,omitempty"`
15+
ModificationTime int64 `json:"modificationTime,omitempty"`
16+
Stats string `json:"stats,omitempty"`
17+
Tags map[string]string `json:"tags,omitempty"`
18+
}
19+
20+
func (a *AddFile) IsDataChanged() bool {
21+
return a.DataChange
22+
}
23+
24+
func (a *AddFile) PathAsUri() (*url.URL, error) {
25+
return url.Parse(a.Path)
26+
}
27+
28+
func (a *AddFile) Wrap() *SingleAction {
29+
return &SingleAction{Add: a}
30+
}
31+
32+
func (a *AddFile) Json() (string, error) {
33+
return jsonString(a)
34+
}
35+
36+
func (a *AddFile) Remove() *RemoveFile {
37+
return a.RemoveWithTimestamp(nil, nil)
38+
}
39+
40+
func (a *AddFile) RemoveWithTimestamp(ts *int64, dataChange *bool) *RemoveFile {
41+
if ts == nil {
42+
*ts = time.Now().UnixMilli()
43+
}
44+
if dataChange == nil {
45+
*dataChange = true
46+
}
47+
48+
return &RemoveFile{
49+
Path: a.Path,
50+
DeletionTimestamp: ts,
51+
DataChange: *dataChange,
52+
}
53+
}
54+
55+
func (a *AddFile) Copy(dataChange bool, path string) *AddFile {
56+
dst := &AddFile{}
57+
deepcopier.Copy(a).To(dst)
58+
dst.Path = path
59+
dst.DataChange = dataChange
60+
return dst
61+
}

action/cdc.go

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package action
2+
3+
import (
4+
"net/url"
5+
)
6+
7+
type AddCDCFile struct {
8+
Path string `json:"path,omitempty"`
9+
DataChange bool `json:"dataChange,omitempty"`
10+
PartitionValues map[string]string `json:"partitionValues,omitempty"`
11+
Size int64 `json:"size,omitempty"`
12+
Tags map[string]string `json:"tags,omitempty"`
13+
}
14+
15+
func (a *AddCDCFile) IsDataChanged() bool {
16+
return a.DataChange
17+
}
18+
19+
func (a *AddCDCFile) PathAsUri() (*url.URL, error) {
20+
return url.Parse(a.Path)
21+
}
22+
23+
func (a *AddCDCFile) Wrap() *SingleAction {
24+
return &SingleAction{Cdc: a}
25+
}
26+
27+
func (a *AddCDCFile) Json() (string, error) {
28+
return jsonString(a)
29+
}

action/commit_info.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package action
2+
3+
import (
4+
"github.com/ulule/deepcopier"
5+
)
6+
7+
type CommitMarker interface {
8+
GetTimestamp() int64
9+
WithTimestamp(timestamp int64) CommitMarker
10+
GetVersion() int64
11+
}
12+
13+
type CommitInfo struct {
14+
Version *int64 `json:"version,omitempty"`
15+
Timestamp int64 `json:"timestamp,omitempty"`
16+
UserID *string `json:"userId,omitempty"`
17+
UserName *string `json:"userName,omitempty"`
18+
Operation string `json:"operation,omitempty"`
19+
OperationParameters map[string]string `json:"operationParameters,omitempty"`
20+
Job *JobInfo `json:"job,omitempty"`
21+
Notebook *NotebookInfo `json:"notebook,omitempty"`
22+
ClusterId *string `json:"clusterId,omitempty"`
23+
ReadVersion *int64 `json:"readVersion,omitempty"`
24+
IsolationLevel *string `json:"isolationLevel,omitempty"`
25+
IsBlindAppend *bool `json:"isBlindAppend,omitempty"`
26+
OperationMetrics map[string]string `json:"operationMetrics,omitempty"`
27+
UserMetadata *string `json:"userMetadata,omitempty"`
28+
EngineInfo *string `json:"engineInfo,omitempty"`
29+
}
30+
31+
func (c *CommitInfo) Wrap() *SingleAction {
32+
return &SingleAction{CommitInfo: c}
33+
}
34+
35+
func (c *CommitInfo) Json() (string, error) {
36+
return jsonString(c)
37+
}
38+
39+
func (c *CommitInfo) GetTimestamp() int64 {
40+
return c.Timestamp
41+
}
42+
43+
func (c *CommitInfo) WithTimestamp(timestamp int64) CommitMarker {
44+
copied := &CommitInfo{}
45+
deepcopier.Copy(c).To(copied)
46+
47+
copied.Timestamp = timestamp
48+
return copied
49+
}
50+
51+
func (c *CommitInfo) GetVersion() int64 {
52+
return *c.Version
53+
}
54+
55+
func (c *CommitInfo) Copy(version int64) *CommitInfo {
56+
res := &CommitInfo{}
57+
deepcopier.Copy(c).To(res)
58+
res.Version = &version
59+
return res
60+
}

action/format.go

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package action
2+
3+
type Format struct {
4+
Proviver string `json:"provider,omitempty"`
5+
Options map[string]string `json:"options,omitempty"`
6+
}

0 commit comments

Comments
 (0)