Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions programs/local/ChdbClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

#if USE_PYTHON
#include <PythonTableCache.h>
#include <PandasDataFrameBuilder.h>
#include <pybind11/pybind11.h>
namespace py = pybind11;
#endif

namespace DB
Expand Down Expand Up @@ -318,14 +321,21 @@ CHDB::QueryResultPtr ChdbClient::executeStreamingIterate(void * streaming_result
#if USE_PYTHON
if (Poco::toLower(default_output_format) == "dataframe")
{
res = std::make_unique<CHDB::ChunkQueryResult>(
auto rows_read = processed_rows - old_processed_rows;
auto chunk_result = std::make_unique<CHDB::ChunkQueryResult>(
std::move(collected_chunks),
std::move(collected_chunks_header),
elapsed_time - old_elapsed_time,
processed_rows - old_processed_rows,
rows_read,
processed_bytes - old_processed_bytes,
storage_rows_read - old_storage_rows_read,
storage_bytes_read - old_storage_bytes_read);

py::gil_scoped_acquire acquire;
CHDB::PandasDataFrameBuilder builder(*chunk_result);
py::handle df = builder.getDataFrame().release();

res = std::make_unique<CHDB::DataFrameQueryResult>(df, rows_read);
}
else
#endif
Expand All @@ -350,7 +360,7 @@ CHDB::QueryResultPtr ChdbClient::executeStreamingIterate(void * streaming_result
}
}

// Check if query should end based on result type
/// Check if query should end based on result type
bool is_end = !res->getError().empty() || is_canceled || res->isEmpty();
if (is_end)
{
Expand Down
25 changes: 12 additions & 13 deletions programs/local/LocalChdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "PandasDataFrameBuilder.h"
#include "ChunkCollectorOutputFormat.h"
#include "PythonImporter.h"
#include "PythonTableCache.h"
#include "StoragePython.h"

#include <pybind11/detail/non_limited_api.h>
Expand Down Expand Up @@ -274,13 +273,14 @@ query_result * connection_wrapper::query(const std::string & query_str, const st

auto * result = chdb_query_n(*conn, query_str.data(), query_str.size(), format.data(), format.size());

auto error_msg = CHDB::chdb_result_error_string(result);
const auto & error_msg = CHDB::chdb_result_error_string(result);
if (!error_msg.empty())
{
std::string msg_copy(error_msg);
chdb_destroy_query_result(result);
throw std::runtime_error(msg_copy);
}

return new query_result(result, false);
}

Expand All @@ -298,7 +298,7 @@ py::object connection_wrapper::query_df(const std::string & query_str)

result = chdb_query_n(*conn, query_str.data(), query_str.size(), format.data(), format.size());

auto error_msg = CHDB::chdb_result_error_string(result);
const auto & error_msg = CHDB::chdb_result_error_string(result);
if (!error_msg.empty())
{
std::string msg_copy(error_msg);
Expand All @@ -322,7 +322,7 @@ streaming_query_result * connection_wrapper::send_query(const std::string & quer
CHDB::cachePythonTablesFromQuery(reinterpret_cast<chdb_conn *>(*conn), query_str);
py::gil_scoped_release release;
auto * result = chdb_stream_query_n(*conn, query_str.data(), query_str.size(), format.data(), format.size());
auto error_msg = CHDB::chdb_result_error_string(result);
const auto & error_msg = CHDB::chdb_result_error_string(result);
if (!error_msg.empty())
{
std::string msg_copy(error_msg);
Expand All @@ -342,7 +342,7 @@ query_result * connection_wrapper::streaming_fetch_result(streaming_query_result

auto * result = chdb_stream_fetch_result(*conn, streaming_result->get_result());

const auto error_msg = CHDB::chdb_result_error_string(result);
const auto & error_msg = CHDB::chdb_result_error_string(result);
if (!error_msg.empty())
{
std::string msg_copy(error_msg);
Expand All @@ -359,30 +359,29 @@ py::object connection_wrapper::streaming_fetch_df(streaming_query_result * strea
return py::none();

chdb_result * result = nullptr;
CHDB::ChunkQueryResult * chunk_result = nullptr;
CHDB::DataFrameQueryResult * chunk_result = nullptr;

{
py::gil_scoped_release release;

result = chdb_stream_fetch_result(*conn, streaming_result->get_result());
result = chdb_stream_fetch_result(*conn, streaming_result->get_result());

auto error_msg = CHDB::chdb_result_error_string(result);
const auto & error_msg = CHDB::chdb_result_error_string(result);
if (!error_msg.empty())
{
std::string msg_copy(error_msg);
chdb_destroy_query_result(result);
throw std::runtime_error(msg_copy);
}

if (!(chunk_result = dynamic_cast<CHDB::ChunkQueryResult *>(reinterpret_cast<CHDB::QueryResult*>(result))))
throw std::runtime_error("Expected ChunkQueryResult for dataframe format");
if (!(chunk_result = dynamic_cast<CHDB::DataFrameQueryResult *>(reinterpret_cast<CHDB::QueryResult*>(result))))
throw std::runtime_error("Expected DataFrameQueryResult for dataframe format");
}

CHDB::PandasDataFrameBuilder builder(*chunk_result);
auto df = builder.getDataFrame();
py::handle df_handle = chunk_result->dataframe;
chdb_destroy_query_result(result);

return df;
return py::reinterpret_steal<py::object>(df_handle);
}

void connection_wrapper::streaming_cancel_query(streaming_query_result * streaming_result)
Expand Down
26 changes: 25 additions & 1 deletion programs/local/QueryResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#if USE_PYTHON
#include <Processors/Chunk.h>
#include <pybind11/pybind11.h>
namespace py = pybind11;
namespace DB
{
class Block;
Expand All @@ -23,7 +25,8 @@ enum class QueryResultType : uint8_t
RESULT_TYPE_MATERIALIZED = 0,
RESULT_TYPE_STREAMING = 1,
RESULT_TYPE_CHUNK = 2,
RESULT_TYPE_NONE = 3
RESULT_TYPE_DATAFRAME = 3,
RESULT_TYPE_NONE = 4
};

class QueryResult
Expand Down Expand Up @@ -144,13 +147,34 @@ class ChunkQueryResult : public QueryResult
uint64_t storage_rows_read;
uint64_t storage_bytes_read;
};

class DataFrameQueryResult : public QueryResult
{
public:
explicit DataFrameQueryResult(
py::handle dataframe_,
uint64_t rows_read)
: QueryResult(QueryResultType::RESULT_TYPE_DATAFRAME),
dataframe(dataframe_),
is_empty(rows_read == 0)
{}

bool isEmpty() const override
{
return is_empty;
}

py::handle dataframe;
bool is_empty;
};
#endif

using QueryResultPtr = std::unique_ptr<QueryResult>;
using MaterializedQueryResultPtr = std::unique_ptr<MaterializedQueryResult>;
using StreamQueryResultPtr = std::unique_ptr<StreamQueryResult>;
#if USE_PYTHON
using ChunkQueryResultPtr = std::unique_ptr<ChunkQueryResult>;
using DataFrameQueryResultPtr = std::unique_ptr<DataFrameQueryResult>;
#endif

} // namespace CHDB
3 changes: 3 additions & 0 deletions src/Common/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ inline ALWAYS_INLINE size_t untrackMemory(void * ptr [[maybe_unused]], Allocatio
/// It's innaccurate resource free for sanitizers. malloc_usable_size() result is greater or equal to allocated size.
else
actual_size = malloc_usable_size(ptr);
# elif defined(OS_DARWIN)
else
actual_size = malloc_size(ptr);
# endif
#endif
trace = CurrentMemoryTracker::free(actual_size);
Expand Down