diff --git a/programs/local/ChdbClient.cpp b/programs/local/ChdbClient.cpp index 831620a2b72..07e1ed79361 100644 --- a/programs/local/ChdbClient.cpp +++ b/programs/local/ChdbClient.cpp @@ -14,6 +14,9 @@ #if USE_PYTHON #include +#include +#include +namespace py = pybind11; #endif namespace DB @@ -318,14 +321,21 @@ CHDB::QueryResultPtr ChdbClient::executeStreamingIterate(void * streaming_result #if USE_PYTHON if (Poco::toLower(default_output_format) == "dataframe") { - res = std::make_unique( + auto rows_read = processed_rows - old_processed_rows; + auto chunk_result = std::make_unique( std::move(collected_chunks), std::move(collected_chunks_header), elapsed_time - old_elapsed_time, - processed_rows - old_processed_rows, + rows_read, processed_bytes - old_processed_bytes, storage_rows_read - old_storage_rows_read, storage_bytes_read - old_storage_bytes_read); + + py::gil_scoped_acquire acquire; + CHDB::PandasDataFrameBuilder builder(*chunk_result); + py::handle df = builder.getDataFrame().release(); + + res = std::make_unique(df, rows_read); } else #endif @@ -350,7 +360,7 @@ CHDB::QueryResultPtr ChdbClient::executeStreamingIterate(void * streaming_result } } - // Check if query should end based on result type + /// Check if query should end based on result type bool is_end = !res->getError().empty() || is_canceled || res->isEmpty(); if (is_end) { diff --git a/programs/local/LocalChdb.cpp b/programs/local/LocalChdb.cpp index 9ef5e358030..ac7d6b62bda 100644 --- a/programs/local/LocalChdb.cpp +++ b/programs/local/LocalChdb.cpp @@ -3,7 +3,6 @@ #include "PandasDataFrameBuilder.h" #include "ChunkCollectorOutputFormat.h" #include "PythonImporter.h" -#include "PythonTableCache.h" #include "StoragePython.h" #include @@ -274,13 +273,14 @@ query_result * connection_wrapper::query(const std::string & query_str, const st auto * result = chdb_query_n(*conn, query_str.data(), query_str.size(), format.data(), format.size()); - auto error_msg = CHDB::chdb_result_error_string(result); + const auto & error_msg = CHDB::chdb_result_error_string(result); if (!error_msg.empty()) { std::string msg_copy(error_msg); chdb_destroy_query_result(result); throw std::runtime_error(msg_copy); } + return new query_result(result, false); } @@ -298,7 +298,7 @@ py::object connection_wrapper::query_df(const std::string & query_str) result = chdb_query_n(*conn, query_str.data(), query_str.size(), format.data(), format.size()); - auto error_msg = CHDB::chdb_result_error_string(result); + const auto & error_msg = CHDB::chdb_result_error_string(result); if (!error_msg.empty()) { std::string msg_copy(error_msg); @@ -322,7 +322,7 @@ streaming_query_result * connection_wrapper::send_query(const std::string & quer CHDB::cachePythonTablesFromQuery(reinterpret_cast(*conn), query_str); py::gil_scoped_release release; auto * result = chdb_stream_query_n(*conn, query_str.data(), query_str.size(), format.data(), format.size()); - auto error_msg = CHDB::chdb_result_error_string(result); + const auto & error_msg = CHDB::chdb_result_error_string(result); if (!error_msg.empty()) { std::string msg_copy(error_msg); @@ -342,7 +342,7 @@ query_result * connection_wrapper::streaming_fetch_result(streaming_query_result auto * result = chdb_stream_fetch_result(*conn, streaming_result->get_result()); - const auto error_msg = CHDB::chdb_result_error_string(result); + const auto & error_msg = CHDB::chdb_result_error_string(result); if (!error_msg.empty()) { std::string msg_copy(error_msg); @@ -359,14 +359,14 @@ py::object connection_wrapper::streaming_fetch_df(streaming_query_result * strea return py::none(); chdb_result * result = nullptr; - CHDB::ChunkQueryResult * chunk_result = nullptr; + CHDB::DataFrameQueryResult * chunk_result = nullptr; { py::gil_scoped_release release; - result = chdb_stream_fetch_result(*conn, streaming_result->get_result()); + result = chdb_stream_fetch_result(*conn, streaming_result->get_result()); - auto error_msg = CHDB::chdb_result_error_string(result); + const auto & error_msg = CHDB::chdb_result_error_string(result); if (!error_msg.empty()) { std::string msg_copy(error_msg); @@ -374,15 +374,14 @@ py::object connection_wrapper::streaming_fetch_df(streaming_query_result * strea throw std::runtime_error(msg_copy); } - if (!(chunk_result = dynamic_cast(reinterpret_cast(result)))) - throw std::runtime_error("Expected ChunkQueryResult for dataframe format"); + if (!(chunk_result = dynamic_cast(reinterpret_cast(result)))) + throw std::runtime_error("Expected DataFrameQueryResult for dataframe format"); } - CHDB::PandasDataFrameBuilder builder(*chunk_result); - auto df = builder.getDataFrame(); + py::handle df_handle = chunk_result->dataframe; chdb_destroy_query_result(result); - return df; + return py::reinterpret_steal(df_handle); } void connection_wrapper::streaming_cancel_query(streaming_query_result * streaming_result) diff --git a/programs/local/QueryResult.h b/programs/local/QueryResult.h index c3825df110d..9a4373826c4 100644 --- a/programs/local/QueryResult.h +++ b/programs/local/QueryResult.h @@ -9,6 +9,8 @@ #if USE_PYTHON #include +#include +namespace py = pybind11; namespace DB { class Block; @@ -23,7 +25,8 @@ enum class QueryResultType : uint8_t RESULT_TYPE_MATERIALIZED = 0, RESULT_TYPE_STREAMING = 1, RESULT_TYPE_CHUNK = 2, - RESULT_TYPE_NONE = 3 + RESULT_TYPE_DATAFRAME = 3, + RESULT_TYPE_NONE = 4 }; class QueryResult @@ -144,6 +147,26 @@ class ChunkQueryResult : public QueryResult uint64_t storage_rows_read; uint64_t storage_bytes_read; }; + +class DataFrameQueryResult : public QueryResult +{ +public: + explicit DataFrameQueryResult( + py::handle dataframe_, + uint64_t rows_read) + : QueryResult(QueryResultType::RESULT_TYPE_DATAFRAME), + dataframe(dataframe_), + is_empty(rows_read == 0) + {} + + bool isEmpty() const override + { + return is_empty; + } + + py::handle dataframe; + bool is_empty; +}; #endif using QueryResultPtr = std::unique_ptr; @@ -151,6 +174,7 @@ using MaterializedQueryResultPtr = std::unique_ptr; using StreamQueryResultPtr = std::unique_ptr; #if USE_PYTHON using ChunkQueryResultPtr = std::unique_ptr; +using DataFrameQueryResultPtr = std::unique_ptr; #endif } // namespace CHDB diff --git a/src/Common/memory.h b/src/Common/memory.h index cbe9f7b7c64..5c699d98814 100644 --- a/src/Common/memory.h +++ b/src/Common/memory.h @@ -221,6 +221,9 @@ inline ALWAYS_INLINE size_t untrackMemory(void * ptr [[maybe_unused]], Allocatio /// It's innaccurate resource free for sanitizers. malloc_usable_size() result is greater or equal to allocated size. else actual_size = malloc_usable_size(ptr); +# elif defined(OS_DARWIN) + else + actual_size = malloc_size(ptr); # endif #endif trace = CurrentMemoryTracker::free(actual_size);