Skip to content

Commit 95925b1

Browse files
authored
GT-183 Pregel API support (#428)
1 parent f41a0ac commit 95925b1

8 files changed

+380
-2
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
- New analyzers: `classification`, `nearest neighbors`, `minhash`
1414
- Add support for Inverted index
1515
- Deprecate fulltext index
16+
- Add support for Pregel API
1617

1718
## [1.3.3](https://github.com/arangodb/go-driver/tree/v1.3.3) (2022-07-27)
1819
- Fix `lastValue` field type

database.go

+3
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ type Database interface {
5252
// Graph functions
5353
DatabaseGraphs
5454

55+
// Pregel functions
56+
DatabasePregels
57+
5558
// Streaming Transactions functions
5659
DatabaseStreamingTransactions
5760

database_graphs.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type DatabaseGraphs interface {
4242

4343
// CreateGraph creates a new graph with given name and options, and opens a connection to it.
4444
// If a graph with given name already exists within the database, a DuplicateError is returned.
45-
// @deprecated since ArangoDB 3.9 - please use CreateGraphV2 instead
45+
// Deprecated: since ArangoDB 3.9 - please use CreateGraphV2 instead
4646
CreateGraph(ctx context.Context, name string, options *CreateGraphOptions) (Graph, error)
4747

4848
// CreateGraphV2 creates a new graph with given name and options, and opens a connection to it.

database_graphs_impl.go

+1
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ type createGraphAdditionalOptions struct {
173173

174174
// CreateGraph creates a new graph with given name and options, and opens a connection to it.
175175
// If a graph with given name already exists within the database, a DuplicateError is returned.
176+
// Deprecated: since ArangoDB 3.9 - please use CreateGraphV2 instead
176177
func (d *database) CreateGraph(ctx context.Context, name string, options *CreateGraphOptions) (Graph, error) {
177178
input := createGraphOptions{
178179
Name: name,

database_pregel.go

+182
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2022 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package driver
22+
23+
import (
24+
"context"
25+
"time"
26+
)
27+
28+
// DatabasePregels provides access to all Pregel Jobs in a single database.
29+
type DatabasePregels interface {
30+
// StartJob - Start the execution of a Pregel algorithm
31+
StartJob(ctx context.Context, options PregelJobOptions) (string, error)
32+
// GetJob - Get the status of a Pregel execution
33+
GetJob(ctx context.Context, id string) (*PregelJob, error)
34+
// GetJobs - Returns a list of currently running and recently finished Pregel jobs without retrieving their results.
35+
GetJobs(ctx context.Context) ([]*PregelJob, error)
36+
// CancelJob - Cancel an ongoing Pregel execution
37+
CancelJob(ctx context.Context, id string) error
38+
}
39+
40+
type PregelAlgorithm string
41+
42+
const (
43+
PregelAlgorithmPageRank PregelAlgorithm = "pagerank"
44+
PregelAlgorithmSingleSourceShortestPath PregelAlgorithm = "sssp"
45+
PregelAlgorithmConnectedComponents PregelAlgorithm = "connectedcomponents"
46+
PregelAlgorithmWeaklyConnectedComponents PregelAlgorithm = "wcc"
47+
PregelAlgorithmStronglyConnectedComponents PregelAlgorithm = "scc"
48+
PregelAlgorithmHyperlinkInducedTopicSearch PregelAlgorithm = "hits"
49+
PregelAlgorithmEffectiveCloseness PregelAlgorithm = "effectivecloseness"
50+
PregelAlgorithmLineRank PregelAlgorithm = "linerank"
51+
PregelAlgorithmLabelPropagation PregelAlgorithm = "labelpropagation"
52+
PregelAlgorithmSpeakerListenerLabelPropagation PregelAlgorithm = "slpa"
53+
)
54+
55+
type PregelJobOptions struct {
56+
// Name of the algorithm
57+
Algorithm PregelAlgorithm `json:"algorithm"`
58+
// Name of a graph. Either this or the parameters VertexCollections and EdgeCollections are required.
59+
// Please note that there are special sharding requirements for graphs in order to be used with Pregel.
60+
GraphName string `json:"graphName,optional"`
61+
// List of vertex collection names. Please note that there are special sharding requirements for collections in order to be used with Pregel.
62+
VertexCollections []string `json:"vertexCollections,optional"`
63+
// List of edge collection names. Please note that there are special sharding requirements for collections in order to be used with Pregel.
64+
EdgeCollections []string `json:"edgeCollections,optional"`
65+
// General as well as algorithm-specific options.
66+
Params map[string]interface{} `json:"params,optional"`
67+
}
68+
69+
type PregelJobState string
70+
71+
const (
72+
// PregelJobStateNone - The Pregel run did not yet start.
73+
PregelJobStateNone PregelJobState = "none"
74+
// PregelJobStateLoading - The graph is loaded from the database into memory before the execution of the algorithm.
75+
PregelJobStateLoading PregelJobState = "loading"
76+
// PregelJobStateRunning - The algorithm is executing normally.
77+
PregelJobStateRunning PregelJobState = "running"
78+
// PregelJobStateStoring - The algorithm finished, but the results are still being written back into the collections. Occurs only if the store parameter is set to true.
79+
PregelJobStateStoring PregelJobState = "storing"
80+
// PregelJobStateDone - The execution is done. In version 3.7.1 and later, this means that storing is also done.
81+
// In earlier versions, the results may not be written back into the collections yet. This event is announced in the server log (requires at least info log level for the pregel log topic).
82+
PregelJobStateDone PregelJobState = "done"
83+
// PregelJobStateCanceled - The execution was permanently canceled, either by the user or by an error.
84+
PregelJobStateCanceled PregelJobState = "canceled"
85+
// PregelJobStateFatalError - The execution has failed and cannot recover.
86+
PregelJobStateFatalError PregelJobState = "fatal error"
87+
// PregelJobStateInError - The execution is in an error state. This can be caused by DB-Servers being not reachable or being non-responsive.
88+
// The execution might recover later, or switch to "canceled" if it was not able to recover successfully.
89+
PregelJobStateInError PregelJobState = "in error"
90+
// PregelJobStateRecovering - (currently unused): The execution is actively recovering and switches back to running if the recovery is successful.
91+
PregelJobStateRecovering PregelJobState = "recovering"
92+
)
93+
94+
type PregelJob struct {
95+
// The ID of the Pregel job, as a string.
96+
ID string `json:"id"`
97+
// The algorithm used by the job.
98+
Algorithm PregelAlgorithm `json:"algorithm,omitempty"`
99+
// The date and time when the job was created.
100+
Created time.Time `json:"created,omitempty"`
101+
// The date and time when the job results expire.
102+
// The expiration date is only meaningful for jobs that were completed, canceled or resulted in an error.
103+
// Such jobs are cleaned up by the garbage collection when they reach their expiration date/time.
104+
Started time.Time `json:"started,omitempty"`
105+
// The TTL (time to live) value for the job results, specified in seconds. The TTL is used to calculate the expiration date for the job’s results.
106+
TTL uint64 `json:"ttl,omitempty"`
107+
// The state of the execution.
108+
State PregelJobState `json:"state,omitempty"`
109+
// The number of global supersteps executed.
110+
Gss uint64 `json:"gss,omitempty"`
111+
// The total runtime of the execution up to now (if the execution is still ongoing).
112+
TotalRuntime float64 `json:"totalRuntime,omitempty"`
113+
// The startup runtime of the execution. The startup time includes the data loading time and can be substantial.
114+
StartupTime float64 `json:"startupTime,omitempty"`
115+
// The algorithm execution time. Is shown when the computation started.
116+
ComputationTime float64 `json:"computationTime,omitempty"`
117+
// The time for storing the results if the job includes results storage. Is shown when the storing started.
118+
StorageTime float64 `json:"storageTime,omitempty"`
119+
// Computation time of each global super step. Is shown when the computation started.
120+
GSSTimes []float64 `json:"gssTimes,omitempty"`
121+
// This attribute is used by Programmable Pregel Algorithms (air, experimental). The value is only populated once the algorithm has finished.
122+
Reports []map[string]interface{} `json:"reports,omitempty"`
123+
// The total number of vertices processed.
124+
VertexCount uint64 `json:"vertexCount,omitempty"`
125+
// The total number of edges processed.
126+
EdgeCount uint64 `json:"edgeCount,omitempty"`
127+
// UseMemoryMaps
128+
UseMemoryMaps *bool `json:"useMemoryMaps,omitempty"`
129+
// The Pregel run details.
130+
// Available from 3.10 arangod version.
131+
Detail *PregelRunDetails `json:"detail,omitempty"`
132+
}
133+
134+
// PregelRunDetails - The Pregel run details.
135+
// Available from 3.10 arangod version.
136+
type PregelRunDetails struct {
137+
// The aggregated details of the full Pregel run. The values are totals of all the DB-Server.
138+
AggregatedStatus *AggregatedStatus `json:"aggregatedStatus,omitempty"`
139+
// The details of the Pregel for every DB-Server. Each object key is a DB-Server ID, and each value is a nested object similar to the aggregatedStatus attribute.
140+
// In a single server deployment, there is only a single entry with an empty string as key.
141+
WorkerStatus map[string]*AggregatedStatus `json:"workerStatus,omitempty"`
142+
}
143+
144+
// AggregatedStatus The aggregated details of the full Pregel run. The values are totals of all the DB-Server.
145+
type AggregatedStatus struct {
146+
// The time at which the status was measured.
147+
TimeStamp time.Time `json:"timeStamp,omitempty"`
148+
// The status of the in memory graph.
149+
GraphStoreStatus *GraphStoreStatus `json:"graphStoreStatus,omitempty"`
150+
// Information about the global supersteps.
151+
AllGSSStatus *AllGSSStatus `json:"allGssStatus,omitempty"`
152+
}
153+
154+
// GraphStoreStatus The status of the in memory graph.
155+
type GraphStoreStatus struct {
156+
// The number of vertices that are loaded from the database into memory.
157+
VerticesLoaded uint64 `json:"verticesLoaded,omitempty"`
158+
// The number of edges that are loaded from the database into memory.
159+
EdgesLoaded uint64 `json:"edgesLoaded,omitempty"`
160+
// The number of bytes used in-memory for the loaded graph.
161+
MemoryBytesUsed uint64 `json:"memoryBytesUsed,omitempty"`
162+
// The number of vertices that are written back to the database after the Pregel computation finished. It is only set if the store parameter is set to true.
163+
VerticesStored uint64 `json:"verticesStored,omitempty"`
164+
}
165+
166+
// AllGSSStatus Information about the global supersteps.
167+
type AllGSSStatus struct {
168+
// A list of objects with details for each global superstep.
169+
Items []GSSStatus `json:"items,omitempty"`
170+
}
171+
172+
// GSSStatus Information about the global superstep
173+
type GSSStatus struct {
174+
// The number of vertices that have been processed in this step.
175+
VerticesProcessed uint64 `json:"verticesProcessed,omitempty"`
176+
// The number of messages sent in this step.
177+
MessagesSent uint64 `json:"messagesSent,omitempty"`
178+
// The number of messages received in this step.
179+
MessagesReceived uint64 `json:"messagesReceived,omitempty"`
180+
// The number of bytes used in memory for the messages in this step.
181+
MemoryBytesUsedForMessages uint64 `json:"memoryBytesUsedForMessages,omitempty"`
182+
}

database_pregel_impl.go

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2022 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
21+
package driver
22+
23+
import (
24+
"context"
25+
"path"
26+
"strings"
27+
)
28+
29+
func (d *database) StartJob(ctx context.Context, options PregelJobOptions) (string, error) {
30+
id := ""
31+
32+
req, err := d.conn.NewRequest("POST", path.Join(d.relPath(), "_api/control_pregel"))
33+
if err != nil {
34+
return id, WithStack(err)
35+
}
36+
if _, err := req.SetBody(options); err != nil {
37+
return id, WithStack(err)
38+
}
39+
40+
var rawResponse []byte
41+
ctx = WithRawResponse(ctx, &rawResponse)
42+
resp, err := d.conn.Do(ctx, req)
43+
if err != nil {
44+
return id, WithStack(err)
45+
}
46+
if err := resp.CheckStatus(200); err != nil {
47+
return id, WithStack(err)
48+
}
49+
50+
return strings.Trim(string(rawResponse), "\""), nil
51+
}
52+
53+
func (d *database) GetJob(ctx context.Context, id string) (*PregelJob, error) {
54+
escapedId := pathEscape(id)
55+
req, err := d.conn.NewRequest("GET", path.Join(d.relPath(), "_api/control_pregel", escapedId))
56+
if err != nil {
57+
return nil, WithStack(err)
58+
}
59+
resp, err := d.conn.Do(ctx, req)
60+
if err != nil {
61+
return nil, WithStack(err)
62+
}
63+
if err := resp.CheckStatus(200); err != nil {
64+
return nil, WithStack(err)
65+
}
66+
var data PregelJob
67+
if err := resp.ParseBody("", &data); err != nil {
68+
return nil, WithStack(err)
69+
}
70+
return &data, nil
71+
}
72+
73+
func (d *database) GetJobs(ctx context.Context) ([]*PregelJob, error) {
74+
req, err := d.conn.NewRequest("GET", path.Join(d.relPath(), "_api/control_pregel"))
75+
if err != nil {
76+
return nil, WithStack(err)
77+
}
78+
resp, err := d.conn.Do(ctx, req)
79+
if err != nil {
80+
return nil, WithStack(err)
81+
}
82+
if err := resp.CheckStatus(200); err != nil {
83+
return nil, WithStack(err)
84+
}
85+
86+
var data []*PregelJob
87+
responses, err := resp.ParseArrayBody()
88+
if err != nil {
89+
return nil, WithStack(err)
90+
}
91+
92+
for _, response := range responses {
93+
var job PregelJob
94+
if err := response.ParseBody("", &job); err != nil {
95+
return nil, WithStack(err)
96+
}
97+
data = append(data, &job)
98+
}
99+
return data, nil
100+
}
101+
102+
func (d *database) CancelJob(ctx context.Context, id string) error {
103+
escapedId := pathEscape(id)
104+
req, err := d.conn.NewRequest("GET", path.Join(d.relPath(), "_api/control_pregel", escapedId))
105+
if err != nil {
106+
return WithStack(err)
107+
}
108+
resp, err := d.conn.Do(ctx, req)
109+
if err != nil {
110+
return WithStack(err)
111+
}
112+
if err := resp.CheckStatus(200); err != nil {
113+
return WithStack(err)
114+
}
115+
return nil
116+
}

replication_impl.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434

3535
// Content of the create batch resp
3636
type batchMetadata struct {
37-
// Id of the batch
37+
// ID of the batch
3838
ID string `json:"id"`
3939
// Last Tick reported by the server
4040
LastTickInt Tick `json:"lastTick,omitempty"`

0 commit comments

Comments
 (0)