diff --git a/pyTigerGraph/common/loading.py b/pyTigerGraph/common/loading.py index f02c25a2..aba3eb2b 100644 --- a/pyTigerGraph/common/loading.py +++ b/pyTigerGraph/common/loading.py @@ -8,22 +8,14 @@ logger = logging.getLogger(__name__) -def _prep_run_loading_job_with_file(filePath, jobName, fileTag, sep, eol): +def _prep_run_loading_job_with_file(filePath): '''read file contents for runLoadingJobWithFile()''' try: data = open(filePath, 'rb').read() - params = { - "tag": jobName, - "filename": fileTag, - } - if sep is not None: - params["sep"] = sep - if eol is not None: - params["eol"] = eol - return data, params + return data except OSError as ose: logger.error(ose.strerror) logger.info("exit: runLoadingJobWithFile") - return None, None + return None # TODO Should throw exception instead? diff --git a/pyTigerGraph/pyTigerGraphLoading.py b/pyTigerGraph/pyTigerGraphLoading.py index dce96690..49a28736 100644 --- a/pyTigerGraph/pyTigerGraphLoading.py +++ b/pyTigerGraph/pyTigerGraphLoading.py @@ -6,10 +6,11 @@ import logging import warnings -from typing import Union +from typing import TYPE_CHECKING, Union +if TYPE_CHECKING: + import pandas as pd from pyTigerGraph.common.loading import _prep_run_loading_job_with_file - from pyTigerGraph.pyTigerGraphBase import pyTigerGraphBase logger = logging.getLogger(__name__) @@ -17,6 +18,55 @@ class pyTigerGraphLoading(pyTigerGraphBase): + def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None, + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: + """Execute a loading job with the given pandas DataFrame with optional column list. + + The data string will be posted to the TigerGraph server and the value of the appropriate + FILENAME definition will be updated to point to the data received. + + NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to + load the file correctly. Remove the header from the data file before using this function. + + Args: + df: + The pandas DateFrame data structure to be loaded. + fileTag: + The name of file variable in the loading job (DEFINE FILENAME ). + jobName: + The name of the loading job. + sep: + Data value separator. If your data is JSON, you do not need to specify this + parameter. The default separator is a comma `,`. + eol: + End-of-line character. Only one or two characters are allowed, except for the + special case `\\r\\n`. The default value is `\\n` + timeout: + Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting. + sizeLimit: + Maximum size for input file in bytes. + columns: + The ordered pandas DataFrame columns to be uploaded. + + Endpoint: + - `POST /ddl/{graph_name}` + See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] + """ + logger.info("entry: runLoadingJobWithDataFrame") + if logger.level == logging.DEBUG: + logger.debug("params: " + self._locals(locals())) + + if columns is None: + data = df.to_csv(sep = sep, header=False) + else: + data = df.to_csv(columns = columns, sep = sep, header=False) + + res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) + + logger.info("exit: runLoadingJobWithDataFrame") + + return res + def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None, eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: """Execute a loading job with the referenced file. @@ -53,19 +103,70 @@ def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: if logger.level == logging.DEBUG: logger.debug("params: " + self._locals(locals())) - data, params = _prep_run_loading_job_with_file( - filePath, jobName, fileTag, sep, eol) + data = _prep_run_loading_job_with_file(filePath) + res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) - if not data and not params: - # failed to read file + logger.info("exit: runLoadingJobWithFile") + + return res + + def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None, + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: + """Execute a loading job with the given data string. + + The data string will be posted to the TigerGraph server and the value of the appropriate + FILENAME definition will be updated to point to the data received. + + NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to + load the file correctly. Remove the header from the data file before using this function. + + Args: + data: + The data string to be loaded. + fileTag: + The name of file variable in the loading job (DEFINE FILENAME ). + jobName: + The name of the loading job. + sep: + Data value separator. If your data is JSON, you do not need to specify this + parameter. The default separator is a comma `,`. + eol: + End-of-line character. Only one or two characters are allowed, except for the + special case `\\r\\n`. The default value is `\\n` + timeout: + Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting. + sizeLimit: + Maximum size for input file in bytes. + + Endpoint: + - `POST /ddl/{graph_name}` + See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] + """ + logger.info("entry: runLoadingJobWithData") + if logger.level == logging.DEBUG: + logger.debug("params: " + self._locals(locals())) + + if not data or not jobName or not fileTag: + # invalid inputs + logger.error("Invalid data or params") + logger.info("exit: runLoadingJobWithData") return None + params = { + "tag": jobName, + "filename": fileTag, + } + if sep is not None: + params["sep"] = sep + if eol is not None: + params["eol"] = eol + res = self._req("POST", self.restppUrl + "/ddl/" + self.graphname, params=params, data=data, headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)}) if logger.level == logging.DEBUG: logger.debug("return: " + str(res)) - logger.info("exit: runLoadingJobWithFile") + logger.info("exit: runLoadingJobWithData") return res diff --git a/pyTigerGraph/pytgasync/pyTigerGraphLoading.py b/pyTigerGraph/pytgasync/pyTigerGraphLoading.py index 15da68a7..72c196c0 100644 --- a/pyTigerGraph/pytgasync/pyTigerGraphLoading.py +++ b/pyTigerGraph/pytgasync/pyTigerGraphLoading.py @@ -6,7 +6,9 @@ import logging import warnings -from typing import Union +from typing import TYPE_CHECKING, Union +if TYPE_CHECKING: + import pandas as pd from pyTigerGraph.common.loading import _prep_run_loading_job_with_file from pyTigerGraph.pytgasync.pyTigerGraphBase import AsyncPyTigerGraphBase @@ -16,6 +18,55 @@ class AsyncPyTigerGraphLoading(AsyncPyTigerGraphBase): + async def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None, + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000, columns: list = None) -> Union[dict, None]: + """Execute a loading job with the given pandas DataFrame with optional column list. + + The data string will be posted to the TigerGraph server and the value of the appropriate + FILENAME definition will be updated to point to the data received. + + NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to + load the file correctly. Remove the header from the data file before using this function. + + Args: + df: + The pandas DateFrame data structure to be loaded. + fileTag: + The name of file variable in the loading job (DEFINE FILENAME ). + jobName: + The name of the loading job. + sep: + Data value separator. If your data is JSON, you do not need to specify this + parameter. The default separator is a comma `,`. + eol: + End-of-line character. Only one or two characters are allowed, except for the + special case `\\r\\n`. The default value is `\\n` + timeout: + Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting. + sizeLimit: + Maximum size for input file in bytes. + columns: + The ordered pandas DataFrame columns to be uploaded. + + Endpoint: + - `POST /ddl/{graph_name}` + See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] + """ + logger.info("entry: runLoadingJobWithDataFrame") + if logger.level == logging.DEBUG: + logger.debug("params: " + self._locals(locals())) + + if columns is None: + data = df.to_csv(sep = sep, header=False) + else: + data = df.to_csv(columns = columns, sep = sep, header=False) + + res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) + + logger.info("exit: runLoadingJobWithDataFrame") + + return res + async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None, eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: """Execute a loading job with the referenced file. @@ -52,19 +103,70 @@ async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, if logger.level == logging.DEBUG: logger.debug("params: " + self._locals(locals())) - data, params = _prep_run_loading_job_with_file( - filePath, jobName, fileTag, sep, eol) + data = _prep_run_loading_job_with_file(filePath) + res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit) + + logger.info("exit: runLoadingJobWithFile") + + return res + + async def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None, + eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]: + """Execute a loading job with the given data string. + + The data string will be posted to the TigerGraph server and the value of the appropriate + FILENAME definition will be updated to point to the data received. + + NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to + load the file correctly. Remove the header from the data file before using this function. + + Args: + data: + The data string to be loaded. + fileTag: + The name of file variable in the loading job (DEFINE FILENAME ). + jobName: + The name of the loading job. + sep: + Data value separator. If your data is JSON, you do not need to specify this + parameter. The default separator is a comma `,`. + eol: + End-of-line character. Only one or two characters are allowed, except for the + special case `\\r\\n`. The default value is `\\n` + timeout: + Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting. + sizeLimit: + Maximum size for input file in bytes. - if not data and not params: - # failed to read file + Endpoint: + - `POST /ddl/{graph_name}` + See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job] + """ + logger.info("entry: runLoadingJobWithData") + if logger.level == logging.DEBUG: + logger.debug("params: " + self._locals(locals())) + + if not data or not jobName or not fileTag: + # invalid inputs + logger.error("Invalid data or params") + logger.info("exit: runLoadingJobWithData") return None + params = { + "tag": jobName, + "filename": fileTag, + } + if sep is not None: + params["sep"] = sep + if eol is not None: + params["eol"] = eol + res = await self._req("POST", self.restppUrl + "/ddl/" + self.graphname, params=params, data=data, headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)}) if logger.level == logging.DEBUG: logger.debug("return: " + str(res)) - logger.info("exit: runLoadingJobWithFile") + logger.info("exit: runLoadingJobWithData") return res