Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update pyTigerGraphLoading.py to add support on direct data loading #260

Merged
merged 12 commits into from
Oct 30, 2024
14 changes: 3 additions & 11 deletions pyTigerGraph/common/loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,14 @@

logger = logging.getLogger(__name__)

def _prep_run_loading_job_with_file(filePath, jobName, fileTag, sep, eol):
def _prep_run_loading_job_with_file(filePath):
'''read file contents for runLoadingJobWithFile()'''
try:
data = open(filePath, 'rb').read()
params = {
"tag": jobName,
"filename": fileTag,
}
if sep is not None:
params["sep"] = sep
if eol is not None:
params["eol"] = eol
return data, params
return data
except OSError as ose:
logger.error(ose.strerror)
logger.info("exit: runLoadingJobWithFile")

return None, None
return None
# TODO Should throw exception instead?
115 changes: 108 additions & 7 deletions pyTigerGraph/pyTigerGraphLoading.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,67 @@
import logging
import warnings

from typing import Union
from typing import TYPE_CHECKING, Union
if TYPE_CHECKING:
import pandas as pd

from pyTigerGraph.common.loading import _prep_run_loading_job_with_file

from pyTigerGraph.pyTigerGraphBase import pyTigerGraphBase

logger = logging.getLogger(__name__)


class pyTigerGraphLoading(pyTigerGraphBase):

def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None,
                               eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000,
                               columns: list = None) -> Union[dict, None]:
    """Execute a loading job with the given pandas DataFrame with optional column list.

    The DataFrame is serialized to CSV (without a header row), posted to the TigerGraph
    server, and the value of the appropriate FILENAME definition will be updated to
    point to the data received.

    NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to
    load the file correctly. Remove the header from the data file before using this function.

    Args:
        df:
            The pandas DataFrame data structure to be loaded.
        fileTag:
            The name of file variable in the loading job (DEFINE FILENAME <fileTag>).
        jobName:
            The name of the loading job.
        sep:
            Data value separator. If your data is JSON, you do not need to specify this
            parameter. The default separator is a comma `,`.
        eol:
            End-of-line character. Only one or two characters are allowed, except for the
            special case `\\r\\n`. The default value is `\\n`
        timeout:
            Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting.
        sizeLimit:
            Maximum size for input file in bytes.
        columns:
            The ordered pandas DataFrame columns to be uploaded.

    Endpoint:
        - `POST /ddl/{graph_name}`
            See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job]
    """
    logger.info("entry: runLoadingJobWithDataFrame")
    if logger.level == logging.DEBUG:
        logger.debug("params: " + self._locals(locals()))

    # pandas' to_csv() rejects sep=None (it requires a 1-character string), but
    # sep=None is this function's documented default. Serialize with the default
    # comma; the caller's original `sep` is still forwarded to the loading job.
    csv_sep = sep if sep is not None else ","
    # NOTE(review): to_csv() also writes the DataFrame index as the first
    # column by default — confirm loading jobs expect that, else pass index=False.
    if columns is None:
        data = df.to_csv(sep=csv_sep, header=False)
    else:
        data = df.to_csv(columns=columns, sep=csv_sep, header=False)

    res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit)

    logger.info("exit: runLoadingJobWithDataFrame")

    return res

def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None,
eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]:
"""Execute a loading job with the referenced file.
Expand Down Expand Up @@ -53,19 +103,70 @@ def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep:
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

data, params = _prep_run_loading_job_with_file(
filePath, jobName, fileTag, sep, eol)
data = _prep_run_loading_job_with_file(filePath)
res = self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit)

if not data and not params:
# failed to read file
logger.info("exit: runLoadingJobWithFile")

return res

def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None,
eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]:
"""Execute a loading job with the given data string.

The data string will be posted to the TigerGraph server and the value of the appropriate
FILENAME definition will be updated to point to the data received.

NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to
load the file correctly. Remove the header from the data file before using this function.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comment is confusing, same for runLoadingJobWithFile

So header should be removed before calling these two functions. If loading job still has Using header=true, will the first line be ignored?

Copy link
Collaborator Author

@chengbiao-jin chengbiao-jin Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original function does not support it, hence I have not made any change to it yet.

Actually I'd prefer to support HEADER=true in these 2 functions so the user can provide the parameters according to the loading job. @parkererickson-tg do you have any background information on why it might not load correctly with HEADER specified?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a long-time bug in the ddl system

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

header=False will be required in df.to_csv() in this case.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, make it explicit that USING HEADER=false in loading job definition.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HEADER=false is actually the default behavior

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. I mean user should not set USING HEADER=true in loading job in this case? otherwise they will lose 1 row?


Args:
data:
The data string to be loaded.
fileTag:
The name of file variable in the loading job (DEFINE FILENAME <fileTag>).
jobName:
The name of the loading job.
sep:
Data value separator. If your data is JSON, you do not need to specify this
parameter. The default separator is a comma `,`.
eol:
End-of-line character. Only one or two characters are allowed, except for the
special case `\\r\\n`. The default value is `\\n`
timeout:
Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting.
sizeLimit:
Maximum size for input file in bytes.

