diff --git a/CMakeLists.txt b/CMakeLists.txt index c41075394e49..6a9eac81cf9c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -116,6 +116,7 @@ option(VELOX_ENABLE_PARQUET "Enable Parquet support" OFF) option(VELOX_ENABLE_ARROW "Enable Arrow support" OFF) option(VELOX_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF) option(VELOX_ENABLE_CCACHE "Use ccache if installed." ON) +option(VELOX_ENABLE_SDK "Enable SDK support." ON) option(VELOX_BUILD_TEST_UTILS "Builds Velox test utilities" OFF) option(VELOX_BUILD_VECTOR_TEST_UTILS "Builds Velox vector test utilities" OFF) diff --git a/velox/CMakeLists.txt b/velox/CMakeLists.txt index 06ae8bf1c053..bb6de5644b73 100644 --- a/velox/CMakeLists.txt +++ b/velox/CMakeLists.txt @@ -77,3 +77,7 @@ endif() if(${VELOX_BUILD_TESTING}) add_subdirectory(tool) endif() + +if(${VELOX_ENABLE_SDK}) + add_subdirectory(sdk) +endif() \ No newline at end of file diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index a04ef03e8724..d0ef7eaef2ca 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -282,6 +282,9 @@ class MemoryManager { const std::vector>& testingSharedLeafPools() { return sharedLeafPools_; } + + // Returns the shared references to all the alive memory pools in 'pools_'. + std::vector> getAlivePools() const; private: std::shared_ptr createRootPool( @@ -291,8 +294,6 @@ class MemoryManager { void dropPool(MemoryPool* pool); - // Returns the shared references to all the alive memory pools in 'pools_'. - std::vector> getAlivePools() const; const std::shared_ptr allocator_; diff --git a/velox/sdk/CMakeLists.txt b/velox/sdk/CMakeLists.txt new file mode 100644 index 000000000000..e0b670150c7d --- /dev/null +++ b/velox/sdk/CMakeLists.txt @@ -0,0 +1,51 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +find_package(JNI REQUIRED) + +if (VELOX_ENABLE_SDK_DEBUG) + add_definitions(-DVELOX_ENABLE_SDK_DEBUG) +endif () +include_directories(${JNI_INCLUDE_DIRS}) + +velox_add_library(sdk SHARED + cpp/jni/VeloxJniWrapper.cc + cpp/jni/JniErrors.cc + cpp/jni/JniUtil.cc + cpp/native/NativePlanBuilder.cc + cpp/native/NativeClass.cc + cpp/memory/MemoryManager.cpp + cpp/memory/NativeMemoryManger.cc + cpp/common/Global.cc + cpp/funcitons/RegistrationAllFunctions.cc +) + +velox_link_libraries(sdk + velox_type + velox_vector + velox_exec + velox_exec_test_lib + velox_parse_expression + velox_hive_connector + velox_memory + velox_dwio_parquet_reader + velox_functions_prestosql_impl + velox_functions_spark + velox_aggregates + velox_functions_spark_aggregates + velox_window + velox_vector + velox_functions_spark_window + velox_exec_test_lib) \ No newline at end of file diff --git a/velox/sdk/cpp/common/Global.cc b/velox/sdk/cpp/common/Global.cc new file mode 100644 index 000000000000..9383f40c4fea --- /dev/null +++ b/velox/sdk/cpp/common/Global.cc @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "gflags/gflags.h" +#ifndef GLOBAL_H +#define GLOBAL_H +DEFINE_int64( + max_root_memory_bytes, + 10L * 1024L * 1024L * 1024L , + "root memory size"); + +DEFINE_int64( + max_query_memory_bytes, + 1L * 1024L * 1024L * 1024L, + "query memory size"); +#endif //GLOBAL_H diff --git a/velox/sdk/cpp/common/Global.h b/velox/sdk/cpp/common/Global.h new file mode 100644 index 000000000000..e8cd135e0d95 --- /dev/null +++ b/velox/sdk/cpp/common/Global.h @@ -0,0 +1,26 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "gflags/gflags.h" +#ifndef GLOBAL_H +#define GLOBAL_H +DECLARE_int64( + max_root_memory_bytes); + +DECLARE_int64( + max_query_memory_bytes); +#endif //GLOBAL_H diff --git a/velox/sdk/cpp/common/macros.h b/velox/sdk/cpp/common/macros.h new file mode 100644 index 000000000000..1d41085d1290 --- /dev/null +++ b/velox/sdk/cpp/common/macros.h @@ -0,0 +1,283 @@ +// Copyright 2008 Google Inc. All Rights Reserved. +// +// Various Google-specific macros. +// +// This code is compiled directly on many platforms, including client +// platforms like Windows, Mac, and embedded systems. Before making +// any changes here, make sure that you're not breaking any platforms. +// + +#ifndef BASE_MACROS_H_ +#define BASE_MACROS_H_ + +#include // For size_t + +// The swigged version of an abstract class must be concrete if any methods +// return objects of the abstract type. We keep it abstract in C++ and +// concrete for swig. +#ifndef SWIG +#define ABSTRACT = 0 +#endif + +// The COMPILE_ASSERT macro can be used to verify that a compile time +// expression is true. For example, you could use it to verify the +// size of a static array: +// +// COMPILE_ASSERT(ARRAYSIZE(content_type_names) == CONTENT_NUM_TYPES, +// content_type_names_incorrect_size); +// +// or to make sure a struct is smaller than a certain size: +// +// COMPILE_ASSERT(sizeof(foo) < 128, foo_too_large); +// +// The second argument to the macro is the name of the variable. If +// the expression is false, most compilers will issue a warning/error +// containing the name of the variable. + +template +struct CompileAssert { +}; + +#define COMPILE_ASSERT(expr, msg) \ + typedef CompileAssert<(bool(expr))> msg[bool(expr) ? 1 : -1] ATTRIBUTE_UNUSED + +// Implementation details of COMPILE_ASSERT: +// +// - COMPILE_ASSERT works by defining an array type that has -1 +// elements (and thus is invalid) when the expression is false. +// +// - The simpler definition +// +// #define COMPILE_ASSERT(expr, msg) typedef char msg[(expr) ? 1 : -1] +// +// does not work, as gcc supports variable-length arrays whose sizes +// are determined at run-time (this is gcc's extension and not part +// of the C++ standard). As a result, gcc fails to reject the +// following code with the simple definition: +// +// int foo; +// COMPILE_ASSERT(foo, msg); // not supposed to compile as foo is +// // not a compile-time constant. +// +// - By using the type CompileAssert<(bool(expr))>, we ensures that +// expr is a compile-time constant. (Template arguments must be +// determined at compile-time.) +// +// - The outer parentheses in CompileAssert<(bool(expr))> are necessary +// to work around a bug in gcc 3.4.4 and 4.0.1. If we had written +// +// CompileAssert +// +// instead, these compilers will refuse to compile +// +// COMPILE_ASSERT(5 > 0, some_message); +// +// (They seem to think the ">" in "5 > 0" marks the end of the +// template argument list.) +// +// - The array size is (bool(expr) ? 1 : -1), instead of simply +// +// ((expr) ? 1 : -1). +// +// This is to avoid running into a bug in MS VC 7.1, which +// causes ((0.0) ? 1 : -1) to incorrectly evaluate to 1. + + +// A macro to disallow the copy constructor and operator= functions +// This should be used in the private: declarations for a class +// +// For disallowing only assign or copy, write the code directly, but declare +// the intend in a comment, for example: +// void operator=(const TypeName&); // DISALLOW_ASSIGN +// Note, that most uses of DISALLOW_ASSIGN and DISALLOW_COPY are broken +// semantically, one should either use disallow both or neither. Try to +// avoid these in new code. +#ifndef DISALLOW_COPY_AND_ASSIGN +#define DISALLOW_COPY_AND_ASSIGN(TypeName) \ + TypeName(const TypeName&) = delete; \ + TypeName& operator=(const TypeName&) = delete +#endif + +// An older, politically incorrect name for the above. +// Prefer DISALLOW_COPY_AND_ASSIGN for new code. +#define DISALLOW_EVIL_CONSTRUCTORS(TypeName) DISALLOW_COPY_AND_ASSIGN(TypeName) + +// A macro to disallow all the implicit constructors, namely the +// default constructor, copy constructor and operator= functions. +// +// This should be used in the private: declarations for a class +// that wants to prevent anyone from instantiating it. This is +// especially useful for classes containing only static methods. +#define DISALLOW_IMPLICIT_CONSTRUCTORS(TypeName) \ + TypeName() = delete; \ + DISALLOW_COPY_AND_ASSIGN(TypeName) + +// The arraysize(arr) macro returns the # of elements in an array arr. +// The expression is a compile-time constant, and therefore can be +// used in defining new arrays, for example. If you use arraysize on +// a pointer by mistake, you will get a compile-time error. +// +// One caveat is that, for C++03, arraysize() doesn't accept any array of +// an anonymous type or a type defined inside a function. In these rare +// cases, you have to use the unsafe ARRAYSIZE() macro below. This is +// due to a limitation in C++03's template system. The limitation has +// been removed in C++11. + +// This template function declaration is used in defining arraysize. +// Note that the function doesn't need an implementation, as we only +// use its type. +template +char (&ArraySizeHelper(T (&array)[N]))[N]; + +// That gcc wants both of these prototypes seems mysterious. VC, for +// its part, can't decide which to use (another mystery). Matching of +// template overloads: the final frontier. +#ifndef _MSC_VER +template +char (&ArraySizeHelper(const T (&array)[N]))[N]; +#endif + +#define arraysize(array) (sizeof(ArraySizeHelper(array))) + +// ARRAYSIZE performs essentially the same calculation as arraysize, +// but can be used on anonymous types or types defined inside +// functions. It's less safe than arraysize as it accepts some +// (although not all) pointers. Therefore, you should use arraysize +// whenever possible. +// +// The expression ARRAYSIZE(a) is a compile-time constant of type +// size_t. +// +// ARRAYSIZE catches a few type errors. If you see a compiler error +// +// "warning: division by zero in ..." +// +// when using ARRAYSIZE, you are (wrongfully) giving it a pointer. +// You should only use ARRAYSIZE on statically allocated arrays. +// +// The following comments are on the implementation details, and can +// be ignored by the users. +// +// ARRAYSIZE(arr) works by inspecting sizeof(arr) (the # of bytes in +// the array) and sizeof(*(arr)) (the # of bytes in one array +// element). If the former is divisible by the latter, perhaps arr is +// indeed an array, in which case the division result is the # of +// elements in the array. Otherwise, arr cannot possibly be an array, +// and we generate a compiler error to prevent the code from +// compiling. +// +// Since the size of bool is implementation-defined, we need to cast +// !(sizeof(a) & sizeof(*(a))) to size_t in order to ensure the final +// result has type size_t. +// +// This macro is not perfect as it wrongfully accepts certain +// pointers, namely where the pointer size is divisible by the pointee +// size. For code that goes through a 32-bit compiler, where a pointer +// is 4 bytes, this means all pointers to a type whose size is 3 or +// greater than 4 will be (righteously) rejected. +// +// Kudos to Jorg Brown for this simple and elegant implementation. +// +// - wan 2005-11-16 +// +// Starting with Visual C++ 2005, WinNT.h includes ARRAYSIZE. +#if !defined(_MSC_VER) || (defined(_MSC_VER) && _MSC_VER < 1400) +#define ARRAYSIZE(a) \ + ((sizeof(a) / sizeof(*(a))) / \ + static_cast(!(sizeof(a) % sizeof(*(a))))) +#endif + +// A macro to turn a symbol into a string +#define AS_STRING(x) AS_STRING_INTERNAL(x) +#define AS_STRING_INTERNAL(x) #x + +// Macro that allows definition of a variable appended with the current line +// number in the source file. Typically for use by other macros to allow the +// user to declare multiple variables with the same "base" name inside the same +// lexical block. +#define VARNAME_LINENUM(varname) VARNAME_LINENUM_INTERNAL(varname ## _L, __LINE__) +#define VARNAME_LINENUM_INTERNAL(v, line) VARNAME_LINENUM_INTERNAL2(v, line) +#define VARNAME_LINENUM_INTERNAL2(v, line) v ## line + +// The following enum should be used only as a constructor argument to indicate +// that the variable has static storage class, and that the constructor should +// do nothing to its state. It indicates to the reader that it is legal to +// declare a static instance of the class, provided the constructor is given +// the base::LINKER_INITIALIZED argument. Normally, it is unsafe to declare a +// static variable that has a constructor or a destructor because invocation +// order is undefined. However, IF the type can be initialized by filling with +// zeroes (which the loader does for static variables), AND the type's +// destructor does nothing to the storage, then a constructor for static +// initialization can be declared as +// explicit MyClass(base::LinkerInitialized x) {} +// and invoked as +// static MyClass my_variable_name(base::LINKER_INITIALIZED); +namespace base { +enum LinkerInitialized { LINKER_INITIALIZED }; +} + +// The FALLTHROUGH_INTENDED macro can be used to annotate implicit fall-through +// between switch labels: +// switch (x) { +// case 40: +// case 41: +// if (truth_is_out_there) { +// ++x; +// FALLTHROUGH_INTENDED; // Use instead of/along with annotations in +// // comments. +// } else { +// return x; +// } +// case 42: +// ... +// +// As shown in the example above, the FALLTHROUGH_INTENDED macro should be +// followed by a semicolon. It is designed to mimic control-flow statements +// like 'break;', so it can be placed in most places where 'break;' can, but +// only if there are no statements on the execution path between it and the +// next switch label. +// +// When compiled with clang in C++11 mode, the FALLTHROUGH_INTENDED macro is +// expanded to [[clang::fallthrough]] attribute, which is analysed when +// performing switch labels fall-through diagnostic ('-Wimplicit-fallthrough'). +// See clang documentation on language extensions for details: +// http://clang.llvm.org/docs/LanguageExtensions.html#clang__fallthrough +// +// When used with unsupported compilers, the FALLTHROUGH_INTENDED macro has no +// effect on diagnostics. +// +// In either case this macro has no effect on runtime behavior and performance +// of code. +#if defined(__clang__) && defined(LANG_CXX11) && defined(__has_warning) +#if __has_feature(cxx_attributes) && __has_warning("-Wimplicit-fallthrough") +#define FALLTHROUGH_INTENDED [[clang::fallthrough]] // NOLINT +#endif +#endif + +#ifndef FALLTHROUGH_INTENDED +#define FALLTHROUGH_INTENDED do { } while (0) +#endif + +// Retry on EINTR for functions like read() that return -1 on error. +#define RETRY_ON_EINTR(err, expr) do { \ + static_assert(std::is_signed::value, \ + #err " must be a signed integer"); \ + (err) = (expr); \ +} while ((err) == -1 && errno == EINTR) + +// Same as above but for stream API calls like fread() and fwrite(). +#define STREAM_RETRY_ON_EINTR(nread, stream, expr) do { \ + static_assert(std::is_unsigned::value == true, \ + #nread " must be an unsigned integer"); \ + (nread) = (expr); \ +} while ((nread) == 0 && ferror(stream) == EINTR) + +// Same as above but for functions that return pointer types (like +// fopen() and freopen()). +#define POINTER_RETRY_ON_EINTR(ptr, expr) do { \ + static_assert(std::is_pointer::value == true, \ + #ptr " must be a pointer"); \ + (ptr) = (expr); \ +} while ((ptr) == nullptr && errno == EINTR) + +#endif // BASE_MACROS_H_ diff --git a/velox/sdk/cpp/expression/ExpressionUtils.hpp b/velox/sdk/cpp/expression/ExpressionUtils.hpp new file mode 100644 index 000000000000..454c8abd0917 --- /dev/null +++ b/velox/sdk/cpp/expression/ExpressionUtils.hpp @@ -0,0 +1,236 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef EXPRESSIONUTILS_HPP +#define EXPRESSIONUTILS_HPP + +#include +#include +#include +#include "sdk/cpp/memory/MemoryManager.h" + +#include "velox/expression/Expr.h" +#include "velox/parse/Expressions.h" +#include "velox/parse/ExpressionsParser.h" +#include "velox/parse/TypeResolver.h" + +#include + +namespace facebook::velox::sdk::expression { + +class ExprUtils { + public: + template + static std::vector> asISerializableVector( + JNIEnv* env, + jobjectArray exprJsons) { + std::vector> typeExprs; + std::vector jsons = + ConvertJStringArrayToVector(env, exprJsons); + for (auto json : jsons) { + std::shared_ptr result = ISerializable::deserialize( + folly::parseJson(json, getSerializationOptions()), + sdk::memory::MemoryManager::get()->planMemoryPool().get()); + VELOX_CHECK_NOT_NULL( + result, + "failed to deserialize to class {} with json {} ", + boost::typeindex::type_id().pretty_name().c_str(), + json); + typeExprs.emplace_back(result); + } + return typeExprs; + } + + template + static std::shared_ptr asISerializable( + JNIEnv* env, + jstring exprJson) { + auto json = jStringToCString(env, exprJson); + std::shared_ptr result = ISerializable::deserialize( + folly::parseJson(json, getSerializationOptions()), + sdk::memory::MemoryManager::get()->planMemoryPool().get()); + VELOX_CHECK_NOT_NULL( + result, + "failed to deserialize to class {} with json {} ", + boost::typeindex::type_id().pretty_name().c_str(), + json); + return result; + } + + template + static std::vector deserializeArray(JNIEnv* env, jobjectArray exprJsons) { + std::vector typeExprs; + std::vector jsons = + ConvertJStringArrayToVector(env, exprJsons); + for (const auto& json : jsons) { + typeExprs.push_back( + T::deserialize(folly::parseJson(json, getSerializationOptions()))); + } + return typeExprs; + } + + static exec::ExprSet compileExpression( + const std::string& text, + const TypePtr& rowType) { + parse::ParseOptions options_; + auto previousHook = core::Expressions::getResolverHook(); + parse::registerTypeResolver(); + auto untyped = parse::parseExpr(text, options_); + auto typed = core::Expressions::inferTypes( + untyped, + rowType, + sdk::memory::MemoryManager::get()->dictExecutionMemoryPool().get()); + core::Expressions::setTypeResolverHook(previousHook); + std::shared_ptr queryCtx_ = core::QueryCtx::create(); + core::ExecCtx execCtx_{ + sdk::memory::MemoryManager::get()->vectorBatchMemoryPool().get(), + queryCtx_.get()}; + return exec::ExprSet({typed}, &execCtx_); + } + + static VectorPtr evaluate( + exec::ExprSet& exprSet, + const RowVectorPtr& data, + const SelectivityVector& rows) { + std::shared_ptr queryCtx_ = core::QueryCtx::create(); + core::ExecCtx execCtx_{ + sdk::memory::MemoryManager::get()->dictExecutionMemoryPool().get(), + queryCtx_.get()}; + exec::EvalCtx evalCtx(&execCtx_, &exprSet, data.get()); + std::vector results(1); + exprSet.eval(rows, evalCtx, results); + return results[0]; + } + + static VectorPtr evaluate(exec::ExprSet& exprSet, const RowVectorPtr& data) { + SelectivityVector rows(data->size()); + return evaluate(exprSet, data, rows); + } + + static VectorPtr evaluate( + const std::string& expression, + const RowVectorPtr& data) { + auto exprSet = compileExpression(expression, asRowType(data->type())); + return evaluate(exprSet, data); + } + + static TypePtr jsonToVeloxType(std::string json) { + return convertSparkStructToVelox( + folly::parseJson(json, getSerializationOptions())); + } + + // 将Spark的字段类型转换为Velox的Type + static TypePtr convertSparkFieldToVelox(const folly::dynamic& field) { + if (field["type"].isObject()) { + return convertSparkStructToVelox(field); + } + auto type = field["type"].asString(); + + // 这个例子没有处理nullable字段,实际使用时你可能需要考虑这个字段 + + if (type == "integer") { + return INTEGER(); + } else if (type == "long") { + return BIGINT(); + } else if (type == "string") { + return VARCHAR(); + } else if (type == "double") { + return DOUBLE(); + } else if (type == "date") { + return DATE(); + } else if (type == "timestamp") { + return TIMESTAMP(); + } else { + throw std::runtime_error("Unsupported type: " + type); + } + } + + // 将Spark的StructType转换为Velox的Type + static TypePtr convertSparkStructToVelox(const folly::dynamic& sparkStruct) { + std::vector fields; + std::vector names; + for (const auto& field : sparkStruct["fields"]) { + fields.push_back(convertSparkFieldToVelox(field)); + names.push_back(field["name"].asString()); + } + return ROW(std::move(names), std::move(fields)); + } + + static TypePtr toVeloxType(std::string str) { + type::fbhive::HiveTypeParser parser; + return parser.parse(str); + } + + static core::PlanNodePtr ToPlan(std::string planJson) { + std::shared_ptr plan = + ISerializable::deserialize( + folly::parseJson(planJson, getSerializationOptions()), + sdk::memory::MemoryManager::get()->dictExecutionMemoryPool().get()); + return plan; + } + + static std::string throwWindowFunctionSignatureNotSupported( + const std::string& name, + const std::vector& types, + const std::vector& signatures) { + std::stringstream error; + error << "Window function signature is not supported: " + << facebook::velox::exec::toString(name, types) + << ". Supported signatures: " << toString(signatures) << "."; + VELOX_USER_FAIL(error.str()); + } + + static std::string throwWindowFunctionDoesntExist(const std::string& name) { + std::stringstream error; + error << "Window function doesn't exist: " << name << "."; + if (exec::windowFunctions().empty()) { + error << " Registry of window functions is empty. " + "Make sure to register some window functions."; + } + VELOX_USER_FAIL(error.str()); + } + + static TypePtr resolveWindowType( + const std::string& windowFunctionName, + const std::vector& inputTypes, + bool nullOnFailure) { + if (auto signatures = + exec::getWindowFunctionSignatures(windowFunctionName)) { + for (const auto& signature : signatures.value()) { + exec::SignatureBinder binder(*signature, inputTypes); + if (binder.tryBind()) { + return binder.tryResolveType(signature->returnType()); + } + } + + if (nullOnFailure) { + return nullptr; + } + throwWindowFunctionSignatureNotSupported( + windowFunctionName, inputTypes, signatures.value()); + } + + if (nullOnFailure) { + return nullptr; + } + throwWindowFunctionDoesntExist(windowFunctionName); + return nullptr; + } +}; + +} // namespace facebook::velox::expression +#endif // EXPRESSIONUTILS_HPP diff --git a/velox/sdk/cpp/funcitons/RegistrationAllFunctions.cc b/velox/sdk/cpp/funcitons/RegistrationAllFunctions.cc new file mode 100644 index 000000000000..d58f05fd47b8 --- /dev/null +++ b/velox/sdk/cpp/funcitons/RegistrationAllFunctions.cc @@ -0,0 +1,32 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "RegistrationAllFunctions.h" + +#include + +#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" +#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h" + +using namespace facebook; +namespace facebook::velox::sdk { +void registerAllFunctions() { + velox::functions::prestosql::registerAllScalarFunctions(); + velox::aggregate::prestosql::registerAllAggregateFunctions("", true); + velox::window::prestosql::registerAllWindowFunctions(); +} +} // namespace facebook::velox::sdk diff --git a/velox/sdk/cpp/funcitons/RegistrationAllFunctions.h b/velox/sdk/cpp/funcitons/RegistrationAllFunctions.h new file mode 100644 index 000000000000..4168547c34be --- /dev/null +++ b/velox/sdk/cpp/funcitons/RegistrationAllFunctions.h @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +namespace facebook::velox::sdk { + +void registerAllFunctions(); + +} // namespace facebook::velox::sdk diff --git a/velox/sdk/cpp/jni/JniCommon.h b/velox/sdk/cpp/jni/JniCommon.h new file mode 100644 index 000000000000..c42603d9b278 --- /dev/null +++ b/velox/sdk/cpp/jni/JniCommon.h @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "common/base/Exceptions.h" + + +static jint jniVersion = JNI_VERSION_1_8; + +#define JNI_METHOD_SIG(name, signature) \ +(JNINativeMethod{ \ + const_cast(#name), \ + const_cast(#signature), \ + reinterpret_cast(name) \ +}) + +struct SharedPtrHandle { + std::shared_ptr plan_node; + + SharedPtrHandle() = default; + template + SharedPtrHandle(std::shared_ptr node) + : plan_node(std::static_pointer_cast(node)) {} + + template + std::shared_ptr as() const { + VELOX_CHECK_NOT_NULL(plan_node, "SharedPtrHandle does not hold an object."); + auto casted_ptr = std::static_pointer_cast(plan_node); + VELOX_CHECK_NOT_NULL(casted_ptr, "Failed to cast to the requested type."); + return casted_ptr; + } +}; + +static inline jclass createGlobalClassReference( + JNIEnv* env, + const char* className) { + jclass localClass = env->FindClass(className); + jclass globalClass = (jclass)env->NewGlobalRef(localClass); + env->DeleteLocalRef(localClass); + return globalClass; +} + +static inline jmethodID +getMethodId(JNIEnv* env, jclass thisClass, const char* name, const char* sig) { + jmethodID ret = env->GetMethodID(thisClass, name, sig); + return ret; +} + +static inline jmethodID getStaticMethodId( + JNIEnv* env, + jclass thisClass, + const char* name, + const char* sig) { + jmethodID ret = env->GetStaticMethodID(thisClass, name, sig); + return ret; +} + +static std::string jbyteArrayToString(JNIEnv* env, jbyteArray array) { + // 获取数组长度 + jsize length = env->GetArrayLength(array); + + // 如果数组为空,返回空的 std::string + if (length == 0) { + return std::string(); + } + + // 获取数组元素 + jbyte* bytes = env->GetByteArrayElements(array, nullptr); + + // 创建 std::string 对象 + std::string str(reinterpret_cast(bytes), length); + + // 释放数组元素 + env->ReleaseByteArrayElements(array, bytes, JNI_ABORT); + + return str; +} + +static inline std::string jStringToCString(JNIEnv* env, jstring jStr) { + if (!jStr) { + return ""; + } + // 将jstring转换为UTF-8格式的C字符串 + const char *chars = env->GetStringUTFChars(jStr, nullptr); + // 使用C字符串创建std::string实例 + std::string ret(chars); + // 通知JVM不再需要这个C字符串了 + env->ReleaseStringUTFChars(jStr, chars); + return ret; +} + +static inline std::vector ConvertJStringArrayToVector( + JNIEnv* env, + jobjectArray jStringArray) { + if (!jStringArray) { + return {}; + } + // 获取数组长度 + jsize arrayLength = env->GetArrayLength(jStringArray); + + // 创建一个std::vector,预留足够的空间 + std::vector result; + result.reserve(arrayLength); + + // 遍历Java字符串数组 + for (jsize i = 0; i < arrayLength; i++) { + // 获取Java字符串 + jstring jStr = (jstring)env->GetObjectArrayElement(jStringArray, i); + + // 检查是否正常获取到Java字符串 + if (jStr) { + // 将Java字符串转换为C++字符串 + const char* rawStr = env->GetStringUTFChars(jStr, nullptr); + std::string str(rawStr); + + // 释放Java字符串内存 + env->ReleaseStringUTFChars(jStr, rawStr); + + // 将C++字符串添加到vector中 + result.push_back(str); + + // 删除局部引用,防止内存泄漏 + env->DeleteLocalRef(jStr); + } + } + + return result; +} + + + + diff --git a/velox/sdk/cpp/jni/JniErrors.cc b/velox/sdk/cpp/jni/JniErrors.cc new file mode 100644 index 000000000000..16fab4dad66b --- /dev/null +++ b/velox/sdk/cpp/jni/JniErrors.cc @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + + +#include "JniErrors.h" + +namespace facebook::velox::sdk { +JniErrorsGlobalState* getJniErrorsState() { + static JniErrorsGlobalState *jniErrorsState = new JniErrorsGlobalState(); + return jniErrorsState; +} + + +} diff --git a/velox/sdk/cpp/jni/JniErrors.h b/velox/sdk/cpp/jni/JniErrors.h new file mode 100644 index 000000000000..1aadd915eb58 --- /dev/null +++ b/velox/sdk/cpp/jni/JniErrors.h @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "JniCommon.h" + +#define JNI_METHOD_START try { +// macro ended + +#define JNI_METHOD_END(fallback_expr) \ + } \ + catch (std::exception & e) { \ + env->ThrowNew(getJniErrorsState()->runtimeExceptionClass(), e.what()); \ + return fallback_expr; \ + } +// macro ended + +namespace facebook::velox::sdk { + +class JniPendingException final : public std::runtime_error { + public: + explicit JniPendingException(const std::string& arg) : runtime_error(arg) {} +}; + +static inline void throwPendingException(const std::string& message) { + throw JniPendingException(message); +} + + + +static inline void jniThrow(const std::string& message) { + throwPendingException(message); +} + +struct JniErrorsGlobalState { + public: + virtual ~JniErrorsGlobalState() = default; + + void initialize(JNIEnv* env) { + std::lock_guard lockGuard(mtx_); + ioExceptionClass_ = createGlobalClassReference(env, "Ljava/io/IOException;"); + runtimeExceptionClass_ = createGlobalClassReference(env, "Ljava/lang/RuntimeException;"); + unsupportedoperationExceptionClass_ = createGlobalClassReference(env, "Ljava/lang/UnsupportedOperationException;"); + illegalAccessExceptionClass_ = createGlobalClassReference(env, "Ljava/lang/IllegalAccessException;"); + illegalArgumentExceptionClass_ = createGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;"); + } + + jclass runtimeExceptionClass() { + std::lock_guard lockGuard(mtx_); + if (runtimeExceptionClass_ == nullptr) { + VELOX_USER_FAIL("Fatal: JniGlobalState::Initialize(...) was not called before using the utility"); + } + return runtimeExceptionClass_; + } + + jclass illegalAccessExceptionClass() { + std::lock_guard lockGuard(mtx_); + if (illegalAccessExceptionClass_ == nullptr) { + VELOX_USER_FAIL("Fatal: JniGlobalState::Initialize(...) was not called before using the utility"); + } + return illegalAccessExceptionClass_; + } + + private: + jclass ioExceptionClass_ = nullptr; + jclass runtimeExceptionClass_ = nullptr; + jclass unsupportedoperationExceptionClass_ = nullptr; + jclass illegalAccessExceptionClass_ = nullptr; + jclass illegalArgumentExceptionClass_ = nullptr; + std::mutex mtx_; + +} ; + +JniErrorsGlobalState* getJniErrorsState(); +} \ No newline at end of file diff --git a/velox/sdk/cpp/jni/JniUtil.cc b/velox/sdk/cpp/jni/JniUtil.cc new file mode 100644 index 000000000000..ad58bf020f84 --- /dev/null +++ b/velox/sdk/cpp/jni/JniUtil.cc @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "JniUtil.h" + +#include "sdk/cpp/jni/JniCommon.h" +#include + +#include +#include "glog/logging.h" + +namespace facebook::velox::sdk { +bool JniUtil::jvm_inited_ = false; +JavaVM* JniUtil::g_vm = nullptr; +__thread JNIEnv* JniUtil::tls_env_ = nullptr; + + +void JniUtil::Init(JavaVM* vm) { + if (jvm_inited_) { + return; + } + g_vm = vm; + jvm_inited_ = true; +} + +jmethodID JniUtil::getMethodID( + JNIEnv* env, + const std::string& className, + const std::string& methodName, + const std::string& sig, + bool isStatic) { + jmethodID methodId; + jclass localCls = + env->FindClass(className.c_str()); + // + if (localCls == nullptr) { + if (env->ExceptionOccurred()) + env->ExceptionDescribe(); + } + VELOX_CHECK_NOT_NULL(localCls, "Failed to find JniUtil class."); + // + auto gloableCls = + reinterpret_cast(env->NewGlobalRef(localCls)); + if (gloableCls == nullptr) { + if (env->ExceptionOccurred()) + env->ExceptionDescribe(); + } + VELOX_CHECK_NOT_NULL(localCls, "Failed to find JniUtil class."); + env->DeleteLocalRef(localCls); + if (env->ExceptionOccurred()) { + env->ExceptionDescribe(); + } + VELOX_CHECK( + !env->ExceptionOccurred(), + "Failed to delete local reference to JniUtil class."); + if (isStatic) { + methodId = env->GetStaticMethodID( + gloableCls, + methodName.c_str(), + + sig.c_str()); + } else { + methodId = env->GetMethodID( + gloableCls, + methodName.c_str(), + + sig.c_str()); + } + if (methodId == NULL) { + if (env->ExceptionOccurred()) + env->ExceptionDescribe(); + } + return methodId; +} + +} // namespace facebook::velox::sdk \ No newline at end of file diff --git a/velox/sdk/cpp/jni/JniUtil.h b/velox/sdk/cpp/jni/JniUtil.h new file mode 100644 index 000000000000..6cc6003e72a5 --- /dev/null +++ b/velox/sdk/cpp/jni/JniUtil.h @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef VELOX_SDK_JNI_UTIL_H +#define VELOX_SDK_JNI_UTIL_H + +#include +#include +#include +#include +#include "common/base/Exceptions.h" + +#include + +namespace facebook::velox::sdk { + +// copy from implala +class JniUtil { + public: + /// Init JniUtil. This should be called prior to any other calls. + static void Init(JavaVM* g_vm); + + static jmethodID getMethodID( + JNIEnv* env, + const std::string& className, + const std::string& methodName, + const std::string& sig, + bool isStatic = false); + + static JNIEnv* GetJNIEnv() { + int rc = g_vm->GetEnv(reinterpret_cast(&tls_env_), JNI_VERSION_1_8); + VELOX_CHECK_EQ(rc, 0, "Unable to get JVM"); + return tls_env_; + } + + private: + // Set in Init() once the JVM is initialized. + static bool jvm_inited_; + + // Thread-local cache of the JNIEnv for this thread. + static __thread JNIEnv* tls_env_; + + static JavaVM* g_vm; +}; +} // namespace facebook::velox::sdk + +#endif diff --git a/velox/sdk/cpp/jni/VeloxJniWrapper.cc b/velox/sdk/cpp/jni/VeloxJniWrapper.cc new file mode 100644 index 000000000000..2925261034df --- /dev/null +++ b/velox/sdk/cpp/jni/VeloxJniWrapper.cc @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include +#include +#include + +#include "JniCommon.h" +#include "sdk/cpp/funcitons/RegistrationAllFunctions.h" +#include "sdk/cpp/native/NativePlanBuilder.hpp" + +#include "sdk/cpp/jni/JniErrors.h" + +#include "JniUtil.h" + +namespace facebook::velox::sdk::memory {} +namespace facebook::velox::sdk::execution { +class NativeColumnarExecution; +} +using namespace facebook; + +#ifdef __cplusplus +extern "C" { +#endif + +using namespace velox::sdk; +jint JNI_OnLoad(JavaVM* vm, void* reserved) { + JNIEnv* env; + if (vm->GetEnv(reinterpret_cast(&env), jniVersion) != JNI_OK) { + return JNI_ERR; + } + // logging + velox::sdk::registerAllFunctions(); + google::InitGoogleLogging("velox"); + FLAGS_logtostderr = true; + getJniErrorsState()->initialize(env); + JniUtil::Init(vm); + velox::Type::registerSerDe(); + velox::core::ITypedExpr::registerSerDe(); + velox::core::PlanNode::registerSerDe(); + FLAGS_experimental_enable_legacy_cast = false; + jni::NativeClass::RegisterNatives(); + velox::serializer::presto::PrestoVectorSerde::registerVectorSerde(); + velox::exec::registerPartitionFunctionSerDe(); + std::make_shared(); + return jniVersion; +} + +void JNI_OnUnload(JavaVM* vm, void* reserved) { + JNIEnv* env; + vm->GetEnv(reinterpret_cast(&env), jniVersion); + google::ShutdownGoogleLogging(); +} + +#ifdef __cplusplus +} +#endif diff --git a/velox/sdk/cpp/memory/MemoryManager.cpp b/velox/sdk/cpp/memory/MemoryManager.cpp new file mode 100644 index 000000000000..7c2b69a96f21 --- /dev/null +++ b/velox/sdk/cpp/memory/MemoryManager.cpp @@ -0,0 +1,109 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "MemoryManager.h" + +#include + +#include "sdk/cpp/utils/JsonUtils.h" + +namespace facebook::velox::sdk::memory { + +std::shared_ptr MemoryManager::get() { + static std::shared_ptr memoryManager = + std::make_shared(); + return memoryManager; +} + +std::shared_ptr +MemoryManager::rootVeloxMemoryPool() { + static auto veloxAlloc = + facebook::velox::memory::deprecatedDefaultMemoryManager().addRootPool( + "root", FLAGS_max_root_memory_bytes); + return veloxAlloc; +} + +std::shared_ptr +MemoryManager::defaultLeafVeloxMemoryPool() { + static std::shared_ptr defaultPool = + rootVeloxMemoryPool()->addLeafChild("default_leaf"); + return defaultPool; +} + +std::shared_ptr MemoryManager::createQueryPool( + const std::string& queryId, int64_t bytes) { + return facebook::velox::memory::deprecatedDefaultMemoryManager().addRootPool( + velox::core::QueryCtx::generatePoolName(queryId), + bytes); +} + +std::shared_ptr +MemoryManager::vectorBatchMemoryPool() { + static std::shared_ptr defaultPool = + rootVeloxMemoryPool()->addLeafChild("vector_batch"); + return defaultPool; +} + +std::shared_ptr +MemoryManager::dictExecutionMemoryPool() { + static std::shared_ptr defaultPool = + rootVeloxMemoryPool()->addLeafChild("dict_execution"); + return defaultPool; +} + +std::shared_ptr MemoryManager::planMemoryPool() { + static std::shared_ptr defaultPool = + rootVeloxMemoryPool()->addLeafChild("plan"); + return defaultPool; +} + + + + +folly::dynamic MemoryManager::toJsonString( + std::shared_ptr memoryPool) { + auto stats = memoryPool->stats(); + folly::dynamic obj = folly::dynamic::object(); + obj["capacity"] = memoryPool->capacity(); + obj["reservedBytes"] = stats.reservedBytes; + obj["peakBytes"] = stats.peakBytes; + obj["cumulativeBytes"] = stats.cumulativeBytes; + obj["numAllocs"] = stats.numAllocs; + obj["numFrees"] = stats.numFrees; + obj["numReserves"] = stats.numReserves; + obj["numReleases"] = stats.numReleases; + obj["numShrinks"] = stats.numShrinks; + obj["numReclaims"] = stats.numReclaims; + obj["numCollisions"] = stats.numCollisions; + obj["numCapacityGrowths"] = stats.numCapacityGrowths; + return obj; +} + +std::string MemoryManager::memoryStatics() { + folly::dynamic obj = folly::dynamic::object(); + obj["root"] = toJsonString(rootVeloxMemoryPool()); + obj["plan"] = toJsonString(planMemoryPool()); + obj["dict_execution"] = toJsonString(dictExecutionMemoryPool()); + obj["vector_batch"] = toJsonString(vectorBatchMemoryPool()); + for (auto pool : + velox::memory::deprecatedDefaultMemoryManager().getAlivePools()) { + obj[pool->name()] = toJsonString(pool); + } + return utils::JsonUtils::toSortedJson(obj); +} + +} // namespace facebook::velox::sdk::memory \ No newline at end of file diff --git a/velox/sdk/cpp/memory/MemoryManager.h b/velox/sdk/cpp/memory/MemoryManager.h new file mode 100644 index 000000000000..4e9bc61df843 --- /dev/null +++ b/velox/sdk/cpp/memory/MemoryManager.h @@ -0,0 +1,45 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MEMORYMANAGER_H +#define MEMORYMANAGER_H +#include "folly/json/dynamic.h" +#include "sdk/cpp/common/Global.h" + + +namespace facebook::velox::memory { +class MemoryPool; +} +namespace facebook::velox::sdk::memory { +class MemoryManager { + public: + static std::shared_ptr get(); + std::shared_ptr rootVeloxMemoryPool(); + std::shared_ptr defaultLeafVeloxMemoryPool(); + std::shared_ptr createQueryPool( + const std::string& queryId, int64_t bytes); + std::shared_ptr vectorBatchMemoryPool(); + std::shared_ptr dictExecutionMemoryPool(); + std::shared_ptr planMemoryPool(); + folly::dynamic toJsonString( + std::shared_ptr memoryPool); + + std::string memoryStatics(); +}; +} // namespace facebook::velox::sdk::memory + +#endif // MEMORYMANAGER_H diff --git a/velox/sdk/cpp/memory/NativeMemoryManager.hpp b/velox/sdk/cpp/memory/NativeMemoryManager.hpp new file mode 100644 index 000000000000..e02b6cd34f47 --- /dev/null +++ b/velox/sdk/cpp/memory/NativeMemoryManager.hpp @@ -0,0 +1,40 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NativeMemoryManager_HPP +#define NativeMemoryManager_HPP +#include + +#include "sdk/cpp/native/NativeClass.hpp" + +namespace facebook::velox::sdk::memory { +class NativeMemoryManager : public jni::NativeClass { + static std::string CLASS_NAME; + + public: + NativeMemoryManager() : NativeClass(CLASS_NAME) {} + + void initInternal() override; + + static jlong nativeCreate(JNIEnv* env, jobject obj); + + static jstring nativeMemoryStatics(JNIEnv* env, jobject obj); + +}; +} // namespace facebook::velox::sdk + +#endif // NativeMemoryManager_HPP diff --git a/velox/sdk/cpp/memory/NativeMemoryManger.cc b/velox/sdk/cpp/memory/NativeMemoryManger.cc new file mode 100644 index 000000000000..d6e0f16cb326 --- /dev/null +++ b/velox/sdk/cpp/memory/NativeMemoryManger.cc @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "sdk/cpp/jni/JniErrors.h" +#include "sdk/cpp/jni/JniUtil.h" +#include "sdk/cpp/expression/ExpressionUtils.hpp" + +#include + +#include "MemoryManager.h" + + +#include "sdk/cpp/memory/NativeMemoryManager.hpp" + +namespace facebook::velox::sdk::memory { + +using namespace facebook::velox::sdk::jni; + +std::string NativeMemoryManager::CLASS_NAME = + "org/apache/spark/rpc/jni/NativeRPCManager"; + +void NativeMemoryManager::initInternal() { + addNativeMethod( + "nativeCreate", reinterpret_cast(nativeCreate), kTypeLong, NULL); + addNativeMethod( + "nativeMemoryStatics", reinterpret_cast(nativeMemoryStatics), kTypeString, NULL); +} + +jlong NativeMemoryManager::nativeCreate(JNIEnv* env, jobject obj) { + JNI_METHOD_START + auto* handle = new SharedPtrHandle{MemoryManager::get()}; + return reinterpret_cast(handle); + JNI_METHOD_END(-1) +} +jstring NativeMemoryManager::nativeMemoryStatics(JNIEnv* env, jobject obj) { + JNI_METHOD_START + std::shared_ptr memoryManager = as(obj); + return env->NewStringUTF(memoryManager->memoryStatics().c_str()); + JNI_METHOD_END(env->NewStringUTF("")) +} + +} // namespace facebook::velox::sdk \ No newline at end of file diff --git a/velox/sdk/cpp/native/NativeClass.cc b/velox/sdk/cpp/native/NativeClass.cc new file mode 100644 index 000000000000..67cf365fe42c --- /dev/null +++ b/velox/sdk/cpp/native/NativeClass.cc @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "NativeClass.hpp" + +namespace facebook::velox::sdk::jni { +std::string NativeClass::NATIVE_CLASS_NAME = + "velox/jni/NativeClass"; + +NativeClass::NativeClass(const std::string className) + : className_(className), nativeMethods({}) {} + +void NativeClass::init() { + addNativeMethod( + "nativeRelease", reinterpret_cast(nativeRelease), kTypeVoid, NULL); + initInternal(); + registerNativeMethods(); +} + +jmethodID NativeClass::initPointMethod( + std::string className, + std::string functionName, + std::string sig) { + JNIEnv* env = JniUtil::GetJNIEnv(); + // // Find JniUtil class and create a global ref. + jclass local_jni_dict_cl = env->FindClass(className.c_str()); + if (local_jni_dict_cl == nullptr) { + if (env->ExceptionOccurred()) + env->ExceptionDescribe(); + } + VELOX_CHECK_NOT_NULL(local_jni_dict_cl, "Failed to find JniUtil class."); + + jmethodID nativePointerMethodId_ = + env->GetMethodID(local_jni_dict_cl, functionName.c_str(), sig.c_str()); + if (nativePointerMethodId_ == NULL) { + if (env->ExceptionOccurred()) + env->ExceptionDescribe(); + } + VELOX_CHECK(!env->ExceptionOccurred(), "Failed to find toDictVector method"); + return nativePointerMethodId_; +} + +std::string NativeClass::buildJNISignature( + const char* returnTypeSignature, + const char* argTypeSignature, + va_list args) { + std::string signature = "("; + const char* tempArgTypeSignature = argTypeSignature; + + while (tempArgTypeSignature != NULL) { + signature += tempArgTypeSignature; + tempArgTypeSignature = va_arg(args, const char*); + } + + signature += ")"; + signature += returnTypeSignature; + return signature; +} + +jlong NativeClass::poniter(jobject java_this) { + static jmethodID NATIVE_METHOD_HANDLE = + initPointMethod(NATIVE_CLASS_NAME, "nativePTR", "()J"); + JNIEnv* env = JniUtil::GetJNIEnv(); + return env->CallLongMethod(java_this, NATIVE_METHOD_HANDLE); +} + +void NativeClass::addNativeMethod( + const char* name, + void* funcPtr, + const char* returnTypeSignature, + const char* argTypeSignature, + ...) { + va_list args; + va_start(args, argTypeSignature); + std::string signature = + buildJNISignature(returnTypeSignature, argTypeSignature, args); + va_end(args); + JNINativeMethod method; + method.name = strdup(name); + method.signature = strdup(signature.c_str()); + method.fnPtr = funcPtr; + nativeMethods.push_back(method); +} + +jint NativeClass::registerNativeMethods() { + JNIEnv* env = JniUtil::GetJNIEnv(); + jclass clazz = env->FindClass(className_.c_str()); + if (clazz == NULL) { + return JNI_FALSE; + } + + if (env->RegisterNatives(clazz, &nativeMethods[0], nativeMethods.size()) < + 0) { + return JNI_FALSE; + } + + freeNativeMethods(); + return JNI_TRUE; +} + +void NativeClass::freeNativeMethods() { + for (auto& method : nativeMethods) { + free((void*)method.name); + free((void*)method.signature); + } + nativeMethods.clear(); +} + +void NativeClass::nativeRelease(JNIEnv* env, jobject obj) { + JNI_METHOD_START + static jmethodID NATIVE_METHOD_HANDLE = + initPointMethod(NATIVE_CLASS_NAME, "releaseHandle", "()V"); + JNIEnv* env = JniUtil::GetJNIEnv(); + delete reinterpret_cast(NativeClass::poniter(obj)); + env->CallLongMethod(obj, NATIVE_METHOD_HANDLE); + JNI_METHOD_END() +} +} // namespace facebook::velox::sdk::jni diff --git a/velox/sdk/cpp/native/NativeClass.hpp b/velox/sdk/cpp/native/NativeClass.hpp new file mode 100644 index 000000000000..d654c45c614c --- /dev/null +++ b/velox/sdk/cpp/native/NativeClass.hpp @@ -0,0 +1,120 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NATIVECLASS_HPP +#define NATIVECLASS_HPP +#include +#include "sdk/cpp/jni/JniErrors.h" + +#include +#include "sdk/cpp/jni/JniUtil.h" + +namespace facebook::velox::sdk::jni { +static std::string GetArrayTypeSignature(const char* baseTypeSignature) { + auto basic_string = std::string("[") + baseTypeSignature; + return basic_string; +} + +static std::string GetObjectTypeSignature(std::string baseTypeSignature) { + auto basic_string = std::string("L") + baseTypeSignature + std::string(";"); + return basic_string; +} + +// Java基本类型及其JNI签名 +static const char* kTypeBoolean = "Z"; +static const char* kTypeByte = "B"; +static const char* kTypeChar = "C"; +static const char* kTypeShort = "S"; +static const char* kTypeInt = "I"; +static const char* kTypeLong = "J"; +static const char* kTypeFloat = "F"; +static const char* kTypeDouble = "D"; +static const char* kTypeVoid = "V"; +static const char* kTypeString = "Ljava/lang/String;"; +static const char* kTypeBooleanArray = "[Z"; +static const char* kTypeByteArray = "[B"; +static const char* kTypeCharArray = "[C"; +static const char* kTypeShortArray = "[S"; +static const char* kTypeIntArray = "[I"; +static const char* kTypeLongArray = "[J"; +static const char* kTypeFloatArray = "[F"; +static const char* kTypeDoubleArray = "[D"; + +class NativeClass { + public: + NativeClass(std::string className); + + virtual void initInternal() = 0; + + void init(); + + static jmethodID initPointMethod( + std::string className, + std::string functionName, + std::string sig); + + template + static std::shared_ptr as(jobject java_this); + + static jlong poniter(jobject java_this); + + void addNativeMethod( + const char* name, + void* funcPtr, + const char* returnTypeSignature, + const char* argTypeSignature, + ...); + + jint registerNativeMethods(); + + void freeNativeMethods(); + + template + static void RegisterNatives(); + + static void nativeRelease(JNIEnv* env, jobject obj); + + private: + static std::string NATIVE_CLASS_NAME; + + static std::string buildJNISignature( + const char* returnTypeSignature, + const char* argTypeSignature, + va_list args); + + std::string className_; + + std::vector nativeMethods; +}; + +template +std::shared_ptr NativeClass::as(jobject java_this) { + jlong handle = poniter(java_this); + return reinterpret_cast(handle)->as(); +} + +template +void NativeClass::RegisterNatives() { + JNIEnv* env = JniUtil::GetJNIEnv(); + JNI_METHOD_START + std::make_shared()->init(); + JNI_METHOD_END(); +} + +} // namespace facebook::velox::sdk::jni + +#endif // NATIVECLASS_HPP diff --git a/velox/sdk/cpp/native/NativePlanBuilder.cc b/velox/sdk/cpp/native/NativePlanBuilder.cc new file mode 100644 index 000000000000..c1700287a6ec --- /dev/null +++ b/velox/sdk/cpp/native/NativePlanBuilder.cc @@ -0,0 +1,513 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include "sdk/cpp/jni/JniErrors.h" +#include "sdk/cpp/jni/JniUtil.h" + +#include +#include "NativePlanBuilder.hpp" + +#include +#include + +#include "sdk/cpp/expression/ExpressionUtils.hpp" + +namespace facebook::velox::sdk { + +using namespace facebook::velox::sdk::jni; +using namespace facebook::velox::sdk::expression; +using namespace facebook::velox::exec::test; + +std::string NativePlanBuilder::CLASS_NAME = "velox/jni/NativePlanBuilder"; + +void NativePlanBuilder::initInternal() { + addNativeMethod( + "nativeCreate", reinterpret_cast(nativeCreate), kTypeLong, NULL); + + addNativeMethod( + "nativeNodeId", reinterpret_cast(nativeNodeId), kTypeString, NULL); + + addNativeMethod( + "nativeBuilder", + reinterpret_cast(nativeBuilder), + kTypeString, + NULL); + + addNativeMethod( + "nativeProject", + reinterpret_cast(nativeProject), + kTypeVoid, + GetArrayTypeSignature(kTypeString).c_str(), + NULL); + + addNativeMethod( + "nativeJavaScan", + reinterpret_cast(nativeJavaScan), + kTypeVoid, + kTypeString, + NULL); + + addNativeMethod( + "nativeFilter", + reinterpret_cast(nativeFilter), + kTypeVoid, + kTypeString, + NULL); + + // addNativeMethod( + // "nativeAggregation", + // reinterpret_cast(nativeAggregation), + // kTypeVoid, + // kTypeString, + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // kTypeBoolean, + // NULL); + // + // addNativeMethod( + // "nativeShuffledHashJoin", + // reinterpret_cast(nativeShuffledHashJoin), + // kTypeVoid, + // kTypeString, + // kTypeBoolean, + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // kTypeString, + // kTypeString, + // kTypeString, + // NULL); + // addNativeMethod( + // "nativeMergeJoin", + // reinterpret_cast(nativeMergeJoin), + // kTypeVoid, + // kTypeString, + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // kTypeString, + // kTypeString, + // kTypeString, + // NULL); + // addNativeMethod( + // "nativeExpand", + // reinterpret_cast(nativeExpand), + // kTypeVoid, + // "[[Ljava/lang/String;", + // GetArrayTypeSignature(kTypeString).c_str(), + // NULL); + // addNativeMethod( + // "nativeTestString", + // reinterpret_cast(nativeTestString), + // kTypeVoid, + // kTypeString, + // NULL); + + // addNativeMethod( + // "nativeWindowFunction", + // reinterpret_cast(nativeWindowFunction), + // kTypeString, + // kTypeString, + // kTypeString, + // kTypeBoolean, + // NULL); + + // addNativeMethod( + // "nativeWindow", + // reinterpret_cast(nativeWindow), + // kTypeVoid, + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // kTypeBoolean, + // NULL); + // + // addNativeMethod( + // "nativeSort", + // reinterpret_cast(nativeSort), + // kTypeVoid, + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // kTypeBoolean, + // NULL); + // + // addNativeMethod( + // "nativeUnnest", + // reinterpret_cast(nativeUnnest), + // kTypeVoid, + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // GetArrayTypeSignature(kTypeString).c_str(), + // kTypeString, + // NULL); + + addNativeMethod( + "nativeLimit", + reinterpret_cast(nativeLimit), + kTypeVoid, + kTypeInt, + kTypeInt, + NULL); + // addNativeMethod( + // "nativePartitionedOutput", + // reinterpret_cast(nativePartitionedOutput), + // kTypeVoid, + // GetArrayTypeSignature(kTypeString).c_str(), + // kTypeInt, + // NULL); +} + +jlong NativePlanBuilder::nativeCreate(JNIEnv* env, jobject obj) { + JNI_METHOD_START + SharedPtrHandle* handle = new SharedPtrHandle{std::make_shared( + memory::MemoryManager::get()->planMemoryPool().get())}; + return reinterpret_cast(handle); + JNI_METHOD_END(-1) +} + +void NativePlanBuilder::nativeProject( + JNIEnv* env, + jobject obj, + jobjectArray projections) { + JNI_METHOD_START + const std::shared_ptr builder = as(obj); + builder->project(ConvertJStringArrayToVector(env, projections)); + JNI_METHOD_END() +} + +jstring +NativePlanBuilder::nativeJavaScan(JNIEnv* env, jobject obj, jstring schema) { + JNI_METHOD_START + const std::shared_ptr builder = as(obj); + const std::string schemaStr = jStringToCString(env, schema); + const RowTypePtr rowType = asRowType(ExprUtils::toVeloxType(schemaStr)); + builder->tableScan(rowType); + core::PlanNodeId scanNodeId; + builder->capturePlanNodeId(scanNodeId); + return env->NewStringUTF(scanNodeId.c_str()); + JNI_METHOD_END(nullptr) +} + +void NativePlanBuilder::nativeFilter(JNIEnv* env, jobject obj, jstring filter) { + JNI_METHOD_START + const std::string filterStr = jStringToCString(env, filter); + const std::shared_ptr builder = as(obj); + auto previousHook = core::Expressions::getResolverHook(); + parse::registerTypeResolver(); + builder->filter(filterStr); + core::Expressions::setTypeResolverHook(previousHook); + JNI_METHOD_END() +} +// +// void NativePlanBuilder::nativeExpand( +// JNIEnv* env, +// jobject obj, +// jobjectArray projects, +// jobjectArray alias) { +// JNI_METHOD_START +// const std::shared_ptr builder = as(obj); +// jsize arrayLength = env->GetArrayLength(projects); +// +// // 创建一个std::vector,预留足够的空间 +// std::vector result; +// result.reserve(arrayLength); +// +// std::vector> cProjects = {}; +// // 遍历Java字符串数组 +// for (jsize i = 0; i < arrayLength; i++) { +// // 获取Java字符串 +// jobjectArray arr = (jobjectArray)env->GetObjectArrayElement(projects, i); +// // 检查是否正常获取到Java字符串 +// if (arr) { +// std::vector fields = +// ExprUtils::asISerializableVector( +// env, arr); +// cProjects.push_back(fields); +// // 删除局部引用,防止内存泄漏 +// env->DeleteLocalRef(arr); +// } +// } +// std::vector cAlias = ConvertJStringArrayToVector(env, alias); +// builder->expand(cProjects, cAlias); +// JNI_METHOD_END() +// } +// +// void NativePlanBuilder::nativeShuffledHashJoin( +// JNIEnv* env, +// jobject obj, +// jstring joinType, +// jboolean nullAware, +// jobjectArray leftKeys, +// jobjectArray rightKeys, +// jstring condition, +// jstring plan, +// jstring output) { +// JNI_METHOD_START +// core::TypedExprPtr filter = nullptr; +// if (condition != NULL) { +// filter = ExprUtils::asISerializable( +// env, condition); +// } +// const std::shared_ptr builder = as(obj); +// std::shared_ptr planPtr = nullptr; +// if (plan) { +// std::string planJson = jStringToCString(env, plan); +// planPtr = ISerializable::deserialize( +// folly::parseJson(planJson, getSerializationOptions()), +// sdk::memory::MemoryManager::get()->planMemoryPool().get()); +// } +// builder->join( +// core::joinTypeFromName(jStringToCString(env, joinType)), +// nullAware, +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, leftKeys), +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, rightKeys), +// filter, +// planPtr, +// asRowType(ExprUtils::toVeloxType( +// jStringToCString(env, output)))); +// +// JNI_METHOD_END() +// } +// void NativePlanBuilder::nativeMergeJoin( +// JNIEnv* env, +// jobject obj, +// jstring joinType, +// jobjectArray leftKeys, +// jobjectArray rightKeys, +// jstring condition, +// jstring plan, +// jstring output) { +// JNI_METHOD_START +// core::TypedExprPtr filter = nullptr; +// if (condition != NULL) { +// filter = ExprUtils::asISerializable( +// env, condition); +// } +// const std::shared_ptr builder = as(obj); +// std::shared_ptr planPtr = nullptr; +// if (plan) { +// std::string planJson = jStringToCString(env, plan); +// planPtr = ISerializable::deserialize( +// folly::parseJson(planJson, getSerializationOptions()), +// sdk::memory::MemoryManager::get()->planMemoryPool().get()); +// } +// builder->mergeJoin( +// core::joinTypeFromName(jStringToCString(env, joinType)), +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, leftKeys), +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, rightKeys), +// filter, +// planPtr, +// asRowType(ExprUtils::toVeloxType( +// jStringToCString(env, output)))); +// JNI_METHOD_END() +// } +// +// void NativePlanBuilder::nativeAggregation( +// JNIEnv* env, +// jobject obj, +// jstring step, +// jobjectArray groupings, +// jobjectArray aggNames, +// jobjectArray aggs, +// jboolean ignoreNullKey) { +// JNI_METHOD_START +// const std::shared_ptr builder = as(obj); +// std::vector groupFields = +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, groupings); +// std::vector aggregates = {}; +// for (auto agg : ConvertJStringArrayToVector(env, aggs)) { +// aggregates.push_back(core::AggregationNode::Aggregate::deserialize( +// folly::parseJson(agg, getSerializationOptions()), +// sdk::memory::MemoryManager::get()->planMemoryPool().get())); +// } +// +// builder->aggregation( +// core::AggregationNode::stepFromName(jStringToCString(env, step)), +// groupFields, +// {}, +// ConvertJStringArrayToVector(env, aggNames), +// aggregates, +// ignoreNullKey); +// +// JNI_METHOD_END() +// } +jstring NativePlanBuilder::nativeWindowFunction( + JNIEnv* env, + jobject obj, + jstring functionCall, + jstring fram, + jboolean ignoreNullKey) { + JNI_METHOD_START + auto typePtr = ISerializable::deserialize( + folly::parseJson( + jStringToCString(env, functionCall), getSerializationOptions()), + sdk::memory::MemoryManager::get()->planMemoryPool().get()); + std::shared_ptr callExpr = + std::dynamic_pointer_cast(typePtr); + auto frame = core::WindowNode::Frame::deserialize( + folly::parseJson(jStringToCString(env, fram), getSerializationOptions())); + bool ignoreNull = ignoreNullKey; + return env->NewStringUTF( + folly::toJson( + core::WindowNode::Function{callExpr, frame, ignoreNull}.serialize()) + .c_str()); + JNI_METHOD_END(env->NewStringUTF("")) +} +// void NativePlanBuilder::nativeWindow( +// JNIEnv* env, +// jobject obj, +// jobjectArray partitionKeys, +// jobjectArray sortingKeys, +// jobjectArray sortingOrders, +// jobjectArray windowColumnNames, +// jobjectArray windowFunctions, +// jboolean ignoreNullKey) { +// JNI_METHOD_START +// const std::shared_ptr builder = as(obj); +// +// std::vector partitionsKeyExprs = +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, partitionKeys); +// std::vector sortingKeyExprs = +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, sortingKeys); +// +// std::vector sortingOrderExprs = +// ExprUtils::deserializeArray( +// env, sortingOrders); +// +// std::vector windowFunctionExprs = +// ExprUtils::deserializeArray< +// core::WindowNode::Function>(env, windowFunctions); +// +// bool cIgnoreNullKey = ignoreNullKey; +// builder->window( +// partitionsKeyExprs, +// sortingKeyExprs, +// sortingOrderExprs, +// std::move(ConvertJStringArrayToVector(env, windowColumnNames)), +// windowFunctionExprs, +// cIgnoreNullKey); +// JNI_METHOD_END() +// } +// +// void NativePlanBuilder::nativeSort( +// JNIEnv* env, +// jobject obj, +// jobjectArray sortingKeys, +// jobjectArray sortingOrders, +// jboolean isPartial) { +// JNI_METHOD_START +// const std::shared_ptr builder = as(obj); +// std::vector sortingKeyExprs = +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, sortingKeys); +// std::vector sortingOrderExprs = +// ExprUtils::deserializeArray( +// env, sortingOrders); +// builder->sort(sortingKeyExprs, sortingOrderExprs, isPartial); +// JNI_METHOD_END() +// } +// +// void NativePlanBuilder::nativeUnnest( +// JNIEnv* env, +// jobject obj, +// jobjectArray replicateVariables, +// jobjectArray unnestVariables, +// jobjectArray unnestNames, +// jstring ordinalityName) { +// JNI_METHOD_START +// const std::shared_ptr builder = as(obj); +// std::vector replicateVariablesExprs = +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, replicateVariables); +// std::vector unnestVariableExprs = +// ExprUtils::asISerializableVector< +// core::FieldAccessTypedExpr>(env, unnestVariables); +// std::vector cUnnestNames = +// ConvertJStringArrayToVector(env, unnestNames); +// std::optional ordinalColumn = std::nullopt; +// if (ordinalityName) { +// ordinalColumn = jStringToCString(env, ordinalityName); +// } +// builder->unnest( +// replicateVariablesExprs, +// unnestVariableExprs, +// cUnnestNames, +// ordinalColumn); +// JNI_METHOD_END() +// } + +void NativePlanBuilder::nativeLimit( + JNIEnv* env, + jobject obj, + jint offset, + jint limit) { + JNI_METHOD_START + const std::shared_ptr builder = as(obj); + builder->limit(offset, limit, false); + JNI_METHOD_END() +} +// +// void NativePlanBuilder::nativePartitionedOutput( +// JNIEnv* env, +// jobject obj, +// jobjectArray jKeys, +// jint numPartitions) { +// JNI_METHOD_START +// std::vector keys = +// ExprUtils::asISerializableVector( +// env, jKeys); +// const std::shared_ptr builder = as(obj); +// builder->partitionedOutput(keys, numPartitions); +// JNI_METHOD_END() +// } + +jstring NativePlanBuilder::nativeNodeId(JNIEnv* env, jobject obj) { + JNI_METHOD_START + const std::shared_ptr builder = as(obj); + return env->NewStringUTF(builder->planNode()->id().c_str()); + JNI_METHOD_END(nullptr) +} + +jstring NativePlanBuilder::nativeBuilder(JNIEnv* env, jobject obj) { + JNI_METHOD_START + const std::shared_ptr builder = as(obj); + return env->NewStringUTF( + utils::JsonUtils::toSortedJson(builder->planNode()->serialize()).c_str()); + JNI_METHOD_END(env->NewStringUTF("ERROR")) +} + +void NativePlanBuilder::nativeTestString( + JNIEnv* env, + jobject obj, + jstring str) { + JNI_METHOD_START + std::string s = jStringToCString(env, str); + std::cout << "String " << s.length() << std::endl; + JNI_METHOD_END() +} + +} // namespace facebook::velox::sdk \ No newline at end of file diff --git a/velox/sdk/cpp/native/NativePlanBuilder.hpp b/velox/sdk/cpp/native/NativePlanBuilder.hpp new file mode 100644 index 000000000000..ddaaa548883b --- /dev/null +++ b/velox/sdk/cpp/native/NativePlanBuilder.hpp @@ -0,0 +1,126 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NATIVEPLANBUILDER_HPP +#define NATIVEPLANBUILDER_HPP +#include + +#include "NativeClass.hpp" + +namespace facebook::velox::sdk { +class NativePlanBuilder : public jni::NativeClass { + static std::string CLASS_NAME; + + public: + NativePlanBuilder() : NativeClass(CLASS_NAME) {} + + void initInternal() override; + + static jlong nativeCreate(JNIEnv* env, jobject obj); + + static void + nativeProject(JNIEnv* env, jobject obj, jobjectArray projections); + + static jstring nativeJavaScan(JNIEnv* env, jobject obj, jstring schema); + + static void nativeFilter(JNIEnv* env, jobject obj, jstring filter); + // + // static void nativeExpand( + // JNIEnv* env, + // jobject obj, + // jobjectArray projects, + // jobjectArray alias); + // + // static void nativeShuffledHashJoin( + // JNIEnv* env, + // jobject obj, + // jstring joinType, + // jboolean nullAware, + // jobjectArray leftKeys, + // jobjectArray rightKeys, + // jstring condition, + // jstring plan, + // jstring output); + // + // static void nativeMergeJoin( + // JNIEnv* env, + // jobject obj, + // jstring joinType, + // jobjectArray leftKeys, + // jobjectArray rightKeys, + // jstring condition, + // jstring plan, + // jstring output); + // + // static void nativeAggregation( + // JNIEnv* env, + // jobject obj, + // jstring step, + // jobjectArray groupings, + // jobjectArray aggNames, + // jobjectArray aggs, + // jboolean ignoreNullKey); + + static jstring nativeWindowFunction( + JNIEnv* env, + jobject obj, + jstring functionCall, + jstring fram, + jboolean ignoreNullKey); + // + // static void nativeWindow( + // JNIEnv* env, + // jobject obj, + // jobjectArray partitionKeys, + // jobjectArray sortingKeys, + // jobjectArray sortingOrders, + // jobjectArray windowColumnNames, + // jobjectArray windowFunctions, + // jboolean ignoreNullKey); + // + // static void nativeSort( + // JNIEnv* env, + // jobject obj, + // jobjectArray sortingKeys, + // jobjectArray sortingOrders, + // jboolean isPartial); + // + // static void nativeUnnest( + // JNIEnv* env, + // jobject obj, + // jobjectArray replicateVariables, + // jobjectArray unnestVariables, + // jobjectArray unnestNames, + // jstring ordinalityName); + + static void nativeLimit(JNIEnv* env, jobject obj, jint offset, jint limit); + // static void nativePartitionedOutput( + // JNIEnv* env, + // jobject obj, + // jobjectArray jKeys, + // jint partitions); + + static jstring nativeNodeId(JNIEnv* env, jobject obj); + + static jstring nativeBuilder(JNIEnv* env, jobject obj); + + static void nativeTestString(JNIEnv* env, jobject obj, jstring str); + +}; +} // namespace facebook::velox::sdk + +#endif // NATIVEPLANBUILDER_HPP diff --git a/velox/sdk/cpp/utils/JsonUtils.h b/velox/sdk/cpp/utils/JsonUtils.h new file mode 100644 index 000000000000..28913724ad22 --- /dev/null +++ b/velox/sdk/cpp/utils/JsonUtils.h @@ -0,0 +1,37 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef JSONUTILS_H +#define JSONUTILS_H +#include "folly/json/json.h" +namespace facebook::velox::sdk::utils { +class JsonUtils { + public: + static const folly::json::serialization_opts& getOpts() { + static const folly::json::serialization_opts opts_ = []() { + folly::json::serialization_opts opts; + opts.sort_keys = true; + return opts; + }(); + return opts_; + } + inline static std::string toSortedJson(folly::dynamic dynamic) { + return serialize(dynamic, getOpts()); + } +}; +} // namespace facebook::velox::sdk::utils +#endif // JSONUTILS_H diff --git a/velox/sdk/java/pom.xml b/velox/sdk/java/pom.xml new file mode 100644 index 000000000000..74aba446d9e3 --- /dev/null +++ b/velox/sdk/java/pom.xml @@ -0,0 +1,96 @@ + + + 4.0.0 + velox + velox-sdk + 0.0.1 + velox java sdk + pom + + 2.12.18 + 1.8 + 3.3.1 + 2.12 + 3.2.9 + 5.6.2 + 2.2.0 + + + + velox-sdk-core + + + + + + org.scala-lang + scala-library + ${scala.version} + + + + org.scala-lang + scala-compiler + ${scala.version} + + + + org.scalatest + scalatest_2.12 + ${scalatest_2.12.version} + + + org.scalatestplus + scalacheck-1-15_${scala.binary.version} + 3.3.0.0-SNAP3 + test + + + org.scalatestplus + mockito-4-2_${scala.binary.version} + 3.2.11.0 + test + + + org.scalatestplus + selenium-3-141_${scala.binary.version} + 3.3.0.0-SNAP3 + test + + + org.apache.spark + spark-catalyst_2.12 + ${spark.version} + + + org.junit.jupiter + junit-jupiter + ${junit.jupiter.version} + + + + + + + + + maven-source-plugin + 2.1 + + true + + + + compile + + jar + + + + + + + + diff --git a/velox/sdk/java/velox-sdk-core/checkstyle-suppressions.xml b/velox/sdk/java/velox-sdk-core/checkstyle-suppressions.xml new file mode 100644 index 000000000000..804a178a5fe2 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/checkstyle-suppressions.xml @@ -0,0 +1,51 @@ + + + + + + + + + + + + + + + + + + diff --git a/velox/sdk/java/velox-sdk-core/checkstyle.xml b/velox/sdk/java/velox-sdk-core/checkstyle.xml new file mode 100644 index 000000000000..72c10f210f28 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/checkstyle.xml @@ -0,0 +1,200 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/velox/sdk/java/velox-sdk-core/pom.xml b/velox/sdk/java/velox-sdk-core/pom.xml new file mode 100644 index 000000000000..bb8921cae1e0 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/pom.xml @@ -0,0 +1,205 @@ + + + + velox + velox-sdk + 0.0.1 + ../pom.xml + + 4.0.0 + + velox-sdk-core + + + 8 + 8 + + + + + + + org.apache.spark + spark-catalyst_2.12 + ${spark.version} + + + + org.apache.spark + spark-catalyst_2.12 + ${spark.version} + test-jar + test + + + + org.scala-lang + scala-library + + + org.scala-lang + scala-compiler + + + org.scalatest + scalatest_2.12 + + + + org.junit.jupiter + junit-jupiter + test + + + + org.scalatestplus + scalacheck-1-15_${scala.binary.version} + test + + + org.scalatestplus + selenium-3-141_${scala.binary.version} + test + + + + + + + ${project.basedir}/../../../../_build/velox/sdk + + libsdk.dylib + libsdk.so + + + + + + ${project.basedir}/../../../../_build/velox/sdk + + libsdk.dylib + libsdk.so + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 3.0.0 + + + validate + + run + + + + SDK directory: ${project.basedir}/../../_build/velox/sdk + + Resolved SDK directory: ${sdk.dir} + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.1 + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + add-source + testCompile + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.2.0 + + + + test-jar + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.2.1 + + + attach-sources + + jar + + + + + attach-test-sources + + test-jar + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.22.2 + + always + 1 + 10 + false + -Dfile.encoding=UTF-8 + + **/*Test.* + **/*ColumnarExistenceJoinSuite.* + **/Test*.* + + + + + org.scalatest + scalatest-maven-plugin + ${scalatest-maven-plugin.version} + + false + ${project.build.directory}/surefire-reports + . + WDF TestSuite.txt + + F + + + + test + + test + + + + + + + diff --git a/velox/sdk/java/velox-sdk-core/scalastyle-config.xml b/velox/sdk/java/velox-sdk-core/scalastyle-config.xml new file mode 100644 index 000000000000..deef8f421e22 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/scalastyle-config.xml @@ -0,0 +1,433 @@ + + + + + Scalastyle standard configuration + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + true + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ARROW, EQUALS, ELSE, TRY, CATCH, FINALLY, LARROW, RARROW + + + + + + ARROW, EQUALS, COMMA, COLON, IF, ELSE, DO, WHILE, FOR, MATCH, TRY, CATCH, FINALLY, LARROW, RARROW + + + + + + + + + ^FunSuite[A-Za-z]*$ + Tests must extend org.apache.spark.SparkFunSuite instead. + + + + + ^println$ + + + + + spark(.sqlContext)?.sparkContext.hadoopConfiguration + + + + + @VisibleForTesting + + + + + Runtime\.getRuntime\.addShutdownHook + + + + + mutable\.SynchronizedBuffer + + + + + Class\.forName + + + + + Await\.result + + + + + Await\.ready + + + + + (\.toUpperCase|\.toLowerCase)(?!(\(|\(Locale.ROOT\))) + + + + + throw new \w+Error\( + + + + + + JavaConversions + Instead of importing implicits in scala.collection.JavaConversions._, import + scala.collection.JavaConverters._ and use .asScala / .asJava methods + + + + org\.apache\.commons\.lang\. + Use Commons Lang 3 classes (package org.apache.commons.lang3.*) instead + of Commons Lang 2 (package org.apache.commons.lang.*) + + + + scala\.concurrent\.ExecutionContext\.Implicits\.global + User queries can use global thread pool, causing starvation and eventual OOM. + Thus, Spark-internal APIs should not use this thread pool + + + + FileSystem.get\([a-zA-Z_$][a-zA-Z_$0-9]*\) + + + + + extractOpt + Use jsonOption(x).map(.extract[T]) instead of .extractOpt[T], as the latter + is slower. + + + + + java,scala,3rdParty,spark + javax?\..* + scala\..* + (?!org\.apache\.spark\.).* + org\.apache\.spark\..* + + + + + + COMMA + + + + + + \)\{ + + + + + (?m)^(\s*)/[*][*].*$(\r|)\n^\1 [*] + Use Javadoc style indentation for multiline comments + + + + case[^\n>]*=>\s*\{ + Omit braces in case clauses. + + + + new (java\.lang\.)?(Byte|Integer|Long|Short)\( + Use static factory 'valueOf' or 'parseXXX' instead of the deprecated constructors. + + + + + + + + + + + + + + + Please use Apache Log4j 2 instead. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + 800> + + + + + 30 + + + + + 10 + + + + + 50 + + + + + + + + + + + -1,0,1,2,3 + + + + Objects.toStringHelper + Avoid using Object.toStringHelper. Use ToStringBuilder instead. + + diff --git a/velox/sdk/java/velox-sdk-core/src/main/java/velox/jni/NativeClass.java b/velox/sdk/java/velox-sdk-core/src/main/java/velox/jni/NativeClass.java new file mode 100644 index 000000000000..6cf6cdfa8fc3 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/src/main/java/velox/jni/NativeClass.java @@ -0,0 +1,35 @@ +package velox.jni; + +public abstract class NativeClass implements AutoCloseable { + long handle = 0; + + public void setHandle(long handle) { + this.handle = handle; + this.isRelease = false; + } + + boolean isRelease = true; + + public long nativePTR() { + return this.handle; + } + + public void releaseHandle() { + this.handle = 0; + } + + protected abstract void releaseInternal(); + + public final void Release() { + releaseInternal(); + } + + @Override + public synchronized void close() { + if (!isRelease) { + Release(); + isRelease = true; + } + } + +} diff --git a/velox/sdk/java/velox-sdk-core/src/main/java/velox/jni/NativePlanBuilder.java b/velox/sdk/java/velox-sdk-core/src/main/java/velox/jni/NativePlanBuilder.java new file mode 100644 index 000000000000..018966bfd8f1 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/src/main/java/velox/jni/NativePlanBuilder.java @@ -0,0 +1,161 @@ +package velox.jni; + +import org.apache.spark.sql.types.StructType; + +public class NativePlanBuilder extends NativeClass { + public NativePlanBuilder() { + setHandle(nativeCreate()); + } + + private native void nativeJavaScan(String schema); + + private native void nativeFilter(String condition); + + private native void nativeProject(String[] projections); + + + private native String nativeBuilder(); + + protected native long nativeCreate(); + + private native void nativeRelease(); + + private native String nativeNodeId(); + + // +// +// public native void nativeTestString(String test); +// +// +// private native String nativeWindowFunction(String functionCallJson, +// String frameJson, +// boolean ignoreNulls); +// +// native void nativeWindow(String[] partitionKeys, +// String[] sortingKeys, +// String[] sortingOrders, +// String[] windowColumnNames, +// String[] windowFunctions, +// boolean inputsSorted); +// +// native void nativeSort( +// String[] sortingKeys, +// String[] sortingOrders, +// boolean isPartial); + + private native void nativeUnnest(String[] replicateVariables, + String[] unnestVariables, + String[] unnestNames, + String ordinalityName); + + private native void nativeLimit(int offset, int limit); + + private native void nativePartitionedOutput(String[] offset, int numPartitions); + + + @Override + protected void releaseInternal() { + nativeRelease(); + } + + + // for native call , don't delete + public NativePlanBuilder project(String[] projections) { + nativeProject(projections); + return this; + } + + public NativePlanBuilder scan(StructType schema) { + nativeJavaScan(schema.catalogString()); + return this; + } + + public NativePlanBuilder filter(String condition) { + nativeFilter(condition); + return this; + } + + public NativePlanBuilder limit(int offset, int limit) { + nativeLimit(offset, limit); + return this; + } + +// +// public NativePlanBuilder expand(String[][] project, String[] alias) { +// nativeExpand(project, alias); +// return this; +// } +// +// public NativePlanBuilder hashJoin(String joinType, boolean nullAware, String[] leftKeys, String[] rightKeys, String filter, String rightPlan, String output) { +// nativeShuffledHashJoin(joinType, nullAware, leftKeys, rightKeys, filter, rightPlan, output); +// return this; +// } +// +// public NativePlanBuilder MergeJoin(String joinType, String[] leftKeys, String[] rightKeys, String filter, String rightPlan, String output) { +// nativeMergeJoin(joinType, leftKeys, rightKeys, filter, rightPlan, output); +// return this; +// } +// +// public void aggregate(String step, +// String[] group, +// String[] aggNames, +// String[] agg, +// boolean ignoreNullKey) { +// +// nativeAggregation(step, group, aggNames, agg, ignoreNullKey); +// } +// +// +// public String windowFunction(String functionCallJson, +// String frameJson, +// boolean ignoreNulls) { +// return nativeWindowFunction(functionCallJson, frameJson, ignoreNulls); +// } +// +// +// public void window(String[] partitionKeys, +// String[] sortingKeys, +// SortOrder[] sortingOrders, +// String[] windowColumnNames, +// String[] windowFunctions, +// boolean inputsSorted) { +// String[] sortingOrders1 = PlanUtils.jsonChildren(sortingOrders); +// nativeWindow(partitionKeys, sortingKeys, sortingOrders1, windowColumnNames, windowFunctions, inputsSorted); +// } +// +// public void sort(String[] sortingKeys, +// SortOrder[] sortingOrders, +// boolean isPartial) { +// String[] sortingOrders1 = PlanUtils.jsonChildren(sortingOrders); +// nativeSort(sortingKeys, sortingOrders1, isPartial); +// } + + public void unnest(String[] replicateVariables, + String[] unnestVariables, + String[] unnestNames, + String ordinalityName) { + nativeUnnest(replicateVariables, unnestVariables, unnestNames, ordinalityName); + } + + + public void partitionedOutput(String[] keys, int numPartitions) { + nativePartitionedOutput(keys, numPartitions); + } + + + public String builderAndRelease() { + String s = nativeBuilder(); + close(); + return s; + } + + public String nodeId() { + return nativeNodeId(); + } + + public String builder() { + return nativeBuilder(); + } + + +} diff --git a/velox/sdk/java/velox-sdk-core/src/main/java/velox/jni/VeloxNative.java b/velox/sdk/java/velox-sdk-core/src/main/java/velox/jni/VeloxNative.java new file mode 100644 index 000000000000..dbeecfc3a8a4 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/src/main/java/velox/jni/VeloxNative.java @@ -0,0 +1,66 @@ +package velox.jni; + +import velox.utils.NativeLibUtil; + +public class VeloxNative { + private static volatile boolean isLoaded = false; + + static final String osName = System.getProperty("os.name").toLowerCase(); + + static final String osArch; + protected static final boolean isMacOs; + protected static final boolean isLinuxOs; + protected static final boolean isX86_64; + protected static final boolean isAarch64; + + static { + isMacOs = osName.startsWith("mac"); + isLinuxOs = osName.startsWith("linux"); + osArch = System.getProperty("os.arch").toLowerCase(); + isX86_64 = osArch.startsWith("amd64") || osArch.startsWith("x86_64"); + isAarch64 = osArch.startsWith("aarch64"); + } + + public static String getFileExtension() { + return isMacOs ? ".dylib" : ".so"; + } + + public static void init(String conf) { + // Early exit if already loaded + if (isLoaded) { + return; + } + + synchronized (VeloxNative.class) { + // Double-checked locking pattern + if (isLoaded) { + return; + } + + boolean loaded = false; + try { + String libraryName = "libsdk" + getFileExtension(); + loaded = tryLoadLibrary(libraryName, conf); + } catch (RuntimeException e) { + throw new IllegalStateException("Failed to load native libraries.", e); + } + if (!loaded) { + throw new IllegalStateException("Native libraries could not be loaded."); + } + isLoaded = true; + } + } + + private static boolean tryLoadLibrary(String libraryName, String conf) { + try { + NativeLibUtil.loadLibrary(libraryName); + if (conf != null && !conf.isEmpty()) { + NativeLibUtil.init(conf); + } + return true; + } catch (RuntimeException e) { + // Log or handle the exception if needed + return false; + } + } +} diff --git a/velox/sdk/java/velox-sdk-core/src/main/java/velox/utils/NativeLibUtil.java b/velox/sdk/java/velox-sdk-core/src/main/java/velox/utils/NativeLibUtil.java new file mode 100644 index 000000000000..4409ca2a41a9 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/src/main/java/velox/utils/NativeLibUtil.java @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package velox.utils; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sparkproject.guava.base.Joiner; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.jar.JarEntry; +import java.util.jar.JarFile; + +public class NativeLibUtil { + private final static Logger LOG = LoggerFactory.getLogger(NativeLibUtil.class); + + public static void loadLibrary(String libFileName) { + // dev model + Optional libFileOptional = findLibraryInClasspath(libFileName); + if (libFileOptional.isPresent()) { + System.load(libFileOptional.get().getAbsolutePath()); + return; + } + // Try to load library from classpath + boolean loaded = false; + try { + // Find the path to the JAR file that contains the class + String jarPath = NativeLibUtil.class.getProtectionDomain() + .getCodeSource().getLocation().toURI().getPath(); + + // Check if the JAR file contains the library + if (jarPath.endsWith("velox-sdk-core.jar")) { + // Create a temporary directory to extract the .so file + Path tempDir = Files.createTempDirectory("nativeLibs"); + + try (JarFile jar = new JarFile(jarPath)) { + JarEntry entry = jar.getJarEntry(libFileName); + if (entry != null) { + // Extract the .so file to the temporary directory + File libFile = new File(tempDir.toFile(), libFileName); + try (InputStream is = jar.getInputStream(entry); + OutputStream os = new FileOutputStream(libFile)) { + byte[] buffer = new byte[1024]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + os.write(buffer, 0, bytesRead); + } + } + // Load the .so file + System.load(libFile.getAbsolutePath()); + loaded = true; + } + } + } + } catch (Exception e) { + LOG.warn("Unable to extract and load " + libFileName, e); + } + // If not loaded, fallback to original loading logic + if (!loaded) { + List candidates = new ArrayList<>(Arrays.asList( + System.getProperty("java.library.path").split(":"))); + // Fall back to automatically finding the library in test environments. + // This makes it easier to run tests from Eclipse without specially configuring + // the Run Configurations. + try { + String myPath = NativeLibUtil.class.getProtectionDomain() + .getCodeSource().getLocation().getPath(); + if (myPath.toString().endsWith("sdk-core/target/classes/") || + myPath.toString().endsWith("sdk-core/target/eclipse-classes/")) { + candidates.add(myPath + "../../../../../../_build/velox/sdk"); + } + } catch (Exception e) { + LOG.warn("Unable to get path for NativeLibUtil class", e); + } + + for (String path : candidates) { + File libFile = new File(path + File.separator + libFileName); + if (libFile.exists()) { + System.load(libFile.getPath()); + return; + } + } + + throw new RuntimeException("Failed to load " + libFileName + " from any " + + "candidate location:\n" + Joiner.on("\n").join(candidates)); + } + } + + private static Optional findLibraryInClasspath(String libFileName) { + try { + URL resource = Thread.currentThread().getContextClassLoader().getResource(libFileName); + if (resource != null) { + File libFile = new File(resource.getFile()); + return Optional.of(libFile); + } + return Optional.empty(); + } catch (Exception e) { + return Optional.empty(); + } + } + + + native public static void init(String conf); +} diff --git a/velox/sdk/java/velox-sdk-core/src/test/java/velox/jni/NativePlanBuilderTest.java b/velox/sdk/java/velox-sdk-core/src/test/java/velox/jni/NativePlanBuilderTest.java new file mode 100644 index 000000000000..f7ea29719414 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/src/test/java/velox/jni/NativePlanBuilderTest.java @@ -0,0 +1,47 @@ +package velox.jni; + +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class NativePlanBuilderTest extends NativeTest { + + + @Test + public void testScan() { + try (NativePlanBuilder scan = new NativePlanBuilder() + .scan(StructType.fromDDL("a int, B int"));) { + String builder = scan + .builder(); + Assertions.assertEquals("{\"assignments\":[{\"assign\":\"a\",\"columnHandle\":{\"columnType\":\"Regular\",\"dataType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"hiveColumnHandleName\":\"a\",\"hiveType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"name\":\"HiveColumnHandle\",\"requiredSubfields\":[]}},{\"assign\":\"B\",\"columnHandle\":{\"columnType\":\"Regular\",\"dataType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"hiveColumnHandleName\":\"B\",\"hiveType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"name\":\"HiveColumnHandle\",\"requiredSubfields\":[]}}],\"id\":\"0\",\"name\":\"TableScanNode\",\"outputType\":{\"cTypes\":[{\"name\":\"Type\",\"type\":\"INTEGER\"},{\"name\":\"Type\",\"type\":\"INTEGER\"}],\"name\":\"Type\",\"names\":[\"a\",\"B\"],\"type\":\"ROW\"},\"tableHandle\":{\"connectorId\":\"test-hive\",\"filterPushdownEnabled\":true,\"name\":\"HiveTableHandle\",\"subfieldFilters\":[],\"tableName\":\"hive_table\"}}", builder); + } + } + + + @Test + public void testScanFilter() { + try (NativePlanBuilder scan = new NativePlanBuilder() + .scan(StructType.fromDDL("a int, B int"));) { + + scan.filter("a > 10"); + String builder = scan + .builder(); + Assertions.assertEquals("{\"filter\":{\"functionName\":\"gt\",\"inputs\":[{\"inputs\":[{\"fieldName\":\"a\",\"inputs\":[{\"name\":\"InputTypedExpr\",\"type\":{\"cTypes\":[{\"name\":\"Type\",\"type\":\"INTEGER\"},{\"name\":\"Type\",\"type\":\"INTEGER\"}],\"name\":\"Type\",\"names\":[\"a\",\"B\"],\"type\":\"ROW\"}}],\"name\":\"FieldAccessTypedExpr\",\"type\":{\"name\":\"Type\",\"type\":\"INTEGER\"}}],\"name\":\"CastTypedExpr\",\"nullOnFailure\":false,\"type\":{\"name\":\"Type\",\"type\":\"BIGINT\"}},{\"name\":\"ConstantTypedExpr\",\"type\":{\"name\":\"Type\",\"type\":\"BIGINT\"},\"value\":{\"type\":\"BIGINT\",\"value\":10}}],\"name\":\"CallTypedExpr\",\"type\":{\"name\":\"Type\",\"type\":\"BOOLEAN\"}},\"id\":\"1\",\"name\":\"FilterNode\",\"sources\":[{\"assignments\":[{\"assign\":\"a\",\"columnHandle\":{\"columnType\":\"Regular\",\"dataType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"hiveColumnHandleName\":\"a\",\"hiveType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"name\":\"HiveColumnHandle\",\"requiredSubfields\":[]}},{\"assign\":\"B\",\"columnHandle\":{\"columnType\":\"Regular\",\"dataType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"hiveColumnHandleName\":\"B\",\"hiveType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"name\":\"HiveColumnHandle\",\"requiredSubfields\":[]}}],\"id\":\"0\",\"name\":\"TableScanNode\",\"outputType\":{\"cTypes\":[{\"name\":\"Type\",\"type\":\"INTEGER\"},{\"name\":\"Type\",\"type\":\"INTEGER\"}],\"name\":\"Type\",\"names\":[\"a\",\"B\"],\"type\":\"ROW\"},\"tableHandle\":{\"connectorId\":\"test-hive\",\"filterPushdownEnabled\":true,\"name\":\"HiveTableHandle\",\"subfieldFilters\":[],\"tableName\":\"hive_table\"}}]}", builder); + } + } + + + @Test + public void testScanProject() { + try (NativePlanBuilder scan = new NativePlanBuilder() + .scan(StructType.fromDDL("a int, B int"));) { + + String[] projections = {"a"}; + scan.project(projections); + String builder = scan + .builder(); + Assertions.assertEquals("{\"id\":\"1\",\"name\":\"ProjectNode\",\"names\":[\"a\"],\"projections\":[{\"fieldName\":\"a\",\"inputs\":[{\"name\":\"InputTypedExpr\",\"type\":{\"cTypes\":[{\"name\":\"Type\",\"type\":\"INTEGER\"},{\"name\":\"Type\",\"type\":\"INTEGER\"}],\"name\":\"Type\",\"names\":[\"a\",\"B\"],\"type\":\"ROW\"}}],\"name\":\"FieldAccessTypedExpr\",\"type\":{\"name\":\"Type\",\"type\":\"INTEGER\"}}],\"sources\":[{\"assignments\":[{\"assign\":\"a\",\"columnHandle\":{\"columnType\":\"Regular\",\"dataType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"hiveColumnHandleName\":\"a\",\"hiveType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"name\":\"HiveColumnHandle\",\"requiredSubfields\":[]}},{\"assign\":\"B\",\"columnHandle\":{\"columnType\":\"Regular\",\"dataType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"hiveColumnHandleName\":\"B\",\"hiveType\":{\"name\":\"Type\",\"type\":\"INTEGER\"},\"name\":\"HiveColumnHandle\",\"requiredSubfields\":[]}}],\"id\":\"0\",\"name\":\"TableScanNode\",\"outputType\":{\"cTypes\":[{\"name\":\"Type\",\"type\":\"INTEGER\"},{\"name\":\"Type\",\"type\":\"INTEGER\"}],\"name\":\"Type\",\"names\":[\"a\",\"B\"],\"type\":\"ROW\"},\"tableHandle\":{\"connectorId\":\"test-hive\",\"filterPushdownEnabled\":true,\"name\":\"HiveTableHandle\",\"subfieldFilters\":[],\"tableName\":\"hive_table\"}}]}", builder); + } + } + +} \ No newline at end of file diff --git a/velox/sdk/java/velox-sdk-core/src/test/java/velox/jni/NativeTest.java b/velox/sdk/java/velox-sdk-core/src/test/java/velox/jni/NativeTest.java new file mode 100644 index 000000000000..e3ca9ab134a3 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/src/test/java/velox/jni/NativeTest.java @@ -0,0 +1,11 @@ +package velox.jni; + +import org.junit.jupiter.api.BeforeAll; + +abstract class NativeTest { + + @BeforeAll + public static void init(){ + VeloxNative.init(null); + } +} diff --git a/velox/sdk/java/velox-sdk-core/src/test/java/velox/jni/VeloxNativeTest.java b/velox/sdk/java/velox-sdk-core/src/test/java/velox/jni/VeloxNativeTest.java new file mode 100644 index 000000000000..cbde65d95783 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/src/test/java/velox/jni/VeloxNativeTest.java @@ -0,0 +1,12 @@ +package velox.jni; + +import org.junit.jupiter.api.Test; + +public class VeloxNativeTest { + + @Test + public void init(){ + VeloxNative.init(""); + } + +} \ No newline at end of file diff --git a/velox/sdk/java/velox-sdk-core/src/test/java/velox/utils/NativeLibUtilTest.java b/velox/sdk/java/velox-sdk-core/src/test/java/velox/utils/NativeLibUtilTest.java new file mode 100644 index 000000000000..a5ef84008ad2 --- /dev/null +++ b/velox/sdk/java/velox-sdk-core/src/test/java/velox/utils/NativeLibUtilTest.java @@ -0,0 +1,14 @@ +package velox.utils; + + +import org.junit.jupiter.api.Test; + +public class NativeLibUtilTest { + + + @Test + public void testLoadLib() { + NativeLibUtil.loadLibrary("libsdk.dylib"); + } + +} \ No newline at end of file