Skip to content

Commit

Permalink
Merge pull request #262 from tigergraph/GML-1890-specify-atomicity-mu…
Browse files Browse the repository at this point in the history
…ltiline-upserts

feat(atomic changes): set atomic level on batch upsert functions
  • Loading branch information
parkererickson-tg authored Oct 28, 2024
2 parents a3f3365 + 8609c02 commit a477e31
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 17 deletions.
21 changes: 18 additions & 3 deletions pyTigerGraph/pyTigerGraphEdge.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ def upsertEdge(self, sourceVertexType: str, sourceVertexId: str, edgeType: str,
return ret

def upsertEdges(self, sourceVertexType: str, edgeType: str, targetVertexType: str,
edges: list, vertexMustExist=False) -> int:
edges: list, vertexMustExist=False, atomic: bool = False) -> int:
"""Upserts multiple edges (of the same type).
Args:
Expand Down Expand Up @@ -523,6 +523,11 @@ def upsertEdges(self, sourceVertexType: str, edgeType: str, targetVertexType: st
```
#operation-codes .
For valid values of `<operator>` see https://docs.tigergraph.com/dev/restpp-api/built-in-endpoints
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`. Default is `False`.
Returns:
A single number of accepted (successfully upserted) edges (0 or positive integer).
Expand Down Expand Up @@ -552,7 +557,10 @@ def upsertEdges(self, sourceVertexType: str, edgeType: str, targetVertexType: st
edgeType=edgeType,
targetVertexType=targetVertexType,
edges=edges)
ret = self._req("POST", self.restppUrl + "/graph/" + self.graphname, data=data)[0][
header = {}
if atomic:
header = {"gsql-atomic-level": "atomic"}
ret = self._req("POST", self.restppUrl + "/graph/" + self.graphname, data=data, headers=header)[0][
"accepted_edges"]

data = {sourceVertexType: {}}
Expand Down Expand Up @@ -601,7 +609,8 @@ def upsertEdges(self, sourceVertexType: str, edgeType: str, targetVertexType: st

def upsertEdgeDataFrame(self, df: 'pd.DataFrame', sourceVertexType: str, edgeType: str,
targetVertexType: str, from_id: str = "", to_id: str = "",
attributes: dict = None, vertexMustExist: bool = False) -> int:
attributes: dict = None, vertexMustExist: bool = False,
atomic: bool = False) -> int:
"""Upserts edges from a Pandas DataFrame.
Args:
Expand All @@ -624,6 +633,11 @@ def upsertEdgeDataFrame(self, df: 'pd.DataFrame', sourceVertexType: str, edgeTyp
the dataframe and target is the attribute name on the edge. When omitted,
all columns would be upserted with their current names. In this case column names
must match the edges's attribute names.
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`. Default is `False`.
Returns:
The number of edges upserted.
Expand Down Expand Up @@ -656,6 +670,7 @@ def upsertEdgeDataFrame(self, df: 'pd.DataFrame', sourceVertexType: str, edgeTyp
targetVertexType,
json_up,
vertexMustExist=vertexMustExist,
atomic=atomic
)

if logger.level == logging.DEBUG:
Expand Down
3 changes: 2 additions & 1 deletion pyTigerGraph/pyTigerGraphSchema.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ def upsertData(self, data: Union[str, object], atomic: bool = False, ackAll: boo
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful.
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`. Default is `False`.
ackAll:
If `True`, the request will return after all GPE instances have acknowledged the
POST. Otherwise, the request will return immediately after RESTPP processes the POST.
Expand Down
22 changes: 18 additions & 4 deletions pyTigerGraph/pyTigerGraphVertex.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ def upsertVertex(self, vertexType: str, vertexId: str, attributes: dict = None)

return ret

def upsertVertices(self, vertexType: str, vertices: list) -> int:
def upsertVertices(self, vertexType: str, vertices: list, atomic: bool = False) -> int:
"""Upserts multiple vertices (of the same type).
See the description of ``upsertVertex`` for generic information.
Expand Down Expand Up @@ -273,6 +273,11 @@ def upsertVertices(self, vertexType: str, vertices: list) -> int:
----
For valid values of `<operator>` see xref:tigergraph-server:API:built-in-endpoints.adoc#_operation_codes[Operation codes].
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`. Default is `False`.
Returns:
A single number of accepted (successfully upserted) vertices (0 or positive integer).
Expand All @@ -284,6 +289,10 @@ def upsertVertices(self, vertexType: str, vertices: list) -> int:
logger.info("entry: upsertVertices")
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

headers = {}
if atomic:
headers["gsql-atomic-level"] = "atomic"

data = {}
for v in vertices:
Expand All @@ -292,7 +301,7 @@ def upsertVertices(self, vertexType: str, vertices: list) -> int:
data = json.dumps({"vertices": {vertexType: data}})

ret = self._req("POST", self.restppUrl + "/graph/" +
self.graphname, data=data)[0]["accepted_vertices"]
self.graphname, data=data, headers=headers)[0]["accepted_vertices"]

if logger.level == logging.DEBUG:
logger.debug("return: " + str(ret))
Expand All @@ -301,7 +310,7 @@ def upsertVertices(self, vertexType: str, vertices: list) -> int:
return ret

def upsertVertexDataFrame(self, df: 'pd.DataFrame', vertexType: str, v_id: bool = None,
attributes: dict = None) -> int:
attributes: dict = None, atomic: bool = False) -> int:
"""Upserts vertices from a Pandas DataFrame.
Args:
Expand All @@ -317,6 +326,11 @@ def upsertVertexDataFrame(self, df: 'pd.DataFrame', vertexType: str, v_id: bool
the dataframe and target is the attribute name in the graph vertex. When omitted,
all columns would be upserted with their current names. In this case column names
must match the vertex's attribute names.
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`. Default is `False`.
Returns:
The number of vertices upserted.
Expand All @@ -327,7 +341,7 @@ def upsertVertexDataFrame(self, df: 'pd.DataFrame', vertexType: str, v_id: bool

json_up = _prep_upsert_vertex_dataframe(
df=df, v_id=v_id, attributes=attributes)
ret = self.upsertVertices(vertexType=vertexType, vertices=json_up)
ret = self.upsertVertices(vertexType=vertexType, vertices=json_up, atomic=atomic)

if logger.level == logging.DEBUG:
logger.debug("return: " + str(ret))
Expand Down
23 changes: 19 additions & 4 deletions pyTigerGraph/pytgasync/pyTigerGraphEdge.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ async def upsertEdge(self, sourceVertexType: str, sourceVertexId: str, edgeType:
```
{"visits": (1482, "+"), "max_duration": (371, "max")}
```
For valid values of `<operator>` see https://docs.tigergraph.com/dev/restpp-api/built-in-endpoints#operation-codes .
For valid values of `<operator>` see https://docs.tigergraph.com/dev/restpp-api/built-in-endpoints#operation-codes.
Returns:
A single number of accepted (successfully upserted) edges (0 or 1).
Expand Down Expand Up @@ -467,7 +467,7 @@ async def upsertEdge(self, sourceVertexType: str, sourceVertexId: str, edgeType:
return ret

async def upsertEdges(self, sourceVertexType: str, edgeType: str, targetVertexType: str,
edges: list) -> int:
edges: list, atomic: bool = False) -> int:
"""Upserts multiple edges (of the same type).
Args:
Expand All @@ -494,6 +494,11 @@ async def upsertEdges(self, sourceVertexType: str, edgeType: str, targetVertexTy
]
```
For valid values of `<operator>` see https://docs.tigergraph.com/dev/restpp-api/built-in-endpoints#operation-codes .
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`. Default is `False`.
Returns:
A single number of accepted (successfully upserted) edges (0 or positive integer).
Expand Down Expand Up @@ -524,6 +529,11 @@ async def upsertEdges(self, sourceVertexType: str, edgeType: str, targetVertexTy
edgeType=edgeType,
targetVertexType=targetVertexType,
edges=edges)

headers = {}
if atomic:
headers["gsql-atomic-level"] = "atomic"

ret = await self._req("POST", self.restppUrl + "/graph/" + self.graphname, data=data)
ret = ret[0]["accepted_edges"]

Expand All @@ -535,7 +545,7 @@ async def upsertEdges(self, sourceVertexType: str, edgeType: str, targetVertexTy

async def upsertEdgeDataFrame(self, df: 'pd.DataFrame', sourceVertexType: str, edgeType: str,
targetVertexType: str, from_id: str = "", to_id: str = "",
attributes: dict = None) -> int:
attributes: dict = None, atomic: bool = False) -> int:
"""Upserts edges from a Pandas DataFrame.
Args:
Expand All @@ -558,6 +568,11 @@ async def upsertEdgeDataFrame(self, df: 'pd.DataFrame', sourceVertexType: str, e
the dataframe and target is the attribute name on the edge. When omitted,
all columns would be upserted with their current names. In this case column names
must match the edges's attribute names.
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`. Default is `False`.
Returns:
The number of edges upserted.
Expand All @@ -567,7 +582,7 @@ async def upsertEdgeDataFrame(self, df: 'pd.DataFrame', sourceVertexType: str, e
logger.debug("params: " + self._locals(locals()))

json_up = _prep_upsert_edge_dataframe(df, from_id, to_id, attributes)
ret = await self.upsertEdges(sourceVertexType, edgeType, targetVertexType, json_up)
ret = await self.upsertEdges(sourceVertexType, edgeType, targetVertexType, json_up, atomic=atomic)

if logger.level == logging.DEBUG:
logger.debug("return: " + str(ret))
Expand Down
3 changes: 2 additions & 1 deletion pyTigerGraph/pytgasync/pyTigerGraphSchema.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ async def upsertData(self, data: Union[str, object], atomic: bool = False, ackAl
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful.
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`.
ackAll:
If `True`, the request will return after all GPE instances have acknowledged the
POST. Otherwise, the request will return immediately after RESTPP processes the POST.
Expand Down
22 changes: 18 additions & 4 deletions pyTigerGraph/pytgasync/pyTigerGraphVertex.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ async def upsertVertex(self, vertexType: str, vertexId: str, attributes: dict =

return ret

async def upsertVertices(self, vertexType: str, vertices: list) -> int:
async def upsertVertices(self, vertexType: str, vertices: list, atomic: bool = False) -> int:
"""Upserts multiple vertices (of the same type).
See the description of ``upsertVertex`` for generic information.
Expand Down Expand Up @@ -280,6 +280,11 @@ async def upsertVertices(self, vertexType: str, vertices: list) -> int:
----
For valid values of `<operator>` see xref:tigergraph-server:API:built-in-endpoints.adoc#_operation_codes[Operation codes].
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`. Defaults to False.
Returns:
A single number of accepted (successfully upserted) vertices (0 or positive integer).
Expand All @@ -292,13 +297,17 @@ async def upsertVertices(self, vertexType: str, vertices: list) -> int:
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

headers = {}
if atomic:
headers["gsql-atomic-level"] = "atomic"

data = {}
for v in vertices:
vals = _upsert_attrs(v[1])
data[v[0]] = vals
data = json.dumps({"vertices": {vertexType: data}})

ret = await self._req("POST", self.restppUrl + "/graph/" + self.graphname, data=data)
ret = await self._req("POST", self.restppUrl + "/graph/" + self.graphname, data=data, headers=headers)
ret = ret[0]["accepted_vertices"]

if logger.level == logging.DEBUG:
Expand All @@ -308,7 +317,7 @@ async def upsertVertices(self, vertexType: str, vertices: list) -> int:
return ret

async def upsertVertexDataFrame(self, df: 'pd.DataFrame', vertexType: str, v_id: bool = None,
attributes: dict = None) -> int:
attributes: dict = None, atomic: bool = False) -> int:
"""Upserts vertices from a Pandas DataFrame.
Args:
Expand All @@ -324,6 +333,11 @@ async def upsertVertexDataFrame(self, df: 'pd.DataFrame', vertexType: str, v_id:
the dataframe and target is the attribute name in the graph vertex. When omitted,
all columns would be upserted with their current names. In this case column names
must match the vertex's attribute names.
atomic:
The request is an atomic transaction. An atomic transaction means that updates to
the database contained in the request are all-or-nothing: either all changes are
successful, or none are successful. This uses the `gsql-atomic-level` header, and sets
the value to `atomic` if `True`, and `nonatomic` if `False`. Defaults to False.
Returns:
The number of vertices upserted.
Expand All @@ -334,7 +348,7 @@ async def upsertVertexDataFrame(self, df: 'pd.DataFrame', vertexType: str, v_id:

json_up = _prep_upsert_vertex_dataframe(
df=df, v_id=v_id, attributes=attributes)
ret = await self.upsertVertices(vertexType=vertexType, vertices=json_up)
ret = await self.upsertVertices(vertexType=vertexType, vertices=json_up, atomic=atomic)

if logger.level == logging.DEBUG:
logger.debug("return: " + str(ret))
Expand Down

0 comments on commit a477e31

Please sign in to comment.