Endpoint:
- `POST /ddl/{graph_name}`
See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job]
"""
logger.info("entry: runLoadingJobWithData")
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

if not data or not jobName or not fileTag:
# invalid inputs
logger.error("Invalid data or params")
logger.info("exit: runLoadingJobWithData")
return None

params = {
"tag": jobName,
"filename": fileTag,
}
if sep is not None:
params["sep"] = sep
if eol is not None:
params["eol"] = eol

res = self._req("POST", self.restppUrl + "/ddl/" + self.graphname, params=params, data=data,
headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)})

if logger.level == logging.DEBUG:
logger.debug("return: " + str(res))
logger.info("exit: runLoadingJobWithFile")
logger.info("exit: runLoadingJobWithData")

return res

Expand Down
114 changes: 108 additions & 6 deletions pyTigerGraph/pytgasync/pyTigerGraphLoading.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import logging
import warnings

from typing import Union
from typing import TYPE_CHECKING, Union
if TYPE_CHECKING:
import pandas as pd

from pyTigerGraph.common.loading import _prep_run_loading_job_with_file
from pyTigerGraph.pytgasync.pyTigerGraphBase import AsyncPyTigerGraphBase
Expand All @@ -16,6 +18,55 @@

class AsyncPyTigerGraphLoading(AsyncPyTigerGraphBase):

async def runLoadingJobWithDataFrame(self, df: 'pd.DataFrame', fileTag: str, jobName: str, sep: str = None,
                                     eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000,
                                     columns: list = None) -> Union[dict, None]:
    """Execute a loading job with the given pandas DataFrame with optional column list.

    The DataFrame is serialized to CSV (without a header row), posted to the TigerGraph
    server, and the value of the appropriate FILENAME definition will be updated to
    point to the data received.

    NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to
    load the file correctly. Remove the header from the data file before using this function.

    Args:
        df:
            The pandas DataFrame data structure to be loaded.
        fileTag:
            The name of file variable in the loading job (DEFINE FILENAME <fileTag>).
        jobName:
            The name of the loading job.
        sep:
            Data value separator. If your data is JSON, you do not need to specify this
            parameter. The default separator is a comma `,`.
        eol:
            End-of-line character. Only one or two characters are allowed, except for the
            special case `\\r\\n`. The default value is `\\n`
        timeout:
            Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting.
        sizeLimit:
            Maximum size for input file in bytes.
        columns:
            The ordered pandas DataFrame columns to be uploaded.

    Endpoint:
        - `POST /ddl/{graph_name}`
            See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job]
    """
    logger.info("entry: runLoadingJobWithDataFrame")
    if logger.level == logging.DEBUG:
        logger.debug("params: " + self._locals(locals()))

    # pandas' to_csv() rejects sep=None (it requires a 1-character string), but
    # sep=None is this function's documented default. Serialize with the default
    # comma; the caller's original `sep` is still forwarded to the loading job.
    csv_sep = sep if sep is not None else ","
    # NOTE(review): to_csv() also writes the DataFrame index as the first
    # column by default — confirm loading jobs expect that, else pass index=False.
    if columns is None:
        data = df.to_csv(sep=csv_sep, header=False)
    else:
        data = df.to_csv(columns=columns, sep=csv_sep, header=False)

    res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit)

    logger.info("exit: runLoadingJobWithDataFrame")

    return res

async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str, sep: str = None,
eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]:
"""Execute a loading job with the referenced file.
Expand Down Expand Up @@ -52,19 +103,70 @@ async def runLoadingJobWithFile(self, filePath: str, fileTag: str, jobName: str,
if logger.level == logging.DEBUG:
logger.debug("params: " + self._locals(locals()))

data, params = _prep_run_loading_job_with_file(
filePath, jobName, fileTag, sep, eol)
data = _prep_run_loading_job_with_file(filePath)
res = await self.runLoadingJobWithData(data, fileTag, jobName, sep, eol, timeout, sizeLimit)

logger.info("exit: runLoadingJobWithFile")

return res

async def runLoadingJobWithData(self, data: str, fileTag: str, jobName: str, sep: str = None,
                                eol: str = None, timeout: int = 16000, sizeLimit: int = 128000000) -> Union[dict, None]:
    """Execute a loading job with the given data string.

    The data string will be posted to the TigerGraph server and the value of the appropriate
    FILENAME definition will be updated to point to the data received.

    NOTE: The argument `USING HEADER="true"` in the GSQL loading job may not be enough to
    load the file correctly. Remove the header from the data file before using this function.

    Args:
        data:
            The data string to be loaded.
        fileTag:
            The name of file variable in the loading job (DEFINE FILENAME <fileTag>).
        jobName:
            The name of the loading job.
        sep:
            Data value separator. If your data is JSON, you do not need to specify this
            parameter. The default separator is a comma `,`.
        eol:
            End-of-line character. Only one or two characters are allowed, except for the
            special case `\\r\\n`. The default value is `\\n`
        timeout:
            Timeout in seconds. If set to `0`, use the system-wide endpoint timeout setting.
        sizeLimit:
            Maximum size for input file in bytes.

    Endpoint:
        - `POST /ddl/{graph_name}`
            See xref:tigergraph-server:API:built-in-endpoints.adoc#_run_a_loading_job[Run a loading job]
    """
    logger.info("entry: runLoadingJobWithData")
    if logger.level == logging.DEBUG:
        logger.debug("params: " + self._locals(locals()))

    if not data or not jobName or not fileTag:
        # Refuse to call the endpoint with a missing payload or identifiers.
        logger.error("Invalid data or params")
        logger.info("exit: runLoadingJobWithData")
        return None

    # `sep`/`eol` are only sent when explicitly provided, so the server-side
    # defaults apply otherwise.
    params = {
        "tag": jobName,
        "filename": fileTag,
    }
    if sep is not None:
        params["sep"] = sep
    if eol is not None:
        params["eol"] = eol

    res = await self._req("POST", self.restppUrl + "/ddl/" + self.graphname, params=params, data=data,
                          headers={"RESPONSE-LIMIT": str(sizeLimit), "GSQL-TIMEOUT": str(timeout)})

    if logger.level == logging.DEBUG:
        logger.debug("return: " + str(res))
    logger.info("exit: runLoadingJobWithData")

    return res

Expand Down