From 9b8fc1b94e0b872852a7a1705efbf71af8f20833 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin <45186596+chengbiao-jin@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:18:12 -0500 Subject: [PATCH 01/11] Update pyTigerGraphLoading.py Add function to support data loading from a string directly instead of a file. --- pyTigerGraph/pyTigerGraphLoading.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index 16272445..6b7d384f 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -49,8 +49,16 @@ def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: if logger.level == logging.DEBUG: logger.debug("params: " + self._locals(locals())) + data = open(filePath, 'rb').read() + return self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) + + def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None, + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: + logger.info("entry: runLoadingJobWithData") + if logger.level == logging.DEBUG: + logger.debug("params: " + self._locals(locals())) + try: - data = open(filePath, 'rb').read() params = { "tag": jobName, "filename": fileTag, From 0b725bde959bf3967a6869d6ce250726768fec21 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin <45186596+chengbiao-jin@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:42:45 -0500 Subject: [PATCH 02/11] Update logger --- pyTigerGraph/pyTigerGraphLoading.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index 6b7d384f..ed1ca70d 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -50,7 +50,11 @@ def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: logger.debug("params: " + 
self._locals(locals())) data = open(filePath, 'rb').read() - return self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) + res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) + + logger.info("exit: runLoadingJobWithData") + + return res def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None, eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: @@ -69,7 +73,7 @@ def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str params["eol"] = eol except OSError as ose: logger.error(ose.strerror) - logger.info("exit: runLoadingJobWithFile") + logger.info("exit: runLoadingJobWithData") return None # TODO Should throw exception instead? @@ -79,7 +83,7 @@ def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str if logger.level == logging.DEBUG: logger.debug("return: " + str(res)) - logger.info("exit: runLoadingJobWithFile") + logger.info("exit: runLoadingJobWithData") return res From cec2ca344aeb4b0a50d127f77a76403bb30dc009 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin <45186596+chengbiao-jin@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:44:21 -0500 Subject: [PATCH 03/11] Update logger --- pyTigerGraph/pyTigerGraphLoading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index ed1ca70d..9c91a12c 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -52,7 +52,7 @@ def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: data = open(filePath, 'rb').read() res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) - logger.info("exit: runLoadingJobWithData") + logger.info("exit: runLoadingJobWithFile") return res From b4a2f393a8674d8669f2d40d53f80752a4762f42 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin 
<45186596+chengbiao-jin@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:52:12 -0500 Subject: [PATCH 04/11] Update Doc --- pyTigerGraph/pyTigerGraphLoading.py | 32 ++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index 9c91a12c..877a89f4 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -58,7 +58,37 @@ def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None, eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: - logger.info("entry: runLoadingJobWithData") + """Execute a loading job with the given data string. + + The data string will be posted to the TigerGraph server and the value of the appropriate + FILENAME definition will be updated to point to the data received. + + NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to + load the file correctly. Remove the header from the data file before using this function. + + Args: + data: + The data string to be loaded. + fileTag: + The name of file variable in the loading job (DEFINE FILENAME ). + jobName: + The name of the loading job. + sep: + Data value separator. If your data is JSON, you do not need to specify this + parameter. The default separator is a comma `,`. + eol: + End-of-line character. Only one or two characters are allowed, except for the + special case `\\r\\n`. The default value is `\\n` + timeout: + Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting. + sizeLimit: + Maximum size for input file in bytes. 
+ + Endpoint: + - `POST /ddl/{graph_name}` + See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] + """ + logger.info("entry: runLoadingJobWithData") if logger.level == logging.DEBUG: logger.debug("params: " + self._locals(locals())) From ff76d245b7c05d7d77627b5a56aee0b8420f19e4 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin <45186596+chengbiao-jin@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:53:14 -0500 Subject: [PATCH 05/11] Update pyTigerGraphLoading.py --- pyTigerGraph/pyTigerGraphLoading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index 877a89f4..996575d7 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -88,7 +88,7 @@ def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str - `POST /ddl/{graph_name}` See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] """ - logger.info("entry: runLoadingJobWithData") + logger.info("entry: runLoadingJobWithData") if logger.level == logging.DEBUG: logger.debug("params: " + self._locals(locals())) From 5ada4060d631eca6283576345dd0d5691f7d19e9 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin <45186596+chengbiao-jin@users.noreply.github.com> Date: Wed, 23 Oct 2024 18:20:25 -0500 Subject: [PATCH 06/11] Add runLoadingJobWithDF function --- pyTigerGraph/pyTigerGraphLoading.py | 54 ++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index 996575d7..67b10e1a 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -5,7 +5,10 @@ """ import logging import warnings -from typing import Union +from typing import TYPE_CHECKING, Union +if TYPE_CHECKING: + import pandas as pd + from pyTigerGraph.pyTigerGraphBase import pyTigerGraphBase logger = 
logging.getLogger(__name__) @@ -13,6 +16,55 @@ class pyTigerGraphLoading(pyTigerGraphBase): + def runLoadingJobWithDF(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None, + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: + """Execute a loading job with the given pandas DataFrame with optional column list. + + The data string will be posted to the TigerGraph server and the value of the appropriate + FILENAME definition will be updated to point to the data received. + + NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to + load the file correctly. Remove the header from the data file before using this function. + + Args: + df: + The pandas DataFrame data structure to be loaded. + fileTag: + The name of file variable in the loading job (DEFINE FILENAME ). + jobName: + The name of the loading job. + sep: + Data value separator. If your data is JSON, you do not need to specify this + parameter. The default separator is a comma `,`. + eol: + End-of-line character. Only one or two characters are allowed, except for the + special case `\\r\\n`. The default value is `\\n` + timeout: + Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting. + sizeLimit: + Maximum size for input file in bytes. 
+ + Endpoint: + - `POST /ddl/{graph_name}` + See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] + """ + logger.info("entry: runLoadingJobWithDF") + if logger.level == logging.DEBUG: + logger.debug("params: " + self._locals(locals())) + + if columns is None: + data = df.to_csv(sep = '|', header=False) + else: + data = df.to_csv(columns = columns, sep = '|', header=False) + + res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) + + logger.info("exit: runLoadingJobWithDF") + + return res + def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None, eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: """Execute a loading job with the referenced file. From 4f3e052b51c6ef1ac1d2d54da7ce322d030bbfb0 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin <45186596+chengbiao-jin@users.noreply.github.com> Date: Wed, 23 Oct 2024 18:52:58 -0500 Subject: [PATCH 07/11] Update separator --- pyTigerGraph/pyTigerGraphLoading.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index 67b10e1a..0e6fdd1d 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -55,9 +55,9 @@ def runLoadingJobWithDF(self, df: 'pd.DataFrame', fileTag: str, jobName: str, se logger.debug("params: " + self._locals(locals())) if columns is None: - data = df.to_csv(sep = '|', header=False) + data = df.to_csv(sep = sep, header=False) else: - data = df.to_csv(columns = columns, sep = '|', header=False) + data = df.to_csv(columns = columns, sep = sep, header=False) res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) From 696c52c05dfe39abc7f50c01207845e65abb5a48 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin <45186596+chengbiao-jin@users.noreply.github.com> Date: Thu, 24 Oct 2024 12:58:22 -0500 Subject: [PATCH 08/11] 
Revise uploadDF function name --- pyTigerGraph/pyTigerGraphLoading.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index 0e6fdd1d..d9fa1c11 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -16,7 +16,7 @@ class pyTigerGraphLoading(pyTigerGraphBase): - def runLoadingJobWithDF(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None, + def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None, eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: """Execute a loading job with the given pandas DataFrame with optional column list. @@ -50,7 +50,7 @@ def runLoadingJobWithDF(self, df: 'pd.DataFrame', fileTag: str, jobName: str, se - `POST /ddl/{graph_name}` See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] """ - logger.info("entry: runLoadingJobWithDF") + logger.info("entry: runLoadingJobWithDataFrame") if logger.level == logging.DEBUG: logger.debug("params: " + self._locals(locals())) @@ -61,7 +61,7 @@ def runLoadingJobWithDF(self, df: 'pd.DataFrame', fileTag: str, jobName: str, se res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) - logger.info("exit: runLoadingJobWithDF") + logger.info("exit: runLoadingJobWithDataFrame") return res From 0decc2e20170161f030838d20a0cfc259c019020 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin Date: Tue, 29 Oct 2024 22:46:09 +0000 Subject: [PATCH 09/11] support async loading --- pyTigerGraph/pytgasync/pyTigerGraphLoading.py | 122 ++++++++++++++++-- 1 file changed, 112 insertions(+), 10 deletions(-) diff --git a/pyTigerGraph/pytgasync/pyTigerGraphLoading.py b/pyTigerGraph/pytgasync/pyTigerGraphLoading.py index 15da68a7..a4ad1e33 100644 --- a/pyTigerGraph/pytgasync/pyTigerGraphLoading.py +++ 
b/pyTigerGraph/pytgasync/pyTigerGraphLoading.py @@ -6,7 +6,9 @@ import logging import warnings -from typing import Union +from typing import TYPE_CHECKING, Union +if TYPE_CHECKING: + import pandas as pd from pyTigerGraph.common.loading import _prep_run_loading_job_with_file from pyTigerGraph.pytgasync.pyTigerGraphBase import AsyncPyTigerGraphBase @@ -14,10 +16,59 @@ logger = logging.getLogger(__name__) -class AsyncPyTigerGraphLoading(AsyncPyTigerGraphBase): +class pyTigerGraphLoading(pyTigerGraphBase): + + async def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None, + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: + """Execute a loading job with the given pandas DataFrame with optional column list. + + The data string will be posted to the TigerGraph server and the value of the appropriate + FILENAME definition will be updated to point to the data received. + + NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to + load the file correctly. Remove the header from the data file before using this function. + + Args: + df: + The pandas DataFrame data structure to be loaded. + fileTag: + The name of file variable in the loading job (DEFINE FILENAME ). + jobName: + The name of the loading job. + sep: + Data value separator. If your data is JSON, you do not need to specify this + parameter. The default separator is a comma `,`. + eol: + End-of-line character. Only one or two characters are allowed, except for the + special case `\\r\\n`. The default value is `\\n` + timeout: + Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting. + sizeLimit: + Maximum size for input file in bytes. + columns: + The ordered pandas DataFrame columns to be uploaded. 
+ + Endpoint: + - `POST /ddl/{graph_name}` + See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] + """ + logger.info("entry: runLoadingJobWithDataFrame") + if logger.level == logging.DEBUG: + logger.debug("params: " + self._locals(locals())) + + if columns is None: + data = df.to_csv(sep = sep, header=False) + else: + data = df.to_csv(columns = columns, sep = sep, header=False) + + res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) + + logger.info("exit: runLoadingJobWithDataFrame") + + return res async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None, - eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: """Execute a loading job with the referenced file. The file will first be uploaded to the TigerGraph server and the value of the appropriate @@ -52,24 +103,75 @@ async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, if logger.level == logging.DEBUG: logger.debug("params: " + self._locals(locals())) - data, params = _prep_run_loading_job_with_file( - filePath, jobName, fileTag, sep, eol) + data = _prep_run_loading_job_with_file(filePath) + res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) + + logger.info("exit: runLoadingJobWithFile") + + return res + + async def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None, + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: + """Execute a loading job with the given data string. + + The data string will be posted to the TigerGraph server and the value of the appropriate + FILENAME definition will be updated to point to the data received. 
+ + NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to + load the file correctly. Remove the header from the data file before using this function. + + Args: + data: + The data string to be loaded. + fileTag: + The name of file variable in the loading job (DEFINE FILENAME ). + jobName: + The name of the loading job. + sep: + Data value separator. If your data is JSON, you do not need to specify this + parameter. The default separator is a comma `,`. + eol: + End-of-line character. Only one or two characters are allowed, except for the + special case `\\r\\n`. The default value is `\\n` + timeout: + Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting. + sizeLimit: + Maximum size for input file in bytes. + + Endpoint: + - `POST /ddl/{graph_name}` + See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] + """ + logger.info("entry: runLoadingJobWithData") + if logger.level == logging.DEBUG: + logger.debug("params: " + self._locals(locals())) - if not data and not params: - # failed to read file + if not data or not jobName or not fileTag: + # invalid inputs + logger.error("Invalid data or params") + logger.info("exit: runLoadingJobWithData") return None + params = { + "tag": jobName, + "filename": fileTag, + } + if sep is not None: + params["sep"] = sep + if eol is not None: + params["eol"] = eol + res = await self._req("POST", self.restppUrl + "/ddl/" + self.graphname, params=params, data=data, - headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)}) + headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)}) if logger.level == logging.DEBUG: logger.debug("return: " + str(res)) - logger.info("exit: runLoadingJobWithFile") + logger.info("exit: runLoadingJobWithData") return res async def uploadFile(self, filePath, fileTag, jobName="", sep=None, eol=None, timeout=16000, - sizeLimit=128000000) -> dict: + sizeLimit=128000000) -> dict: 
"""DEPRECATED Use `runLoadingJobWithFile()` instead. From bb062ec35b9d1743fc827024db37f3da1169f128 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin Date: Tue, 29 Oct 2024 22:48:12 +0000 Subject: [PATCH 10/11] support async loading --- pyTigerGraph/pytgasync/pyTigerGraphLoading.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyTigerGraph/pytgasync/pyTigerGraphLoading.py b/pyTigerGraph/pytgasync/pyTigerGraphLoading.py index a4ad1e33..67455616 100644 --- a/pyTigerGraph/pytgasync/pyTigerGraphLoading.py +++ b/pyTigerGraph/pytgasync/pyTigerGraphLoading.py @@ -16,7 +16,7 @@ logger = logging.getLogger(__name__) -class pyTigerGraphLoading(pyTigerGraphBase): +class AsyncPyTigerGraphLoading(AsyncPyTigerGraphBase): async def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None, eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: From 7cb8aeda37b8f82bd4feed71b9e3906c287bb924 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin Date: Tue, 29 Oct 2024 22:54:22 +0000 Subject: [PATCH 11/11] update format --- pyTigerGraph/pyTigerGraphLoading.py | 10 +++++----- pyTigerGraph/pytgasync/pyTigerGraphLoading.py | 16 ++++++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index d170799b..49a28736 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -19,7 +19,7 @@ class pyTigerGraphLoading(pyTigerGraphBase): def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None, - eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: """Execute a loading job with the given pandas DataFrame with optional column list. 
The data string will be posted to the TigerGraph server and the value of the appropriate @@ -62,9 +62,9 @@ def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: data = df.to_csv(columns = columns, sep = sep, header=False) res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) - + logger.info("exit: runLoadingJobWithDataFrame") - + return res def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None, @@ -107,11 +107,11 @@ def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) logger.info("exit: runLoadingJobWithFile") - + return res def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None, - eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: """Execute a loading job with the given data string. The data string will be posted to the TigerGraph server and the value of the appropriate diff --git a/pyTigerGraph/pytgasync/pyTigerGraphLoading.py b/pyTigerGraph/pytgasync/pyTigerGraphLoading.py index 67455616..72c196c0 100644 --- a/pyTigerGraph/pytgasync/pyTigerGraphLoading.py +++ b/pyTigerGraph/pytgasync/pyTigerGraphLoading.py @@ -19,7 +19,7 @@ class AsyncPyTigerGraphLoading(AsyncPyTigerGraphBase): async def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None, - eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: """Execute a loading job with the given pandas DataFrame with optional column list. 
The data string will be posted to the TigerGraph server and the value of the appropriate @@ -62,13 +62,13 @@ async def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, job data = df.to_csv(columns = columns, sep = sep, header=False) res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) - + logger.info("exit: runLoadingJobWithDataFrame") - + return res async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None, - eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: """Execute a loading job with the referenced file. The file will first be uploaded to the TigerGraph server and the value of the appropriate @@ -107,11 +107,11 @@ async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) logger.info("exit: runLoadingJobWithFile") - + return res async def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None, - eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: """Execute a loading job with the given data string. 
The data string will be posted to the TigerGraph server and the value of the appropriate @@ -162,7 +162,7 @@ async def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep params["eol"] = eol res = await self._req("POST", self.restppUrl + "/ddl/" + self.graphname, params=params, data=data, - headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)}) + headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)}) if logger.level == logging.DEBUG: logger.debug("return: " + str(res)) @@ -171,7 +171,7 @@ async def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep return res async def uploadFile(self, filePath, fileTag, jobName="", sep=None, eol=None, timeout=16000, - sizeLimit=128000000) -> dict: + sizeLimit=128000000) -> dict: """DEPRECATED Use `runLoadingJobWithFile()` instead.