From db0ef22dd68ae00e11f09da40b6734c1d9770b57 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Thu, 20 Sep 2018 16:56:50 -0400 Subject: [PATCH] ARROW-3146: [C++] Prototype Flight RPC client and server implementations This is a partial C++ implementation of the Flight RPC system initially proposed by Jacques in ARROW-249. As in Java, I had to dig into gRPC and Protocol Buffers internals to ensure that * On write, memory is only copied once into the outgoing gRPC buffer * On read, no memory is copied The way that I tricked gRPC into circumventing the built-in protobuf serde paths might look a bit hacky, but after digging around in the library a bunch I've convinced myself that it's the best and perhaps only way to accomplish this. Luckily, the message that's being serialized/deserialized is pretty opaque to the rest of the gRPC system, and it's controlled by the `SerializationTraits` class. So you can take a gRPC stream reader and make it create any kind of type you want, even if the input data is a protocol buffer. Some things that won't be addressed in this patch, as scope is too large: * gRPC build toolchain issues (this is rather complex, I will create follow-up issues) * Security / encryption, and authentication issues. I have only implemented an insecure server * Integration with Travis CI * Python bindings. The API is preliminary, and I expect it to be the subject of iteration to make it general and fast over the next several months. 
Author: Wes McKinney Closes #2547 from wesm/flight-cpp-prototype and squashes the following commits: 64bcdea43 Initial Arrow Flight C++ implementation --- ci/travis_before_script_cpp.sh | 4 + ci/travis_script_cpp.sh | 2 +- cpp/CMakeLists.txt | 36 +- cpp/build-support/lint_cpp_cli.py | 6 +- cpp/cmake_modules/FindProtobuf.cmake | 1 + cpp/cmake_modules/ThirdpartyToolchain.cmake | 116 ++--- cpp/src/arrow/buffer.cc | 4 + cpp/src/arrow/buffer.h | 5 + .../dbi/hiveserver2/thrift/CMakeLists.txt | 9 +- cpp/src/arrow/flight/CMakeLists.txt | 139 ++++++ cpp/src/arrow/flight/README.md | 36 ++ cpp/src/arrow/flight/api.h | 22 + cpp/src/arrow/flight/client.cc | 410 ++++++++++++++++++ cpp/src/arrow/flight/client.h | 110 +++++ cpp/src/arrow/flight/flight-benchmark.cc | 193 +++++++++ cpp/src/arrow/flight/flight-test.cc | 267 ++++++++++++ cpp/src/arrow/flight/internal.cc | 235 ++++++++++ cpp/src/arrow/flight/internal.h | 77 ++++ cpp/src/arrow/flight/perf-server.cc | 200 +++++++++ cpp/src/arrow/flight/perf.proto | 44 ++ cpp/src/arrow/flight/server.cc | 385 ++++++++++++++++ cpp/src/arrow/flight/server.h | 142 ++++++ cpp/src/arrow/flight/test-server.cc | 141 ++++++ cpp/src/arrow/flight/test-util.h | 157 +++++++ cpp/src/arrow/flight/types.cc | 75 ++++ cpp/src/arrow/flight/types.h | 210 +++++++++ cpp/src/arrow/gpu/cuda-test.cc | 2 +- cpp/src/arrow/ipc/message.cc | 4 + cpp/src/arrow/ipc/message.h | 4 + cpp/src/arrow/ipc/metadata-internal.cc | 2 + cpp/src/arrow/ipc/metadata-internal.h | 2 +- cpp/src/arrow/ipc/reader.cc | 6 + cpp/src/arrow/ipc/test-common.h | 41 +- cpp/src/arrow/ipc/writer.cc | 206 +++++---- cpp/src/arrow/ipc/writer.h | 30 ++ cpp/src/arrow/test-util.h | 90 +++- cpp/src/arrow/util/memory.h | 4 +- cpp/src/{parquet => arrow}/util/stopwatch.h | 27 +- cpp/src/parquet/util/CMakeLists.txt | 1 - cpp/thirdparty/versions.txt | 4 +- format/Flight.proto | 299 +++++++++++++ integration/README.md | 2 +- python/.gitignore | 1 - python/README.md | 4 +- 44 files changed, 3520 insertions(+), 
235 deletions(-) create mode 100644 cpp/src/arrow/flight/CMakeLists.txt create mode 100644 cpp/src/arrow/flight/README.md create mode 100644 cpp/src/arrow/flight/api.h create mode 100644 cpp/src/arrow/flight/client.cc create mode 100644 cpp/src/arrow/flight/client.h create mode 100644 cpp/src/arrow/flight/flight-benchmark.cc create mode 100644 cpp/src/arrow/flight/flight-test.cc create mode 100644 cpp/src/arrow/flight/internal.cc create mode 100644 cpp/src/arrow/flight/internal.h create mode 100644 cpp/src/arrow/flight/perf-server.cc create mode 100644 cpp/src/arrow/flight/perf.proto create mode 100644 cpp/src/arrow/flight/server.cc create mode 100644 cpp/src/arrow/flight/server.h create mode 100644 cpp/src/arrow/flight/test-server.cc create mode 100644 cpp/src/arrow/flight/test-util.h create mode 100644 cpp/src/arrow/flight/types.cc create mode 100644 cpp/src/arrow/flight/types.h rename cpp/src/{parquet => arrow}/util/stopwatch.h (71%) create mode 100644 format/Flight.proto diff --git a/ci/travis_before_script_cpp.sh b/ci/travis_before_script_cpp.sh index e1c231ce4d89e..54a00f7ff1a7c 100755 --- a/ci/travis_before_script_cpp.sh +++ b/ci/travis_before_script_cpp.sh @@ -94,6 +94,10 @@ if [ $ARROW_TRAVIS_COVERAGE == "1" ]; then CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_GENERATE_COVERAGE=ON" fi +if [ $ARROW_TRAVIS_VERBOSE == "1" ]; then + CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_VERBOSE_THIRDPARTY_BUILD=ON" +fi + if [ $TRAVIS_OS_NAME == "linux" ]; then cmake $CMAKE_COMMON_FLAGS \ $CMAKE_LINUX_FLAGS \ diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh index 3a6b2f780e94f..b89e5b73bf00f 100755 --- a/ci/travis_script_cpp.sh +++ b/ci/travis_script_cpp.sh @@ -23,7 +23,7 @@ source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh pushd $CPP_BUILD_DIR -ctest -j2 --output-on-failure -L unittest +PATH=$ARROW_BUILD_TYPE:$PATH ctest -j2 --output-on-failure -L unittest popd diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index aa68f92f75de9..23ef7d0547c33 
100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -140,6 +140,10 @@ Pass multiple labels by dividing with semicolons") "Compile with extra error context (line numbers, code)" OFF) + option(ARROW_FLIGHT + "Build the Arrow Flight RPC System (requires GRPC, Protocol Buffers)" + OFF) + option(ARROW_IPC "Build the Arrow IPC extensions" ON) @@ -240,10 +244,6 @@ Pass multiple labels by dividing with semicolons") "Build with zstd compression" ON) - option(ARROW_WITH_GRPC - "Build with GRPC" - OFF) - option(ARROW_GENERATE_COVERAGE "Build with C++ code coverage enabled" OFF) @@ -634,14 +634,6 @@ if (ARROW_WITH_ZSTD) SET(ARROW_STATIC_LINK_LIBS zstd_static ${ARROW_STATIC_LINK_LIBS}) endif() -if (ARROW_WITH_GRPC) - SET(ARROW_STATIC_LINK_LIBS - grpc_grp - grpc_grpc - grpc_grpcpp - ${ARROW_STATIC_LINK_LIBS}) -endif() - if (ARROW_ORC) SET(ARROW_STATIC_LINK_LIBS ${ARROW_STATIC_LINK_LIBS} @@ -657,11 +649,13 @@ if (ARROW_STATIC_LINK_LIBS) add_dependencies(arrow_dependencies ${ARROW_STATIC_LINK_LIBS}) endif() -set(ARROW_BENCHMARK_LINK_LIBS - arrow_static - arrow_benchmark_main - gtest - ${ARROW_STATIC_LINK_LIBS}) +if (ARROW_BUILD_BENCHMARKS) + set(ARROW_BENCHMARK_LINK_LIBS + arrow_static + arrow_benchmark_main + gtest + ${ARROW_STATIC_LINK_LIBS}) +endif() set(ARROW_SHARED_PRIVATE_LINK_LIBS ${ARROW_STATIC_LINK_LIBS} @@ -683,8 +677,8 @@ endif() set(ARROW_MIN_TEST_LIBS arrow_static ${ARROW_STATIC_LINK_LIBS} - gtest - gtest_main) + gtest_main + gtest) if(NOT MSVC) set(ARROW_MIN_TEST_LIBS @@ -743,6 +737,10 @@ endif() add_subdirectory(src/arrow) +if(ARROW_FLIGHT) + add_subdirectory(src/arrow/flight) +endif() + if(ARROW_PYTHON) add_subdirectory(src/arrow/python) endif() diff --git a/cpp/build-support/lint_cpp_cli.py b/cpp/build-support/lint_cpp_cli.py index 0c6bad1af9f29..40281055673f7 100644 --- a/cpp/build-support/lint_cpp_cli.py +++ b/cpp/build-support/lint_cpp_cli.py @@ -72,10 +72,8 @@ def lint_file(path): 'arrow/util/macros.h', 'arrow/python/iterators.h', 
'arrow/util/parallel.h', - 'arrow/io/hdfs-internal.h', - 'parquet/arrow/test-util.h', - 'parquet/encoding-internal.h', - 'parquet/test-util.h' + 'test', + 'internal' ] try: diff --git a/cpp/cmake_modules/FindProtobuf.cmake b/cpp/cmake_modules/FindProtobuf.cmake index 9591bd1eb70c0..cb003e372ba91 100644 --- a/cpp/cmake_modules/FindProtobuf.cmake +++ b/cpp/cmake_modules/FindProtobuf.cmake @@ -94,6 +94,7 @@ else() endif() mark_as_advanced ( + PROTOBUF_EXECUTABLE PROTOBUF_INCLUDE_DIR PROTOBUF_LIBS PROTOBUF_STATIC_LIB diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 4bd64701aefab..e25f954d29621 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -31,6 +31,7 @@ if (NOT "$ENV{ARROW_BUILD_TOOLCHAIN}" STREQUAL "") set(GTEST_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") endif() set(JEMALLOC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") + set(GRPC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") set(LZ4_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") # orc disabled as it's not in conda-forge (but in Anaconda with an incompatible ABI) # set(ORC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}") @@ -210,7 +211,7 @@ if (DEFINED ENV{ARROW_PROTOBUF_URL}) set(PROTOBUF_SOURCE_URL "$ENV{ARROW_PROTOBUF_URL}") else() string(SUBSTRING ${PROTOBUF_VERSION} 1 -1 STRIPPED_PROTOBUF_VERSION) # strip the leading `v` - set(PROTOBUF_SOURCE_URL "https://github.com/google/protobuf/releases/download/${PROTOBUF_VERSION}/protobuf-${STRIPPED_PROTOBUF_VERSION}.tar.gz") + set(PROTOBUF_SOURCE_URL "https://github.com/protocolbuffers/protobuf/releases/download/${PROTOBUF_VERSION}/protobuf-all-${STRIPPED_PROTOBUF_VERSION}.tar.gz") endif() set(RAPIDJSON_SOURCE_MD5 "badd12c511e081fec6c89c43a7027bce") @@ -1009,54 +1010,11 @@ if (ARROW_WITH_ZSTD) endif() endif() -if (ARROW_WITH_GRPC) -# ---------------------------------------------------------------------- -# GRPC - if ("${GRPC_HOME}" STREQUAL "") - set(GRPC_VENDORED 1) - set(GRPC_BUILD_DIR 
"${CMAKE_CURRENT_BINARY_DIR}/grpc_ep-prefix/src/grpc_ep-build") - set(GRPC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep/src/grpc_ep-install") - set(GRPC_HOME "${GRPC_PREFIX}") - set(GRPC_INCLUDE_DIR "${GRPC_PREFIX}/include") - set(GRPC_STATIC_LIBRARY_GPR "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}gpr${CMAKE_STATIC_LIBRARY_SUFFIX}") - set(GRPC_STATIC_LIBRARY_GRPC "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc${CMAKE_STATIC_LIBRARY_SUFFIX}") - set(GRPC_STATIC_LIBRARY_GRPCPP "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc++${CMAKE_STATIC_LIBRARY_SUFFIX}") - set(GRPC_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} - "-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}" - "-DCMAKE_C_FLAGS=${EP_C_FLAGS}" - -DCMAKE_INSTALL_PREFIX=${GRPC_PREFIX} - -DBUILD_SHARED_LIBS=OFF) - ExternalProject_Add(grpc_ep - GIT_REPOSITORY "https://github.com/grpc/grpc" - GIT_TAG ${GRPC_VERSION} - BUILD_BYPRODUCTS "${GRPC_STATIC_LIBRARY_GPR}" "${GRPC_STATIC_LIBRARY_GRPC}" "${GRPC_STATIC_LIBRARY_GRPCPP}" - ${GRPC_BUILD_BYPRODUCTS} - ${EP_LOG_OPTIONS} - CMAKE_ARGS ${GRPC_CMAKE_ARGS} - ${EP_LOG_OPTIONS}) - else() - find_package(gRPC CONFIG REQUIRED) - set(GRPC_VENDORED 0) - endif() - - include_directories(SYSTEM ${GRPC_INCLUDE_DIR}) - ADD_THIRDPARTY_LIB(grpc_grp - STATIC_LIB ${GRPC_STATIC_LIBRARY_GPR}) - ADD_THIRDPARTY_LIB(grpc_grpc - STATIC_LIB ${GRPC_STATIC_LIBRARY_GRPC}) - ADD_THIRDPARTY_LIB(grpc_grpcpp - STATIC_LIB ${GRPC_STATIC_LIBRARY_GRPCPP}) - - if (GRPC_VENDORED) - add_dependencies(grpc_grp grpc_ep) - add_dependencies(grpc_grpc grpc_ep) - add_dependencies(grpc_grpcpp grpc_ep) - endif() - -endif() +# ---------------------------------------------------------------------- +# Protocol Buffers (required for ORC and Flight libraries) -if (ARROW_ORC) +if (ARROW_ORC OR ARROW_FLIGHT) # protobuf if ("${PROTOBUF_HOME}" STREQUAL "") set (PROTOBUF_PREFIX "${THIRDPARTY_DIR}/protobuf_ep-install") @@ -1089,9 +1047,69 @@ if (ARROW_ORC) if 
(PROTOBUF_VENDORED) add_dependencies (protobuf protobuf_ep) endif () +endif() - # orc +# ---------------------------------------------------------------------- +# Dependencies for Arrow Flight RPC + +if (ARROW_FLIGHT) + if ("${GRPC_HOME}" STREQUAL "") + set(GRPC_VENDORED 1) + set(GRPC_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep-prefix/src/grpc_ep-build") + set(GRPC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep/src/grpc_ep-install") + set(GRPC_HOME "${GRPC_PREFIX}") + set(GRPC_INCLUDE_DIR "${GRPC_PREFIX}/include") + set(GRPC_STATIC_LIBRARY_GPR "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}gpr${CMAKE_STATIC_LIBRARY_SUFFIX}") + set(GRPC_STATIC_LIBRARY_GRPC "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc${CMAKE_STATIC_LIBRARY_SUFFIX}") + set(GRPC_STATIC_LIBRARY_GRPCPP "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpcpp${CMAKE_STATIC_LIBRARY_SUFFIX}") + set(GRPC_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + "-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}" + "-DCMAKE_C_FLAGS=${EP_C_FLAGS}" + -DCMAKE_INSTALL_PREFIX=${GRPC_PREFIX} + -DBUILD_SHARED_LIBS=OFF) + ExternalProject_Add(grpc_ep + GIT_REPOSITORY "https://github.com/grpc/grpc" + GIT_TAG ${GRPC_VERSION} + BUILD_BYPRODUCTS "${GRPC_STATIC_LIBRARY_GPR}" "${GRPC_STATIC_LIBRARY_GRPC}" "${GRPC_STATIC_LIBRARY_GRPCPP}" + ${GRPC_BUILD_BYPRODUCTS} + ${EP_LOG_OPTIONS} + CMAKE_ARGS ${GRPC_CMAKE_ARGS} + ${EP_LOG_OPTIONS}) + include_directories(SYSTEM ${GRPC_INCLUDE_DIR}) + else() + find_package(gRPC CONFIG REQUIRED) + set(GRPC_VENDORED 0) + endif() + + get_property(GPR_STATIC_LIB TARGET gRPC::gpr PROPERTY LOCATION) + ADD_THIRDPARTY_LIB(grpc_gpr + STATIC_LIB ${GPR_STATIC_LIB}) + + get_property(GRPC_STATIC_LIB TARGET gRPC::grpc_unsecure PROPERTY LOCATION) + ADD_THIRDPARTY_LIB(grpc_grpc + STATIC_LIB ${GRPC_STATIC_LIB}) + + get_property(GRPCPP_STATIC_LIB TARGET gRPC::grpc++_unsecure PROPERTY LOCATION) + ADD_THIRDPARTY_LIB(grpc_grpcpp + STATIC_LIB ${GRPCPP_STATIC_LIB}) 
+ + get_property(GRPC_ADDRESS_SORTING_STATIC_LIB + TARGET gRPC::address_sorting PROPERTY LOCATION) + ADD_THIRDPARTY_LIB(grpc_address_sorting + STATIC_LIB ${GRPC_ADDRESS_SORTING_STATIC_LIB}) + + # XXX(wesm): relying on vendored c-ares provided by gRPC for the time being + get_property(CARES_STATIC_LIB TARGET c-ares::cares_static PROPERTY LOCATION) + ADD_THIRDPARTY_LIB(cares + STATIC_LIB ${CARES_STATIC_LIB}) +endif() + +# ---------------------------------------------------------------------- +# Apache ORC + +if (ARROW_ORC) + # orc if ("${ORC_HOME}" STREQUAL "") set(ORC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/orc_ep-install") set(ORC_HOME "${ORC_PREFIX}") @@ -1274,10 +1292,10 @@ endif() endif() # ARROW_HIVESERVER2 -if (ARROW_USE_GLOG) # ---------------------------------------------------------------------- # GLOG +if (ARROW_USE_GLOG) if("${GLOG_HOME}" STREQUAL "") set(GLOG_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/glog_ep-prefix/src/glog_ep") set(GLOG_INCLUDE_DIR "${GLOG_BUILD_DIR}/include") diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc index 2c0104170c4f6..006fe0f903fc6 100644 --- a/cpp/src/arrow/buffer.cc +++ b/cpp/src/arrow/buffer.cc @@ -71,6 +71,10 @@ Status Buffer::FromString(const std::string& data, std::shared_ptr* out) return FromString(data, default_memory_pool(), out); } +std::string Buffer::ToString() const { + return std::string(reinterpret_cast(data_), static_cast(size_)); +} + void Buffer::CheckMutable() const { DCHECK(is_mutable()) << "buffer not mutable"; } /// A Buffer whose lifetime is tied to a particular MemoryPool diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index f8c2c83135ef5..42b99bfce27b1 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -146,6 +146,11 @@ class ARROW_EXPORT Buffer { static_cast(sizeof(T) * data.size())); } // namespace arrow + /// \brief Copy buffer contents into a new std::string + /// \return std::string + /// \note Can throw std::bad_alloc if buffer is large + std::string 
ToString() const; + int64_t capacity() const { return capacity_; } const uint8_t* data() const { return data_; } diff --git a/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt b/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt index c59fd5a0494c2..be689f935c95c 100644 --- a/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt +++ b/cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt @@ -44,10 +44,11 @@ function(HS2_THRIFT_GEN VAR) # All the output files we can determine based on filename. # - Does not include .skeleton.cpp files # - Does not include java output files - set(OUTPUT_BE_FILE "${GEN_DIR}/${FIL_WE}_types.cpp") - set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_types.h") - set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_constants.cpp") - set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_constants.h") + set(OUTPUT_BE_FILE + "${GEN_DIR}/${FIL_WE}_types.cpp" + "${GEN_DIR}/${FIL_WE}_types.h" + "${GEN_DIR}/${FIL_WE}_constants.cpp" + "${GEN_DIR}/${FIL_WE}_constants.h") list(APPEND ${VAR} ${OUTPUT_BE_FILE}) # BeeswaxService thrift generation diff --git a/cpp/src/arrow/flight/CMakeLists.txt b/cpp/src/arrow/flight/CMakeLists.txt new file mode 100644 index 0000000000000..e830be3062af2 --- /dev/null +++ b/cpp/src/arrow/flight/CMakeLists.txt @@ -0,0 +1,139 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +add_custom_target(arrow_flight) + +# Header files +install(FILES + api.h + client.h + server.h + types.h + DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/flight") + +SET(ARROW_FLIGHT_STATIC_LINK_LIBS + grpc_grpcpp + grpc_grpc + grpc_gpr + grpc_address_sorting + cares) + +# TODO(wesm): Protobuf shared vs static linking + +set(FLIGHT_PROTO_PATH "${CMAKE_SOURCE_DIR}/../format") +set(FLIGHT_PROTO ${CMAKE_SOURCE_DIR}/../format/Flight.proto) + +set(FLIGHT_GENERATED_PROTO_FILES + "${CMAKE_CURRENT_BINARY_DIR}/Flight.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/Flight.pb.h" + "${CMAKE_CURRENT_BINARY_DIR}/Flight.grpc.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/Flight.grpc.pb.h") + +if(PROTOBUF_VENDORED) + set(PROTO_DEPENDS ${FLIGHT_PROTO} protobuf) +else() + set(PROTO_DEPENDS ${FLIGHT_PROTO}) +endif() + +# Get location of grpc_cpp_plugin so we can pass it to protoc +get_property(GRPC_CPP_PLUGIN TARGET gRPC::grpc_cpp_plugin PROPERTY LOCATION) + +add_custom_command( + OUTPUT ${FLIGHT_GENERATED_PROTO_FILES} + COMMAND ${PROTOBUF_EXECUTABLE} + "-I${FLIGHT_PROTO_PATH}" + "--cpp_out=${CMAKE_CURRENT_BINARY_DIR}" + "${FLIGHT_PROTO}" + DEPENDS ${PROTO_DEPENDS} + ARGS + COMMAND ${PROTOBUF_EXECUTABLE} + "-I${FLIGHT_PROTO_PATH}" + "--grpc_out=${CMAKE_CURRENT_BINARY_DIR}" + "--plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}" + "${FLIGHT_PROTO}") + +set_source_files_properties(${FLIGHT_GENERATED_PROTO_FILES} + PROPERTIES GENERATED TRUE) + +set(ARROW_FLIGHT_SRCS + client.cc + Flight.pb.cc + Flight.grpc.pb.cc + internal.cc + server.cc + types.cc +) + +ADD_ARROW_LIB(arrow_flight + SOURCES 
${ARROW_FLIGHT_SRCS} + DEPENDENCIES arrow_dependencies + SHARED_LINK_LIBS arrow_shared ${ARROW_FLIGHT_STATIC_LINK_LIBS} + STATIC_LINK_LIBS arrow_static ${ARROW_FLIGHT_STATIC_LINK_LIBS}) + +ADD_ARROW_TEST(flight-test + EXTRA_LINK_LIBS arrow_flight_static ${ARROW_FLIGHT_STATIC_LINK_LIBS} + LABELS "arrow_flight") + +# Build test server for unit tests or benchmarks +if (ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS) + add_executable(flight-test-server test-server.cc) + target_link_libraries(flight-test-server + arrow_flight_static + ${ARROW_FLIGHT_STATIC_LINK_LIBS} + gflags + gtest) + + # This is needed for the unit tests + if (ARROW_BUILD_TESTS) + add_dependencies(flight-test flight-test-server) + endif() +endif() + +if (ARROW_BUILD_BENCHMARKS) + # Perf server for benchmarks + set(PERF_PROTO_GENERATED_FILES + "${CMAKE_CURRENT_BINARY_DIR}/perf.pb.cc" + "${CMAKE_CURRENT_BINARY_DIR}/perf.pb.h") + + add_custom_command( + OUTPUT ${PERF_PROTO_GENERATED_FILES} + COMMAND ${PROTOBUF_EXECUTABLE} + "-I${CMAKE_CURRENT_SOURCE_DIR}" + "--cpp_out=${CMAKE_CURRENT_BINARY_DIR}" + "perf.proto" + DEPENDS ${PROTO_DEPENDS}) + + add_executable(flight-perf-server + perf-server.cc + perf.pb.cc) + target_link_libraries(flight-perf-server + arrow_flight_static + ${ARROW_FLIGHT_STATIC_LINK_LIBS} + gflags + gtest) + + add_executable(flight-benchmark + flight-benchmark.cc + perf.pb.cc) + target_link_libraries(flight-benchmark + arrow_flight_static + ${ARROW_FLIGHT_STATIC_LINK_LIBS} + gflags + gtest) + + add_dependencies(flight-benchmark flight-perf-server) +endif(ARROW_BUILD_BENCHMARKS) diff --git a/cpp/src/arrow/flight/README.md b/cpp/src/arrow/flight/README.md new file mode 100644 index 0000000000000..5156973acf33c --- /dev/null +++ b/cpp/src/arrow/flight/README.md @@ -0,0 +1,36 @@ + + +# Arrow Flight RPC System for C++ + +## Development notes + +The gRPC protobuf plugin requires that libprotoc is in your +`LD_LIBRARY_PATH`. 
Until we figure out a general solution, you may need to do: + +``` +export LD_LIBRARY_PATH=$PROTOBUF_HOME/lib:$LD_LIBRARY_PATH +``` + +Currently, to run the unit tests, the directory of executables must either be +your current working directory or be added to your `PATH`, e.g. + +``` +PATH=debug:$PATH debug/flight-test +``` \ No newline at end of file diff --git a/cpp/src/arrow/flight/api.h b/cpp/src/arrow/flight/api.h new file mode 100644 index 0000000000000..98b08404e541e --- /dev/null +++ b/cpp/src/arrow/flight/api.h @@ -0,0 +1,22 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/flight/client.h" +#include "arrow/flight/server.h" +#include "arrow/flight/types.h" diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc new file mode 100644 index 0000000000000..94c4928d0220d --- /dev/null +++ b/cpp/src/arrow/flight/client.cc @@ -0,0 +1,410 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/flight/client.h" + +#include +#include +#include +#include + +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/wire_format_lite.h" +#include "grpc/byte_buffer_reader.h" +#include "grpcpp/grpcpp.h" + +#include "arrow/ipc/reader.h" +#include "arrow/record_batch.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/util/logging.h" + +#include "arrow/flight/Flight.grpc.pb.h" +#include "arrow/flight/Flight.pb.h" +#include "arrow/flight/internal.h" + +namespace pb = arrow::flight::protocol; + +namespace arrow { +namespace flight { + +/// Internal, not user-visible type used for memory-efficient reads from gRPC +/// stream +struct FlightData { + /// Used only for puts, may be null + std::unique_ptr descriptor; + + /// Non-length-prefixed Message header as described in format/Message.fbs + std::shared_ptr metadata; + + /// Message body + std::shared_ptr body; +}; + +} // namespace flight +} // namespace arrow + +namespace grpc { + +// Customizations to gRPC for more efficient deserialization of FlightData + +using google::protobuf::internal::WireFormatLite; +using google::protobuf::io::CodedInputStream; + +using arrow::flight::FlightData; + +bool ReadBytesZeroCopy(const std::shared_ptr& source_data, + CodedInputStream* input, std::shared_ptr* out) { + uint32_t length; + if (!input->ReadVarint32(&length)) { + return false; + } + *out = 
arrow::SliceBuffer(source_data, input->CurrentPosition(), + static_cast(length)); + return input->Skip(static_cast(length)); +} + +// Internal wrapper for gRPC ByteBuffer so its memory can be exposed to Arrow +// consumers with zero-copy +class GrpcBuffer : public arrow::MutableBuffer { + public: + GrpcBuffer(grpc_slice slice, bool incref) + : MutableBuffer(GRPC_SLICE_START_PTR(slice), + static_cast(GRPC_SLICE_LENGTH(slice))), + slice_(incref ? grpc_slice_ref(slice) : slice) {} + + ~GrpcBuffer() override { + // Decref slice + grpc_slice_unref(slice_); + } + + static arrow::Status Wrap(ByteBuffer* cpp_buf, std::shared_ptr* out) { + // These types are guaranteed by static assertions in gRPC to have the same + // in-memory representation + + auto buffer = *reinterpret_cast(cpp_buf); + + // This part below is based on the Flatbuffers gRPC SerializationTraits in + // flatbuffers/grpc.h + + // Check if this is a single uncompressed slice. + if ((buffer->type == GRPC_BB_RAW) && + (buffer->data.raw.compression == GRPC_COMPRESS_NONE) && + (buffer->data.raw.slice_buffer.count == 1)) { + // If it is, then we can reference the `grpc_slice` directly. + grpc_slice slice = buffer->data.raw.slice_buffer.slices[0]; + + // Increment reference count so this memory remains valid + *out = std::make_shared(slice, true); + } else { + // Otherwise, we need to use `grpc_byte_buffer_reader_readall` to read + // `buffer` into a single contiguous `grpc_slice`. The gRPC reader gives + // us back a new slice with the refcount already incremented. 
+ grpc_byte_buffer_reader reader; + if (!grpc_byte_buffer_reader_init(&reader, buffer)) { + return arrow::Status::IOError("Internal gRPC error reading from ByteBuffer"); + } + grpc_slice slice = grpc_byte_buffer_reader_readall(&reader); + grpc_byte_buffer_reader_destroy(&reader); + + // Steal the slice reference + *out = std::make_shared(slice, false); + } + + return arrow::Status::OK(); + } + + private: + grpc_slice slice_; +}; + +// Read internal::FlightData from grpc::ByteBuffer containing FlightData +// protobuf without copying +template <> +class SerializationTraits { + public: + static Status Serialize(const FlightData& msg, ByteBuffer** buffer, bool* own_buffer) { + return Status(StatusCode::UNIMPLEMENTED, + "internal::FlightData serialization not implemented"); + } + + static Status Deserialize(ByteBuffer* buffer, FlightData* out) { + if (!buffer) { + return Status(StatusCode::INTERNAL, "No payload"); + } + + std::shared_ptr wrapped_buffer; + GRPC_RETURN_NOT_OK(GrpcBuffer::Wrap(buffer, &wrapped_buffer)); + + auto buffer_length = static_cast(wrapped_buffer->size()); + CodedInputStream pb_stream(wrapped_buffer->data(), buffer_length); + + // TODO(wesm): The 2-parameter version of this function is deprecated + pb_stream.SetTotalBytesLimit(buffer_length, -1 /* no threshold */); + + // This is the bytes remaining when using CodedInputStream like this + while (pb_stream.BytesUntilTotalBytesLimit()) { + const uint32_t tag = pb_stream.ReadTag(); + const int field_number = WireFormatLite::GetTagFieldNumber(tag); + switch (field_number) { + case pb::FlightData::kFlightDescriptorFieldNumber: { + pb::FlightDescriptor pb_descriptor; + if (!pb_descriptor.ParseFromCodedStream(&pb_stream)) { + return Status(StatusCode::INTERNAL, "Unable to parse FlightDescriptor"); + } + } break; + case pb::FlightData::kDataHeaderFieldNumber: { + if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->metadata)) { + return Status(StatusCode::INTERNAL, "Unable to read FlightData 
metadata"); + } + } break; + case pb::FlightData::kDataBodyFieldNumber: { + if (!ReadBytesZeroCopy(wrapped_buffer, &pb_stream, &out->body)) { + return Status(StatusCode::INTERNAL, "Unable to read FlightData body"); + } + } break; + default: + DCHECK(false) << "cannot happen"; + } + } + buffer->Clear(); + + // TODO(wesm): Where and when should we verify that the FlightData is not + // malformed or missing components? + + return Status::OK; + } +}; + +} // namespace grpc + +namespace arrow { +namespace flight { + +struct ClientRpc { + grpc::ClientContext context; + + ClientRpc() { + /// XXX workaround until we have a handshake in Connect + context.set_wait_for_ready(true); + } +}; + +class FlightStreamReader : public RecordBatchReader { + public: + FlightStreamReader(std::unique_ptr rpc, + const std::shared_ptr& schema, + std::unique_ptr> stream) + : rpc_(std::move(rpc)), + stream_finished_(false), + schema_(schema), + stream_(std::move(stream)) {} + + std::shared_ptr schema() const override { return schema_; } + + Status ReadNext(std::shared_ptr* out) override { + FlightData data; + + if (stream_finished_) { + *out = nullptr; + return Status::OK(); + } + + // For customizing read path for better memory/serialization efficiency + auto custom_reader = reinterpret_cast*>(stream_.get()); + + if (custom_reader->Read(&data)) { + std::unique_ptr message; + + // Validate IPC message + RETURN_NOT_OK(ipc::Message::Open(data.metadata, data.body, &message)); + return ipc::ReadRecordBatch(*message, schema_, out); + } else { + // Stream is completed + stream_finished_ = true; + *out = nullptr; + return internal::FromGrpcStatus(stream_->Finish()); + } + } + + private: + // The RPC context lifetime must be coupled to the ClientReader + std::unique_ptr rpc_; + + bool stream_finished_; + std::shared_ptr schema_; + std::unique_ptr> stream_; +}; + +class FlightClient::FlightClientImpl { + public: + Status Connect(const std::string& host, int port) { + // TODO(wesm): Support other kinds 
of GRPC ChannelCredentials + std::stringstream ss; + ss << host << ":" << port; + std::string uri = ss.str(); + + stub_ = pb::FlightService::NewStub( + grpc::CreateChannel(ss.str(), grpc::InsecureChannelCredentials())); + return Status::OK(); + } + + Status ListFlights(const Criteria& criteria, std::unique_ptr* listing) { + // TODO(wesm): populate criteria + pb::Criteria pb_criteria; + + ClientRpc rpc; + std::unique_ptr> stream( + stub_->ListFlights(&rpc.context, pb_criteria)); + + std::vector flights; + + pb::FlightGetInfo pb_info; + FlightInfo::Data info_data; + while (stream->Read(&pb_info)) { + RETURN_NOT_OK(internal::FromProto(pb_info, &info_data)); + flights.emplace_back(FlightInfo(std::move(info_data))); + } + + listing->reset(new SimpleFlightListing(flights)); + return internal::FromGrpcStatus(stream->Finish()); + } + + Status DoAction(const Action& action, std::unique_ptr* results) { + pb::Action pb_action; + RETURN_NOT_OK(internal::ToProto(action, &pb_action)); + + ClientRpc rpc; + std::unique_ptr> stream( + stub_->DoAction(&rpc.context, pb_action)); + + pb::Result pb_result; + + std::vector materialized_results; + while (stream->Read(&pb_result)) { + Result result; + RETURN_NOT_OK(internal::FromProto(pb_result, &result)); + materialized_results.emplace_back(std::move(result)); + } + + *results = std::unique_ptr( + new SimpleResultStream(std::move(materialized_results))); + return internal::FromGrpcStatus(stream->Finish()); + } + + Status ListActions(std::vector* types) { + pb::Empty empty; + + ClientRpc rpc; + std::unique_ptr> stream( + stub_->ListActions(&rpc.context, empty)); + + pb::ActionType pb_type; + ActionType type; + while (stream->Read(&pb_type)) { + RETURN_NOT_OK(internal::FromProto(pb_type, &type)); + types->emplace_back(std::move(type)); + } + return internal::FromGrpcStatus(stream->Finish()); + } + + Status GetFlightInfo(const FlightDescriptor& descriptor, + std::unique_ptr* info) { + pb::FlightDescriptor pb_descriptor; + pb::FlightGetInfo 
pb_response; + + RETURN_NOT_OK(internal::ToProto(descriptor, &pb_descriptor)); + + ClientRpc rpc; + Status s = internal::FromGrpcStatus( + stub_->GetFlightInfo(&rpc.context, pb_descriptor, &pb_response)); + RETURN_NOT_OK(s); + + FlightInfo::Data info_data; + RETURN_NOT_OK(internal::FromProto(pb_response, &info_data)); + info->reset(new FlightInfo(std::move(info_data))); + return Status::OK(); + } + + Status DoGet(const Ticket& ticket, const std::shared_ptr& schema, + std::unique_ptr* out) { + pb::Ticket pb_ticket; + internal::ToProto(ticket, &pb_ticket); + + // ClientRpc rpc; + std::unique_ptr rpc(new ClientRpc); + std::unique_ptr> stream( + stub_->DoGet(&rpc->context, pb_ticket)); + + *out = std::unique_ptr( + new FlightStreamReader(std::move(rpc), schema, std::move(stream))); + return Status::OK(); + } + + Status DoPut(std::unique_ptr* stream) { + return Status::NotImplemented("DoPut"); + } + + private: + std::unique_ptr stub_; +}; + +FlightClient::FlightClient() { impl_.reset(new FlightClientImpl); } + +FlightClient::~FlightClient() {} + +Status FlightClient::Connect(const std::string& host, int port, + std::unique_ptr* client) { + client->reset(new FlightClient); + return (*client)->impl_->Connect(host, port); +} + +Status FlightClient::DoAction(const Action& action, + std::unique_ptr* results) { + return impl_->DoAction(action, results); +} + +Status FlightClient::ListActions(std::vector* actions) { + return impl_->ListActions(actions); +} + +Status FlightClient::GetFlightInfo(const FlightDescriptor& descriptor, + std::unique_ptr* info) { + return impl_->GetFlightInfo(descriptor, info); +} + +Status FlightClient::ListFlights(std::unique_ptr* listing) { + return ListFlights({}, listing); +} + +Status FlightClient::ListFlights(const Criteria& criteria, + std::unique_ptr* listing) { + return impl_->ListFlights(criteria, listing); +} + +Status FlightClient::DoGet(const Ticket& ticket, const std::shared_ptr& schema, + std::unique_ptr* stream) { + return 
impl_->DoGet(ticket, schema, stream); +} + +Status FlightClient::DoPut(const Schema& schema, + std::unique_ptr* stream) { + return Status::NotImplemented("DoPut"); +} + +} // namespace flight +} // namespace arrow diff --git a/cpp/src/arrow/flight/client.h b/cpp/src/arrow/flight/client.h new file mode 100644 index 0000000000000..be3d86a0dde77 --- /dev/null +++ b/cpp/src/arrow/flight/client.h @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// \brief Implementation of Flight RPC client using gRPC. API should be +// considered experimental for now + +#pragma once + +#include +#include +#include + +#include "arrow/status.h" +#include "arrow/util/visibility.h" + +#include "arrow/flight/types.h" + +namespace arrow { + +class RecordBatch; +class RecordBatchReader; +class Schema; + +namespace flight { + +/// \brief Client class for Arrow Flight RPC services (gRPC-based). 
+/// API experimental for now +class ARROW_EXPORT FlightClient { + public: + ~FlightClient(); + + /// \brief Connect to an unauthenticated flight service + /// \param[in] host the hostname or IP address + /// \param[in] port the port on the host + /// \param[out] client the created FlightClient + /// \return Status OK status may not indicate that the connection was + /// successful + static Status Connect(const std::string& host, int port, + std::unique_ptr* client); + + /// \brief Perform the indicated action, returning an iterator to the stream + /// of results, if any + /// \param[in] action the action to be performed + /// \param[out] results an iterator object for reading the returned results + /// \return Status + Status DoAction(const Action& action, std::unique_ptr* results); + + /// \brief Retrieve a list of available Action types + /// \param[out] actions the available actions + /// \return Status + Status ListActions(std::vector* actions); + + /// \brief Request access plan for a single flight, which may be an existing + /// dataset or a command to be executed + /// \param[in] descriptor the dataset request, whether a named dataset or + /// command + /// \param[out] info the FlightInfo describing where to access the dataset + /// \return Status + Status GetFlightInfo(const FlightDescriptor& descriptor, + std::unique_ptr* info); + + /// \brief List all available flights known to the server + /// \param[out] listing an iterator that returns a FlightInfo for each flight + /// \return Status + Status ListFlights(std::unique_ptr* listing); + + /// \brief List available flights given indicated filter criteria + /// \param[in] criteria the filter criteria (opaque) + /// \param[out] listing an iterator that returns a FlightInfo for each flight + /// \return Status + Status ListFlights(const Criteria& criteria, std::unique_ptr* listing); + + /// \brief Given a flight ticket and schema, request to be sent the + /// stream. 
Returns record batch stream reader + /// \param[in] ticket + /// \param[in] schema the arrow::Schema for the stream as computed by + /// GetFlightInfo + /// \param[out] stream the returned RecordBatchReader + /// \return Status + Status DoGet(const Ticket& ticket, const std::shared_ptr& schema, + std::unique_ptr* stream); + + /// \brief Initiate DoPut RPC, returns FlightPutWriter interface to + /// write. Not yet implemented + /// \param[out] stream the created stream to write record batches to + /// \return Status + Status DoPut(const Schema& schema, std::unique_ptr* stream); + + private: + FlightClient(); + class FlightClientImpl; + std::unique_ptr impl_; +}; + +} // namespace flight +} // namespace arrow diff --git a/cpp/src/arrow/flight/flight-benchmark.cc b/cpp/src/arrow/flight/flight-benchmark.cc new file mode 100644 index 0000000000000..ac50ab033544e --- /dev/null +++ b/cpp/src/arrow/flight/flight-benchmark.cc @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include + +#include + +#include "arrow/api.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/api.h" +#include "arrow/record_batch.h" +#include "arrow/test-util.h" +#include "arrow/util/stopwatch.h" +#include "arrow/util/thread-pool.h" + +#include "arrow/flight/api.h" +#include "arrow/flight/perf.pb.h" +#include "arrow/flight/test-util.h" + +DEFINE_int32(num_servers, 1, "Number of performance servers to run"); +DEFINE_int32(num_streams, 4, "Number of streams for each server"); +DEFINE_int32(num_threads, 4, "Number of concurrent gets"); +DEFINE_int32(records_per_stream, 10000000, "Total records per stream"); +DEFINE_int32(records_per_batch, 4096, "Total records per batch within stream"); + +namespace perf = arrow::flight::perf; + +using ThreadPool = ::arrow::internal::ThreadPool; + +namespace arrow { +namespace flight { + +// Running totals of records and bytes consumed. Update() locks the mutex so +// that it can be called from concurrent stream consumers +struct PerformanceStats { + PerformanceStats() : total_records(0), total_bytes(0) {} + std::mutex mutex; + int64_t total_records; + int64_t total_bytes; + + void Update(const int64_t total_records, const int64_t total_bytes) { + std::lock_guard lock(this->mutex); + this->total_records += total_records; + this->total_bytes += total_bytes; + } +}; + +// Plan a perf query against the server listening on localhost:port, consume +// every endpoint's stream on a thread pool, check the total record count and +// print throughput figures to stdout +Status RunPerformanceTest(const int port) { + // TODO(wesm): Multiple servers + // std::vector> servers; + + // schema not needed + perf::Perf perf; + perf.set_stream_count(FLAGS_num_streams); + perf.set_records_per_stream(FLAGS_records_per_stream); + perf.set_records_per_batch(FLAGS_records_per_batch); + + // Construct client and plan the query + std::unique_ptr client; + RETURN_NOT_OK(FlightClient::Connect("localhost", port, &client)); + + FlightDescriptor descriptor; + descriptor.type = FlightDescriptor::CMD; + // Do not send a half-written command if protobuf serialization fails + if (!perf.SerializeToString(&descriptor.cmd)) { + return Status::Invalid("cannot serialize perf command"); + } + + std::unique_ptr plan; + RETURN_NOT_OK(client->GetFlightInfo(descriptor, &plan)); + + // Read the streams in parallel + std::shared_ptr schema; + RETURN_NOT_OK(plan->GetSchema(&schema)); + +
PerformanceStats stats; + auto ConsumeStream = [&stats, &schema, &port](const FlightEndpoint& endpoint) { + // TODO(wesm): Use location from endpoint, same host/port for now + std::unique_ptr client; + RETURN_NOT_OK(FlightClient::Connect("localhost", port, &client)); + + // The ticket carries a serialized perf::Token; token.start() is only + // meaningful if it parsed + perf::Token token; + if (!token.ParseFromString(endpoint.ticket.ticket)) { + return Status::Invalid("cannot parse protobuf"); + } + + std::unique_ptr reader; + RETURN_NOT_OK(client->DoGet(endpoint.ticket, schema, &reader)); + + std::shared_ptr batch; + + // This is hard-coded for right now, 4 columns each with int64 + const int bytes_per_record = 32; + + // This must also be set in perf-server.cc + const bool verify = false; + + int64_t num_bytes = 0; + int64_t num_records = 0; + while (true) { + RETURN_NOT_OK(reader->ReadNext(&batch)); + if (!batch) { + break; + } + + if (verify) { + auto values = + reinterpret_cast(batch->column_data(0)->buffers[1]->data()); + const int64_t start = token.start() + num_records; + for (int64_t i = 0; i < batch->num_rows(); ++i) { + if (values[i] != start + i) { + return Status::Invalid("verification failure"); + } + } + } + + num_records += batch->num_rows(); + + // Hard-coded + num_bytes += batch->num_rows() * bytes_per_record; + } + stats.Update(num_records, num_bytes); + return Status::OK(); + }; + + StopWatch timer; + timer.Start(); + + // XXX(wesm): Serial version for debugging + // for (const auto& endpoint : plan->endpoints()) { + // RETURN_NOT_OK(ConsumeStream(endpoint)); + // } + + std::shared_ptr pool; + RETURN_NOT_OK(ThreadPool::Make(FLAGS_num_threads, &pool)); + std::vector> tasks; + for (const auto& endpoint : plan->endpoints()) { + tasks.emplace_back(pool->Submit(ConsumeStream, endpoint)); + } + + // Wait for tasks to finish + for (auto&& task : tasks) { + RETURN_NOT_OK(task.get()); + } + + // Elapsed time in seconds + uint64_t elapsed_nanos = timer.Stop(); + double time_elapsed = elapsed_nanos / static_cast(1000000000LL); + + constexpr double kMegabyte = static_cast(1 << 20); + + // Check that number of rows
read is as expected + if (stats.total_records != static_cast(plan->total_records())) { + return Status::Invalid("Did not consume expected number of records"); + } + + std::cout << "Bytes read: " << stats.total_bytes << std::endl; + std::cout << "Nanos: " << elapsed_nanos << std::endl; + std::cout << "Speed: " << (stats.total_bytes / time_elapsed / kMegabyte) << " MB/s" + << std::endl; + return Status::OK(); +} + +} // namespace flight +} // namespace arrow + +// Launch the perf server, run the benchmark against it and stop the server. +// Exits non-zero when the benchmark reports an error so that callers and +// scripts can detect the failure +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + const int port = 31337; + arrow::flight::TestServer server("flight-perf-server", port); + server.Start(); + + arrow::Status s = arrow::flight::RunPerformanceTest(port); + server.Stop(); + + if (!s.ok()) { + std::cerr << "Failed with error: " << s.ToString() << std::endl; + return 1; + } + + return 0; +} diff --git a/cpp/src/arrow/flight/flight-test.cc b/cpp/src/arrow/flight/flight-test.cc new file mode 100644 index 0000000000000..2d1b2f8477d9a --- /dev/null +++ b/cpp/src/arrow/flight/flight-test.cc @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#ifndef _WIN32 +#include +#include +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "arrow/ipc/test-common.h" +#include "arrow/status.h" +#include "arrow/test-util.h" + +#include "arrow/flight/api.h" + +#ifdef GRPCPP_GRPCPP_H +#error "gRPC headers should not be in public API" +#endif + +#include "arrow/flight/Flight.pb.h" +#include "arrow/flight/internal.h" +#include "arrow/flight/test-util.h" + +namespace pb = arrow::flight::protocol; + +namespace arrow { +namespace flight { + +TEST(TestFlight, StartStopTestServer) { + TestServer server("flight-test-server", 92385); + server.Start(); + ASSERT_TRUE(server.IsRunning()); + + sleep_for(0.2); + + ASSERT_TRUE(server.IsRunning()); + int exit_code = server.Stop(); + ASSERT_EQ(0, exit_code); +} + +// ---------------------------------------------------------------------- +// Client tests + +class TestFlightClient : public ::testing::Test { + public: + // Uncomment these when you want to run the server separately for + // debugging/valgrind/gdb + + // void SetUp() { + // port_ = 92358; + // ASSERT_OK(ConnectClient()); + // } + // void TearDown() {} + + void SetUp() { + port_ = 92358; + server_.reset(new TestServer("flight-test-server", port_)); + server_->Start(); + ASSERT_OK(ConnectClient()); + } + + void TearDown() { server_->Stop(); } + + Status ConnectClient() { return FlightClient::Connect("localhost", port_, &client_); } + + protected: + int port_; + std::unique_ptr client_; + std::unique_ptr server_; +}; + +// The server implementation is in test-server.cc; to make changes to the +// expected results, make edits there +void AssertEqual(const FlightDescriptor& expected, const FlightDescriptor& actual) {} + +void AssertEqual(const Ticket& expected, const Ticket& actual) { + ASSERT_EQ(expected.ticket, actual.ticket); +} + +void AssertEqual(const Location& expected, const Location& actual) { + ASSERT_EQ(expected.host, actual.host); + 
ASSERT_EQ(expected.port, actual.port); +} + +void AssertEqual(const std::vector& expected, + const std::vector& actual) { + ASSERT_EQ(expected.size(), actual.size()); + for (size_t i = 0; i < expected.size(); ++i) { + AssertEqual(expected[i].ticket, actual[i].ticket); + + ASSERT_EQ(expected[i].locations.size(), actual[i].locations.size()); + for (size_t j = 0; j < expected[i].locations.size(); ++j) { + AssertEqual(expected[i].locations[j], actual[i].locations[j]); + } + } +} + +void AssertEqual(const FlightInfo& expected, const FlightInfo& actual) { + std::shared_ptr ex_schema, actual_schema; + ASSERT_OK(expected.GetSchema(&ex_schema)); + ASSERT_OK(actual.GetSchema(&actual_schema)); + + AssertSchemaEqual(*ex_schema, *actual_schema); + ASSERT_EQ(expected.total_records(), actual.total_records()); + ASSERT_EQ(expected.total_bytes(), actual.total_bytes()); + + AssertEqual(expected.descriptor(), actual.descriptor()); + AssertEqual(expected.endpoints(), actual.endpoints()); +} + +void AssertEqual(const ActionType& expected, const ActionType& actual) { + ASSERT_EQ(expected.type, actual.type); + ASSERT_EQ(expected.description, actual.description); +} + +template +void AssertEqual(const std::vector& expected, const std::vector& actual) { + ASSERT_EQ(expected.size(), actual.size()); + for (size_t i = 0; i < expected.size(); ++i) { + AssertEqual(expected[i], actual[i]); + } +} + +TEST_F(TestFlightClient, ListFlights) { + std::unique_ptr listing; + ASSERT_OK(client_->ListFlights(&listing)); + ASSERT_TRUE(listing != nullptr); + + std::vector flights = ExampleFlightInfo(); + std::unique_ptr info; + + for (const FlightInfo& flight : flights) { + ASSERT_OK(listing->Next(&info)); + AssertEqual(flight, *info); + } + ASSERT_OK(listing->Next(&info)); + ASSERT_TRUE(info == nullptr); + + ASSERT_OK(listing->Next(&info)); +} + +TEST_F(TestFlightClient, GetFlightInfo) { + FlightDescriptor descr{FlightDescriptor::PATH, "", {"foo", "bar"}}; + std::unique_ptr info; + 
ASSERT_OK(client_->GetFlightInfo(descr, &info)); + + ASSERT_TRUE(info != nullptr); + + std::vector flights = ExampleFlightInfo(); + AssertEqual(flights[0], *info); +} + +TEST(TestFlightProtocol, FlightDescriptor) { + FlightDescriptor descr_test; + pb::FlightDescriptor pb_descr; + + FlightDescriptor descr1{FlightDescriptor::PATH, "", {"foo", "bar"}}; + ASSERT_OK(internal::ToProto(descr1, &pb_descr)); + ASSERT_OK(internal::FromProto(pb_descr, &descr_test)); + AssertEqual(descr1, descr_test); + + FlightDescriptor descr2{FlightDescriptor::CMD, "command", {}}; + ASSERT_OK(internal::ToProto(descr2, &pb_descr)); + ASSERT_OK(internal::FromProto(pb_descr, &descr_test)); + AssertEqual(descr2, descr_test); +} + +TEST_F(TestFlightClient, DoGet) { + FlightDescriptor descr{FlightDescriptor::PATH, "", {"foo", "bar"}}; + std::unique_ptr info; + ASSERT_OK(client_->GetFlightInfo(descr, &info)); + + // Two endpoints in the example FlightInfo + ASSERT_EQ(2, info->endpoints().size()); + + Ticket ticket = info->endpoints()[0].ticket; + AssertEqual(Ticket{"ticket-id-1"}, ticket); + + std::shared_ptr schema; + ASSERT_OK(info->GetSchema(&schema)); + + auto expected_schema = ExampleSchema1(); + AssertSchemaEqual(*expected_schema, *schema); + + std::unique_ptr stream; + ASSERT_OK(client_->DoGet(ticket, schema, &stream)); + + BatchVector expected_batches; + const int num_batches = 5; + ASSERT_OK(SimpleIntegerBatches(num_batches, &expected_batches)); + std::shared_ptr chunk; + for (int i = 0; i < num_batches; ++i) { + ASSERT_OK(stream->ReadNext(&chunk)); + ASSERT_BATCHES_EQUAL(*expected_batches[i], *chunk); + } + + // Stream exhausted + ASSERT_OK(stream->ReadNext(&chunk)); + ASSERT_EQ(nullptr, chunk); +} + +TEST_F(TestFlightClient, ListActions) { + std::vector actions; + ASSERT_OK(client_->ListActions(&actions)); + + std::vector expected = ExampleActionTypes(); + AssertEqual(expected, actions); +} + +TEST_F(TestFlightClient, DoAction) { + Action action; + std::unique_ptr stream; + 
std::unique_ptr result; + + // Run action1 + action.type = "action1"; + + const std::string action1_value = "action1-content"; + ASSERT_OK(Buffer::FromString(action1_value, &action.body)); + ASSERT_OK(client_->DoAction(action, &stream)); + + for (int i = 0; i < 3; ++i) { + ASSERT_OK(stream->Next(&result)); + std::string expected = action1_value + "-part" + std::to_string(i); + ASSERT_EQ(expected, result->body->ToString()); + } + + // stream consumed + ASSERT_OK(stream->Next(&result)); + ASSERT_EQ(nullptr, result); + + // Run action2, no results + action.type = "action2"; + ASSERT_OK(client_->DoAction(action, &stream)); + + ASSERT_OK(stream->Next(&result)); + ASSERT_EQ(nullptr, result); +} + +} // namespace flight +} // namespace arrow diff --git a/cpp/src/arrow/flight/internal.cc b/cpp/src/arrow/flight/internal.cc new file mode 100644 index 0000000000000..796e6095cdb7f --- /dev/null +++ b/cpp/src/arrow/flight/internal.cc @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/flight/internal.h" + +#include +#include +#include + +#include + +#include "arrow/io/memory.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/writer.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace flight { +namespace internal { + +Status FromGrpcStatus(const grpc::Status& grpc_status) { + if (grpc_status.ok()) { + return Status::OK(); + } + std::stringstream ss; + + if (grpc_status.error_code() == grpc::StatusCode::UNIMPLEMENTED) { + ss << "gRPC returned unimplemented error, with message: " + << grpc_status.error_message(); + return Status::NotImplemented(ss.str()); + } else { + ss << "gRPC failed with error code " << grpc_status.error_code() + << " and message: " << grpc_status.error_message(); + return Status::IOError(ss.str()); + } +} + +grpc::Status ToGrpcStatus(const Status& arrow_status) { + if (arrow_status.ok()) { + return grpc::Status::OK; + } else { + grpc::StatusCode grpc_code = grpc::StatusCode::UNKNOWN; + if (arrow_status.IsNotImplemented()) { + grpc_code = grpc::StatusCode::UNIMPLEMENTED; + } else if (arrow_status.IsInvalid()) { + grpc_code = grpc::StatusCode::INVALID_ARGUMENT; + } + return grpc::Status(grpc_code, arrow_status.message()); + } +} + +// ActionType + +Status FromProto(const pb::ActionType& pb_type, ActionType* type) { + type->type = pb_type.type(); + type->description = pb_type.description(); + return Status::OK(); +} + +Status ToProto(const ActionType& type, pb::ActionType* pb_type) { + pb_type->set_type(type.type); + pb_type->set_description(type.description); + return Status::OK(); +} + +// Action + +Status FromProto(const pb::Action& pb_action, Action* action) { + action->type = pb_action.type(); + return Buffer::FromString(pb_action.body(), &action->body); +} + +Status ToProto(const Action& action, pb::Action* pb_action) { + pb_action->set_type(action.type); + pb_action->set_body(action.body->ToString()); + return Status::OK(); +} + +// Result (of an Action) + 
+Status FromProto(const pb::Result& pb_result, Result* result) { + // ARROW-3250; can avoid copy. Can also write custom deserializer if it + // becomes an issue + return Buffer::FromString(pb_result.body(), &result->body); +} + +// Serialize a Result to its protobuf form. A null body is sent as an empty +// body instead of dereferencing the null buffer +Status ToProto(const Result& result, pb::Result* pb_result) { + pb_result->set_body(result.body ? result.body->ToString() : std::string()); + return Status::OK(); +} + +// Criteria + +Status FromProto(const pb::Criteria& pb_criteria, Criteria* criteria) { + return Status::OK(); +} + +// Location + +Status FromProto(const pb::Location& pb_location, Location* location) { + location->host = pb_location.host(); + location->port = pb_location.port(); + return Status::OK(); +} + +void ToProto(const Location& location, pb::Location* pb_location) { + pb_location->set_host(location.host); + pb_location->set_port(location.port); +} + +// Ticket + +Status FromProto(const pb::Ticket& pb_ticket, Ticket* ticket) { + ticket->ticket = pb_ticket.ticket(); + return Status::OK(); +} + +void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket) { + pb_ticket->set_ticket(ticket.ticket); +} + +// FlightEndpoint + +Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint) { + RETURN_NOT_OK(FromProto(pb_endpoint.ticket(), &endpoint->ticket)); + endpoint->locations.resize(pb_endpoint.location_size()); + for (int i = 0; i < pb_endpoint.location_size(); ++i) { + RETURN_NOT_OK(FromProto(pb_endpoint.location(i), &endpoint->locations[i])); + } + return Status::OK(); +} + +void ToProto(const FlightEndpoint& endpoint, pb::FlightEndpoint* pb_endpoint) { + ToProto(endpoint.ticket, pb_endpoint->mutable_ticket()); + pb_endpoint->clear_location(); + for (const Location& location : endpoint.locations) { + ToProto(location, pb_endpoint->add_location()); + } +} + +// FlightDescriptor + +// Populate *descriptor from its protobuf form. For a PATH descriptor the path +// vector is replaced by the message's path components; for CMD the command +// string is copied. Any other type is rejected with Status::Invalid +Status FromProto(const pb::FlightDescriptor& pb_descriptor, + FlightDescriptor* descriptor) { + if (pb_descriptor.type() == pb::FlightDescriptor::PATH) { + descriptor->type = FlightDescriptor::PATH; +
// Clear and reserve (not resize): emplace_back appends, so pre-sizing the + // vector would leave path_size() empty strings ahead of the real entries + descriptor->path.clear(); + descriptor->path.reserve(pb_descriptor.path_size()); + for (int i = 0; i < pb_descriptor.path_size(); ++i) { + descriptor->path.emplace_back(pb_descriptor.path(i)); + } + } else if (pb_descriptor.type() == pb::FlightDescriptor::CMD) { + descriptor->type = FlightDescriptor::CMD; + descriptor->cmd = pb_descriptor.cmd(); + } else { + return Status::Invalid("Client sent UNKNOWN descriptor type"); + } + return Status::OK(); +} + +// Serialize a FlightDescriptor to its protobuf form. Any descriptor type +// other than PATH is written as CMD +Status ToProto(const FlightDescriptor& descriptor, pb::FlightDescriptor* pb_descriptor) { + // clear any repeated fields so a reused message does not keep stale paths + pb_descriptor->clear_path(); + + if (descriptor.type == FlightDescriptor::PATH) { + pb_descriptor->set_type(pb::FlightDescriptor::PATH); + for (const std::string& path : descriptor.path) { + pb_descriptor->add_path(path); + } + } else { + pb_descriptor->set_type(pb::FlightDescriptor::CMD); + pb_descriptor->set_cmd(descriptor.cmd); + } + return Status::OK(); +} + +// FlightGetInfo + +Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info) { + RETURN_NOT_OK(FromProto(pb_info.flight_descriptor(), &info->descriptor)); + + info->schema = pb_info.schema(); + + info->endpoints.resize(pb_info.endpoint_size()); + for (int i = 0; i < pb_info.endpoint_size(); ++i) { + RETURN_NOT_OK(FromProto(pb_info.endpoint(i), &info->endpoints[i])); + } + + info->total_records = pb_info.total_records(); + info->total_bytes = pb_info.total_bytes(); + return Status::OK(); +} + +Status SchemaToString(const Schema& schema, std::string* out) { + // TODO(wesm): Do we care about better memory efficiency here?
+ std::shared_ptr serialized_schema; + RETURN_NOT_OK(ipc::SerializeSchema(schema, default_memory_pool(), &serialized_schema)); + *out = std::string(reinterpret_cast(serialized_schema->data()), + static_cast(serialized_schema->size())); + return Status::OK(); +} + +Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info) { + // clear any repeated fields + pb_info->clear_endpoint(); + + pb_info->set_schema(info.serialized_schema()); + + // descriptor + RETURN_NOT_OK(ToProto(info.descriptor(), pb_info->mutable_flight_descriptor())); + + // endpoints + for (const FlightEndpoint& endpoint : info.endpoints()) { + ToProto(endpoint, pb_info->add_endpoint()); + } + + pb_info->set_total_records(info.total_records()); + pb_info->set_total_bytes(info.total_bytes()); + return Status::OK(); +} + +} // namespace internal +} // namespace flight +} // namespace arrow diff --git a/cpp/src/arrow/flight/internal.h b/cpp/src/arrow/flight/internal.h new file mode 100644 index 0000000000000..bae1eedfa9c66 --- /dev/null +++ b/cpp/src/arrow/flight/internal.h @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#pragma once + +#include +#include + +#include + +#include "arrow/buffer.h" +#include "arrow/ipc/writer.h" +#include "arrow/util/macros.h" + +#include "arrow/flight/Flight.grpc.pb.h" +#include "arrow/flight/Flight.pb.h" +#include "arrow/flight/types.h" + +namespace arrow { + +class Schema; +class Status; + +namespace pb = arrow::flight::protocol; + +namespace flight { + +#define GRPC_RETURN_NOT_OK(s) \ + do { \ + ::arrow::Status _s = (s); \ + if (ARROW_PREDICT_FALSE(!_s.ok())) { \ + return ::arrow::flight::internal::ToGrpcStatus(_s); \ + } \ + } while (0) + +namespace internal { + +Status SchemaToString(const Schema& schema, std::string* out); + +Status FromProto(const pb::ActionType& pb_type, ActionType* type); +Status FromProto(const pb::Action& pb_action, Action* action); +Status FromProto(const pb::Result& pb_result, Result* result); +Status FromProto(const pb::Criteria& pb_criteria, Criteria* criteria); +Status FromProto(const pb::Location& pb_location, Location* location); +Status FromProto(const pb::Ticket& pb_ticket, Ticket* ticket); +Status FromProto(const pb::FlightDescriptor& pb_descr, FlightDescriptor* descr); +Status FromProto(const pb::FlightEndpoint& pb_endpoint, FlightEndpoint* endpoint); +Status FromProto(const pb::FlightGetInfo& pb_info, FlightInfo::Data* info); + +Status ToProto(const FlightDescriptor& descr, pb::FlightDescriptor* pb_descr); +Status ToProto(const FlightInfo& info, pb::FlightGetInfo* pb_info); +Status ToProto(const ActionType& type, pb::ActionType* pb_type); +Status ToProto(const Action& action, pb::Action* pb_action); +Status ToProto(const Result& result, pb::Result* pb_result); +void ToProto(const Ticket& ticket, pb::Ticket* pb_ticket); + +Status FromGrpcStatus(const grpc::Status& grpc_status); + +grpc::Status ToGrpcStatus(const Status& arrow_status); + +} // namespace internal +} // namespace flight +} // namespace arrow diff --git a/cpp/src/arrow/flight/perf-server.cc b/cpp/src/arrow/flight/perf-server.cc new file mode 
100644 index 0000000000000..ce2ec7bca6cff --- /dev/null +++ b/cpp/src/arrow/flight/perf-server.cc @@ -0,0 +1,200 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Performance server for benchmarking purposes + +#include +#include +#include +#include +#include + +#include + +#include "arrow/array.h" +#include "arrow/io/test-common.h" +#include "arrow/ipc/writer.h" +#include "arrow/record_batch.h" + +#include "arrow/flight/perf.pb.h" +#include "arrow/flight/server.h" +#include "arrow/flight/test-util.h" + +DEFINE_int32(port, 31337, "Server port to listen on"); + +namespace perf = arrow::flight::perf; +namespace proto = arrow::flight::protocol; + +using IpcPayload = arrow::ipc::internal::IpcPayload; + +namespace arrow { +namespace flight { + +#define CHECK_PARSE(EXPR) \ + do { \ + if (!EXPR) { \ + return Status::Invalid("cannot parse protobuf"); \ + } \ + } while (0) + +using ArrayVector = std::vector>; + +// Create record batches with a unique "a" column so we can verify on the +// client side that the results are correct +class PerfDataStream : public FlightDataStream { + public: + PerfDataStream(bool verify, const int64_t start, const int64_t total_records, + const std::shared_ptr& schema, const ArrayVector& 
arrays) + : start_(start), + verify_(verify), + batch_length_(arrays[0]->length()), + total_records_(total_records), + records_sent_(0), + schema_(schema), + arrays_(arrays) { + batch_ = RecordBatch::Make(schema, batch_length_, arrays_); + } + + Status Next(IpcPayload* payload) override { + if (records_sent_ >= total_records_) { + // Signal that iteration is over + payload->metadata = nullptr; + return Status::OK(); + } + + if (verify_) { + // mutate first array + auto data = + reinterpret_cast(arrays_[0]->data()->buffers[1]->mutable_data()); + for (int64_t i = 0; i < batch_length_; ++i) { + data[i] = start_ + records_sent_ + i; + } + } + + auto batch = batch_; + + // Last partial batch + if (records_sent_ + batch_length_ > total_records_) { + batch = batch_->Slice(0, total_records_ - records_sent_); + records_sent_ += total_records_ - records_sent_; + } else { + records_sent_ += batch_length_; + } + return ipc::internal::GetRecordBatchPayload(*batch, default_memory_pool(), payload); + } + + private: + const int64_t start_; + bool verify_; + const int64_t batch_length_; + const int64_t total_records_; + int64_t records_sent_; + std::shared_ptr schema_; + std::shared_ptr batch_; + ArrayVector arrays_; +}; + +Status GetPerfBatches(const perf::Token& token, const std::shared_ptr& schema, + bool use_verifier, std::unique_ptr* data_stream) { + std::shared_ptr buffer; + std::vector> arrays; + + const int32_t length = token.definition().records_per_batch(); + const int32_t ncolumns = 4; + for (int i = 0; i < ncolumns; ++i) { + RETURN_NOT_OK(MakeRandomBuffer(length, default_memory_pool(), &buffer)); + arrays.push_back(std::make_shared(length, buffer)); + } + + *data_stream = std::unique_ptr( + new PerfDataStream(use_verifier, token.start(), + token.definition().records_per_stream(), schema, arrays)); + return Status::OK(); +} + +class FlightPerfServer : public FlightServerBase { + public: + FlightPerfServer() : location_(Location{"localhost", FLAGS_port}) { + perf_schema_ 
= schema({field("a", int64()), field("b", int64()), field("c", int64()), + field("d", int64())}); + } + + Status GetFlightInfo(const FlightDescriptor& request, + std::unique_ptr* info) override { + perf::Perf perf_request; + CHECK_PARSE(perf_request.ParseFromString(request.cmd)); + + perf::Token token; + token.mutable_definition()->CopyFrom(perf_request); + + std::vector endpoints; + Ticket tmp_ticket; + for (int64_t i = 0; i < perf_request.stream_count(); ++i) { + token.set_start(i * perf_request.records_per_stream()); + token.set_end((i + 1) * perf_request.records_per_stream()); + + (void)token.SerializeToString(&tmp_ticket.ticket); + + // All endpoints same location for now + endpoints.push_back(FlightEndpoint{tmp_ticket, {location_}}); + } + + uint64_t total_records = + perf_request.stream_count() * perf_request.records_per_stream(); + + FlightInfo::Data data; + RETURN_NOT_OK( + MakeFlightInfo(*perf_schema_, request, endpoints, total_records, -1, &data)); + *info = std::unique_ptr(new FlightInfo(data)); + return Status::OK(); + } + + Status DoGet(const Ticket& request, + std::unique_ptr* data_stream) override { + perf::Token token; + CHECK_PARSE(token.ParseFromString(request.ticket)); + return GetPerfBatches(token, perf_schema_, false, data_stream); + } + + private: + Location location_; + std::shared_ptr perf_schema_; +}; + +} // namespace flight +} // namespace arrow + +std::unique_ptr g_server; + +void Shutdown(int signal) { + if (g_server != nullptr) { + g_server->Shutdown(); + } +} + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + // SIGTERM shuts down the server + signal(SIGTERM, Shutdown); + + g_server.reset(new arrow::flight::FlightPerfServer); + + // TODO(wesm): How can we tell if the server failed to start for some reason? 
+ g_server->Run(FLAGS_port); + return 0; +} diff --git a/cpp/src/arrow/flight/perf.proto b/cpp/src/arrow/flight/perf.proto new file mode 100644 index 0000000000000..9123bafba0fe4 --- /dev/null +++ b/cpp/src/arrow/flight/perf.proto @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package arrow.flight.perf;
+// Description of a benchmark flight; perf-server.cc parses it from FlightDescriptor.cmd in GetFlightInfo.
+message Perf {
+  bytes schema = 1;              // NOTE(review): not read by perf-server.cc, which always serves its own four-int64-column schema -- confirm intended use with the benchmark client
+  int32 stream_count = 2;        // Number of streams; the server returns one endpoint (one ticket) per stream
+  int64 records_per_stream = 3;  // Total number of records delivered by each stream
+  int32 records_per_batch = 4;   // Length of each record batch; the last batch of a stream is shortened if needed
+}
+
+/*
+ * Payload of a ticket: perf-server.cc serializes one Token per stream into Ticket.ticket and parses it back in DoGet
+ */
+message Token {
+
+  // definition of entire flight (a copy of the Perf request).
+  Perf definition = 1;
+
+  // inclusive start: index of this stream's first record (stream index * records_per_stream)
+  int64 start = 2;
+
+  // exclusive end: (stream index + 1) * records_per_stream; set by the server but not read back in perf-server.cc
+  int64 end = 3;
+
+}
\ No newline at end of file
diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc
new file mode 100644
index 0000000000000..967a254121d9d
--- /dev/null
+++ b/cpp/src/arrow/flight/server.cc
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "arrow/flight/server.h" + +#include +#include +#include +#include + +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/io/zero_copy_stream.h" +#include "google/protobuf/wire_format_lite.h" +#include "grpcpp/grpcpp.h" + +#include "arrow/ipc/writer.h" +#include "arrow/record_batch.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" + +#include "arrow/flight/Flight.grpc.pb.h" +#include "arrow/flight/Flight.pb.h" +#include "arrow/flight/internal.h" +#include "arrow/flight/types.h" + +using FlightService = arrow::flight::protocol::FlightService; +using ServerContext = grpc::ServerContext; + +using arrow::ipc::internal::IpcPayload; + +template +using ServerWriter = grpc::ServerWriter; + +namespace pb = arrow::flight::protocol; + +constexpr int64_t kInt32Max = std::numeric_limits::max(); + +namespace grpc { + +using google::protobuf::internal::WireFormatLite; +using google::protobuf::io::CodedOutputStream; + +// More efficient writing of FlightData to gRPC output buffer +// Implementation of ZeroCopyOutputStream that writes to a fixed-size buffer +class FixedSizeProtoWriter : public ::google::protobuf::io::ZeroCopyOutputStream { + public: + explicit FixedSizeProtoWriter(grpc_slice slice) + : slice_(slice), + bytes_written_(0), + total_size_(static_cast(GRPC_SLICE_LENGTH(slice))) {} + + bool Next(void** data, int* size) override { + // Consume the whole slice + *data = GRPC_SLICE_START_PTR(slice_) + bytes_written_; + *size = total_size_ - bytes_written_; + bytes_written_ = total_size_; + return true; + } + + void BackUp(int count) override { bytes_written_ -= count; } + + int64_t ByteCount() const override { return bytes_written_; } + + private: + grpc_slice slice_; + int bytes_written_; + int total_size_; +}; + +// Write FlightData to a grpc::ByteBuffer without extra copying +template <> +class SerializationTraits { + public: + static grpc::Status Deserialize(ByteBuffer* buffer, IpcPayload* out) { + return 
grpc::Status(grpc::StatusCode::UNIMPLEMENTED, + "IpcPayload deserialization not implemented"); + } + + static grpc::Status Serialize(const IpcPayload& msg, ByteBuffer* out, + bool* own_buffer) { + size_t total_size = 0; + + DCHECK_LT(msg.metadata->size(), kInt32Max); + const int32_t metadata_size = static_cast(msg.metadata->size()); + + // 1 byte for metadata tag + total_size += 1 + WireFormatLite::LengthDelimitedSize(metadata_size); + + int64_t body_size = 0; + for (const auto& buffer : msg.body_buffers) { + body_size += buffer->size(); + + const int64_t remainder = buffer->size() % 8; + if (remainder) { + body_size += 8 - remainder; + } + } + + // 2 bytes for body tag + total_size += 2 + WireFormatLite::LengthDelimitedSize(static_cast(body_size)); + + // TODO(wesm): messages over 2GB unlikely to be yet supported + if (total_size > kInt32Max) { + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, + "Cannot send record batches exceeding 2GB yet"); + } + + // Allocate slice, assign to output buffer + grpc::Slice slice(total_size); + + // XXX(wesm): for debugging + // std::cout << "Writing record batch with total size " << total_size << std::endl; + + FixedSizeProtoWriter writer(*reinterpret_cast(&slice)); + CodedOutputStream pb_stream(&writer); + + // Write header + WireFormatLite::WriteTag(pb::FlightData::kDataHeaderFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &pb_stream); + pb_stream.WriteVarint32(metadata_size); + pb_stream.WriteRawMaybeAliased(msg.metadata->data(), + static_cast(msg.metadata->size())); + + // Write body + WireFormatLite::WriteTag(pb::FlightData::kDataBodyFieldNumber, + WireFormatLite::WIRETYPE_LENGTH_DELIMITED, &pb_stream); + pb_stream.WriteVarint32(static_cast(body_size)); + + constexpr uint8_t kPaddingBytes[8] = {0}; + + for (const auto& buffer : msg.body_buffers) { + pb_stream.WriteRawMaybeAliased(buffer->data(), static_cast(buffer->size())); + + // Write padding if not multiple of 8 + const int remainder = buffer->size() % 
8; + if (remainder) { + pb_stream.WriteRawMaybeAliased(kPaddingBytes, 8 - remainder); + } + } + + DCHECK_EQ(static_cast(total_size), pb_stream.ByteCount()); + + // Hand off the slice to the returned ByteBuffer + grpc::ByteBuffer tmp(&slice, 1); + out->Swap(&tmp); + *own_buffer = true; + return grpc::Status::OK; + } +}; + +} // namespace grpc + +namespace arrow { +namespace flight { + +#define CHECK_ARG_NOT_NULL(VAL, MESSAGE) \ + if (VAL == nullptr) { \ + return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, MESSAGE); \ + } + +// This class glues an implementation of FlightServerBase together with the +// gRPC service definition, so the latter is not exposed in the public API +class FlightServiceImpl : public FlightService::Service { + public: + explicit FlightServiceImpl(FlightServerBase* server) : server_(server) {} + + template + grpc::Status WriteStream(Iterator* iterator, ServerWriter* writer) { + // Write flight info to stream until listing is exhausted + ProtoType pb_value; + std::unique_ptr value; + while (true) { + GRPC_RETURN_NOT_OK(iterator->Next(&value)); + if (!value) { + break; + } + GRPC_RETURN_NOT_OK(internal::ToProto(*value, &pb_value)); + + // Blocking write + if (!writer->Write(pb_value)) { + // Write returns false if the stream is closed + break; + } + } + return grpc::Status::OK; + } + + template + grpc::Status WriteStream(const std::vector& values, + ServerWriter* writer) { + // Write flight info to stream until listing is exhausted + ProtoType pb_value; + for (const UserType& value : values) { + GRPC_RETURN_NOT_OK(internal::ToProto(value, &pb_value)); + // Blocking write + if (!writer->Write(pb_value)) { + // Write returns false if the stream is closed + break; + } + } + return grpc::Status::OK; + } + + grpc::Status ListFlights(ServerContext* context, const pb::Criteria* request, + ServerWriter* writer) { + // Retrieve the listing from the implementation + std::unique_ptr listing; + + Criteria criteria; + if (request) { + 
GRPC_RETURN_NOT_OK(internal::FromProto(*request, &criteria)); + } + GRPC_RETURN_NOT_OK(server_->ListFlights(&criteria, &listing)); + return WriteStream(listing.get(), writer); + } + + grpc::Status GetFlightInfo(ServerContext* context, const pb::FlightDescriptor* request, + pb::FlightGetInfo* response) { + CHECK_ARG_NOT_NULL(request, "FlightDescriptor cannot be null"); + + FlightDescriptor descr; + GRPC_RETURN_NOT_OK(internal::FromProto(*request, &descr)); + + std::unique_ptr info; + GRPC_RETURN_NOT_OK(server_->GetFlightInfo(descr, &info)); + + GRPC_RETURN_NOT_OK(internal::ToProto(*info, response)); + return grpc::Status::OK; + } + + grpc::Status DoGet(ServerContext* context, const pb::Ticket* request, + ServerWriter* writer) { + CHECK_ARG_NOT_NULL(request, "ticket cannot be null"); + + Ticket ticket; + GRPC_RETURN_NOT_OK(internal::FromProto(*request, &ticket)); + + std::unique_ptr data_stream; + GRPC_RETURN_NOT_OK(server_->DoGet(ticket, &data_stream)); + + // Requires ServerWriter customization in grpc_customizations.h + auto custom_writer = reinterpret_cast*>(writer); + + while (true) { + IpcPayload payload; + GRPC_RETURN_NOT_OK(data_stream->Next(&payload)); + if (payload.metadata == nullptr || + !custom_writer->Write(payload, grpc::WriteOptions())) { + // No more messages to write, or connection terminated for some other + // reason + break; + } + } + return grpc::Status::OK; + } + + grpc::Status DoPut(ServerContext* context, grpc::ServerReader* reader, + pb::PutResult* response) { + return grpc::Status(grpc::StatusCode::UNIMPLEMENTED, ""); + } + + grpc::Status ListActions(ServerContext* context, const pb::Empty* request, + ServerWriter* writer) { + // Retrieve the listing from the implementation + std::vector types; + GRPC_RETURN_NOT_OK(server_->ListActions(&types)); + return WriteStream(types, writer); + } + + grpc::Status DoAction(ServerContext* context, const pb::Action* request, + ServerWriter* writer) { + CHECK_ARG_NOT_NULL(request, "Action cannot be 
null"); + Action action; + GRPC_RETURN_NOT_OK(internal::FromProto(*request, &action)); + + std::unique_ptr results; + GRPC_RETURN_NOT_OK(server_->DoAction(action, &results)); + + std::unique_ptr result; + pb::Result pb_result; + while (true) { + GRPC_RETURN_NOT_OK(results->Next(&result)); + if (!result) { + // No more results + break; + } + GRPC_RETURN_NOT_OK(internal::ToProto(*result, &pb_result)); + if (!writer->Write(pb_result)) { + // Stream may be closed + break; + } + } + return grpc::Status::OK; + } + + private: + FlightServerBase* server_; +}; + +struct FlightServerBase::FlightServerBaseImpl { + std::unique_ptr server; +}; + +FlightServerBase::FlightServerBase() { impl_.reset(new FlightServerBaseImpl); } + +FlightServerBase::~FlightServerBase() {} + +void FlightServerBase::Run(int port) { + std::string address = "localhost:" + std::to_string(port); + + FlightServiceImpl service(this); + grpc::ServerBuilder builder; + builder.AddListeningPort(address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + + impl_->server = builder.BuildAndStart(); + std::cout << "Server listening on " << address << std::endl; + impl_->server->Wait(); +} + +void FlightServerBase::Shutdown() { + DCHECK(impl_->server); + impl_->server->Shutdown(); +} + +Status FlightServerBase::ListFlights(const Criteria* criteria, + std::unique_ptr* listings) { + return Status::NotImplemented("NYI"); +} + +Status FlightServerBase::GetFlightInfo(const FlightDescriptor& request, + std::unique_ptr* info) { + std::cout << "GetFlightInfo" << std::endl; + return Status::NotImplemented("NYI"); +} + +Status FlightServerBase::DoGet(const Ticket& request, + std::unique_ptr* data_stream) { + return Status::NotImplemented("NYI"); +} + +Status FlightServerBase::DoAction(const Action& action, + std::unique_ptr* result) { + return Status::NotImplemented("NYI"); +} + +Status FlightServerBase::ListActions(std::vector* actions) { + return Status::NotImplemented("NYI"); +} + +// 
---------------------------------------------------------------------- +// Implement RecordBatchStream + +RecordBatchStream::RecordBatchStream(const std::shared_ptr& reader) + : pool_(default_memory_pool()), reader_(reader) {} + +Status RecordBatchStream::Next(IpcPayload* payload) { + std::shared_ptr batch; + RETURN_NOT_OK(reader_->ReadNext(&batch)); + + if (!batch) { + // Signal that iteration is over + payload->metadata = nullptr; + return Status::OK(); + } else { + return ipc::internal::GetRecordBatchPayload(*batch, pool_, payload); + } +} + +} // namespace flight +} // namespace arrow diff --git a/cpp/src/arrow/flight/server.h b/cpp/src/arrow/flight/server.h new file mode 100644 index 0000000000000..89154ac8623e0 --- /dev/null +++ b/cpp/src/arrow/flight/server.h @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Interfaces to use for defining Flight RPC servers. 
API should be considered
+// experimental for now
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "arrow/util/visibility.h"
+
+#include "arrow/flight/types.h"
+
+namespace arrow {
+
+class MemoryPool;
+class RecordBatchReader;
+class Status;
+
+namespace ipc {
+namespace internal {
+
+struct IpcPayload;
+
+}  // namespace internal
+}  // namespace ipc
+
+namespace io {
+
+class OutputStream;
+
+}  // namespace io
+
+namespace flight {
+
+/// \brief Interface that produces a sequence of IPC payloads to be sent in
+/// FlightData protobuf messages
+class ARROW_EXPORT FlightDataStream {
+ public:
+  virtual ~FlightDataStream() = default;
+
+  // Write the next payload into `payload`. When the stream is completed, the
+  // payload is returned with null metadata and an OK status
+  virtual Status Next(ipc::internal::IpcPayload* payload) = 0;
+};
+
+/// \brief A basic implementation of FlightDataStream that will provide
+/// a sequence of FlightData messages to be written to a gRPC stream
+/// \param[in] reader produces a sequence of record batches
+class ARROW_EXPORT RecordBatchStream : public FlightDataStream {
+ public:
+  explicit RecordBatchStream(const std::shared_ptr& reader);
+
+  Status Next(ipc::internal::IpcPayload* payload) override;
+
+ private:
+  MemoryPool* pool_;
+  std::shared_ptr reader_;
+};
+
+/// \brief Skeleton RPC server implementation which can be used to create
+/// custom servers by overriding its virtual methods
+class ARROW_EXPORT FlightServerBase {
+ public:
+  FlightServerBase();
+  virtual ~FlightServerBase();
+
+  /// \brief Run an insecure server on localhost at the indicated port. Block
+  /// until server is shut down or otherwise terminates
+  /// \param[in] port the port to listen on
+  /// \note returns void: a failure to start the server is not reported to the caller
+  void Run(int port);
+
+  /// \brief Shut down the server. Can be called from signal handler or another
+  /// thread while Run blocks
+  ///
+  /// TODO(wesm): Shutdown with deadline
+  void Shutdown();
+
+  // Implement these methods to create your own server. The default
+  // implementations will return a not-implemented result to the client
+
+  /// \brief Retrieve a list of available flights given an optional opaque
+  /// criteria
+  /// \param[in] criteria may be null
+  /// \param[out] listings the returned listings iterator
+  /// \return Status
+  virtual Status ListFlights(const Criteria* criteria,
+                             std::unique_ptr* listings);
+
+  /// \brief Retrieve the schema and an access plan for the indicated
+  /// descriptor
+  /// \param[in] request the descriptor identifying the requested flight
+  /// \param[out] info the returned flight info provider
+  /// \return Status
+  virtual Status GetFlightInfo(const FlightDescriptor& request,
+                               std::unique_ptr* info);
+
+  /// \brief Get a stream of IPC payloads to put on the wire
+  /// \param[in] request an opaque ticket
+  /// \param[out] stream the returned stream provider
+  /// \return Status
+  virtual Status DoGet(const Ticket& request, std::unique_ptr* stream);
+
+  // virtual Status DoPut(std::unique_ptr* reader) = 0;
+
+  /// \brief Execute an action, return stream of zero or more results
+  /// \param[in] action the action to execute, with type and body
+  /// \param[out] result the result iterator
+  /// \return Status
+  virtual Status DoAction(const Action& action, std::unique_ptr* result);
+
+  /// \brief Retrieve the list of available actions
+  /// \param[out] actions a vector of available action types
+  /// \return Status
+  virtual Status ListActions(std::vector* actions);
+
+ private:
+  struct FlightServerBaseImpl;
+  std::unique_ptr impl_;
+};
+
+}  // namespace flight
+}  // namespace arrow
diff --git a/cpp/src/arrow/flight/test-server.cc b/cpp/src/arrow/flight/test-server.cc
new file mode 100644
index 0000000000000..14b03d9adeb8c
--- /dev/null
+++ b/cpp/src/arrow/flight/test-server.cc
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Example server implementation to use for unit testing and benchmarking +// purposes + +#include +#include +#include +#include + +#include + +#include "arrow/io/test-common.h" +#include "arrow/record_batch.h" + +#include "arrow/flight/server.h" +#include "arrow/flight/test-util.h" + +DEFINE_int32(port, 31337, "Server port to listen on"); + +namespace arrow { +namespace flight { + +Status GetBatchForFlight(const Ticket& ticket, std::shared_ptr* out) { + if (ticket.ticket == "ticket-id-1") { + BatchVector batches; + RETURN_NOT_OK(SimpleIntegerBatches(5, &batches)); + *out = std::make_shared(batches[0]->schema(), batches); + return Status::OK(); + } else { + return Status::NotImplemented("no stream implemented for this ticket"); + } +} + +class FlightTestServer : public FlightServerBase { + Status ListFlights(const Criteria* criteria, + std::unique_ptr* listings) override { + std::vector flights = ExampleFlightInfo(); + *listings = std::unique_ptr(new SimpleFlightListing(flights)); + return Status::OK(); + } + + Status GetFlightInfo(const FlightDescriptor& request, + std::unique_ptr* info) override { + std::vector flights = ExampleFlightInfo(); + + const FlightInfo* value; + + // We only have one kind of flight for each descriptor type + if (request.type == 
FlightDescriptor::PATH) { + value = &flights[0]; + } else { + value = &flights[1]; + } + + *info = std::unique_ptr(new FlightInfo(*value)); + return Status::OK(); + } + + Status DoGet(const Ticket& request, + std::unique_ptr* data_stream) override { + std::shared_ptr batch_reader; + RETURN_NOT_OK(GetBatchForFlight(request, &batch_reader)); + + *data_stream = std::unique_ptr(new RecordBatchStream(batch_reader)); + return Status::OK(); + } + + Status RunAction1(const Action& action, std::unique_ptr* out) { + std::vector results; + for (int i = 0; i < 3; ++i) { + Result result; + std::string value = action.body->ToString() + "-part" + std::to_string(i); + RETURN_NOT_OK(Buffer::FromString(value, &result.body)); + results.push_back(result); + } + *out = std::unique_ptr(new SimpleResultStream(std::move(results))); + return Status::OK(); + } + + Status RunAction2(std::unique_ptr* out) { + // Empty + *out = std::unique_ptr(new SimpleResultStream({})); + return Status::OK(); + } + + Status DoAction(const Action& action, std::unique_ptr* out) override { + if (action.type == "action1") { + return RunAction1(action, out); + } else if (action.type == "action2") { + return RunAction2(out); + } else { + return Status::NotImplemented(action.type); + } + } + + Status ListActions(std::vector* out) override { + std::vector actions = ExampleActionTypes(); + *out = std::move(actions); + return Status::OK(); + } +}; + +} // namespace flight +} // namespace arrow + +std::unique_ptr g_server; + +void Shutdown(int signal) { + if (g_server != nullptr) { + g_server->Shutdown(); + } +} + +int main(int argc, char** argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + + // SIGTERM shuts down the server + signal(SIGTERM, Shutdown); + + g_server.reset(new arrow::flight::FlightTestServer); + + // TODO(wesm): How can we tell if the server failed to start for some reason? 
+ g_server->Run(FLAGS_port); + return 0; +} diff --git a/cpp/src/arrow/flight/test-util.h b/cpp/src/arrow/flight/test-util.h new file mode 100644 index 0000000000000..4a1299719b837 --- /dev/null +++ b/cpp/src/arrow/flight/test-util.h @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/ipc/test-common.h" +#include "arrow/status.h" +#include "arrow/test-util.h" + +#include "arrow/flight/api.h" +#include "arrow/flight/internal.h" + +namespace bp = boost::process; + +namespace arrow { +namespace flight { + +// ---------------------------------------------------------------------- +// Fixture to use for running test servers + +struct TestServer { + public: + explicit TestServer(const std::string& executable_name, int port) + : executable_name_(executable_name), port_(port) {} + + void Start() { + std::string str_port = std::to_string(port_); + server_process_.reset( + new bp::child(bp::search_path(executable_name_), "-port", str_port)); + std::cout << "Server running with pid " << server_process_->id() << std::endl; + } + + int Stop() { + kill(server_process_->id(), SIGTERM); + server_process_->wait(); + return server_process_->exit_code(); + } + + bool IsRunning() { return server_process_->running(); } + + int port() const { return port_; } + + private: + std::string executable_name_; + int port_; + std::unique_ptr server_process_; +}; + +// ---------------------------------------------------------------------- +// A RecordBatchReader for serving a sequence of in-memory record batches + +class BatchIterator : public RecordBatchReader { + public: + BatchIterator(const std::shared_ptr& schema, + const std::vector>& batches) + : schema_(schema), batches_(batches), position_(0) {} + + std::shared_ptr schema() const override { return schema_; } + + Status ReadNext(std::shared_ptr* out) override { + if (position_ >= batches_.size()) { + *out = nullptr; + } else { + *out = batches_[position_++]; + } + return Status::OK(); + } + + private: + std::shared_ptr schema_; + std::vector> batches_; + size_t position_; +}; + +// ---------------------------------------------------------------------- +// Example data for test-server and unit tests + +using 
BatchVector = std::vector>; + +std::shared_ptr ExampleSchema1() { + auto f0 = field("f0", int32()); + auto f1 = field("f1", int32()); + return ::arrow::schema({f0, f1}); +} + +std::shared_ptr ExampleSchema2() { + auto f0 = field("f0", utf8()); + auto f1 = field("f1", binary()); + return ::arrow::schema({f0, f1}); +} + +Status MakeFlightInfo(const Schema& schema, const FlightDescriptor& descriptor, + const std::vector& endpoints, + uint64_t total_records, uint64_t total_bytes, + FlightInfo::Data* out) { + out->descriptor = descriptor; + out->endpoints = endpoints; + out->total_records = total_records; + out->total_bytes = total_bytes; + return internal::SchemaToString(schema, &out->schema); +} + +std::vector ExampleFlightInfo() { + FlightEndpoint endpoint1({{"ticket-id-1"}, {{"foo1.bar.com", 92385}}}); + FlightEndpoint endpoint2({{"ticket-id-2"}, {{"foo2.bar.com", 92385}}}); + FlightEndpoint endpoint3({{"ticket-id-3"}, {{"foo3.bar.com", 92385}}}); + FlightDescriptor descr1{FlightDescriptor::PATH, "", {"foo", "bar"}}; + FlightDescriptor descr2{FlightDescriptor::CMD, "my_command", {}}; + + auto schema1 = ExampleSchema1(); + auto schema2 = ExampleSchema2(); + + FlightInfo::Data flight1, flight2; + EXPECT_OK( + MakeFlightInfo(*schema1, descr1, {endpoint1, endpoint2}, 1000, 100000, &flight1)); + EXPECT_OK(MakeFlightInfo(*schema2, descr2, {endpoint3}, 1000, 100000, &flight2)); + return {FlightInfo(flight1), FlightInfo(flight2)}; +} + +Status SimpleIntegerBatches(const int num_batches, BatchVector* out) { + std::shared_ptr batch; + for (int i = 0; i < num_batches; ++i) { + // Make all different sizes, use different random seed + RETURN_NOT_OK(ipc::MakeIntBatchSized(10 + i, &batch, i)); + out->push_back(batch); + } + return Status::OK(); +} + +std::vector ExampleActionTypes() { + return {{"drop", "drop a dataset"}, {"cache", "cache a dataset"}}; +} + +} // namespace flight +} // namespace arrow diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc new 
file mode 100644
index 0000000000000..8c7588d03bd38
--- /dev/null
+++ b/cpp/src/arrow/flight/types.cc
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/flight/types.h"
+
+#include
+#include
+#include
+#include
+
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/status.h"
+
+namespace arrow {
+namespace flight {
+// Return the schema decoded from data_.schema. The first successful call decodes and stores it in schema_; later calls return the stored value.
+Status FlightInfo::GetSchema(std::shared_ptr* out) const {
+  if (reconstructed_schema_) {
+    *out = schema_;
+    return Status::OK();
+  }
+  /// XXX(wesm): arrow::ipc::ReadSchema in its current form will not suffice
+  /// for reading schemas with dictionaries. See ARROW-3144
+  io::BufferReader schema_reader(reinterpret_cast(data_.schema.c_str()),
+                                 static_cast(data_.schema.size()));
+  RETURN_NOT_OK(ipc::ReadSchema(&schema_reader, &schema_));
+  reconstructed_schema_ = true;  // written from a const method, so presumably declared mutable in types.h -- confirm
+  *out = schema_;
+  return Status::OK();
+}
+
+SimpleFlightListing::SimpleFlightListing(const std::vector& flights)
+    : position_(0), flights_(flights) {}
+
+SimpleFlightListing::SimpleFlightListing(std::vector&& flights)
+    : position_(0), flights_(std::move(flights)) {}
+// Hand out the stored FlightInfo objects one per call, moving each out of flights_; *info is set to nullptr once all have been returned.
+Status SimpleFlightListing::Next(std::unique_ptr* info) {
+  if (position_ >= static_cast(flights_.size())) {
+    *info = nullptr;
+    return Status::OK();
+  }
+  *info = std::unique_ptr(new FlightInfo(std::move(flights_[position_++])));
+  return Status::OK();
+}
+
+SimpleResultStream::SimpleResultStream(std::vector&& results)
+    : results_(std::move(results)), position_(0) {}
+// Hand out the stored Result objects one per call, moving each out of results_; *result is set to nullptr once all have been returned.
+Status SimpleResultStream::Next(std::unique_ptr* result) {
+  if (position_ >= results_.size()) {
+    *result = nullptr;
+    return Status::OK();
+  }
+  *result = std::unique_ptr(new Result(std::move(results_[position_++])));
+  return Status::OK();
+}
+
+}  // namespace flight
+}  // namespace arrow
diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h
new file mode 100644
index 0000000000000..0362105bbc592
--- /dev/null
+++ b/cpp/src/arrow/flight/types.h
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Data structure for Flight RPC. API should be considered experimental for now + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/util/visibility.h" + +namespace arrow { + +class Buffer; +class Schema; +class Status; + +namespace flight { + +/// \brief A type of action that can be performed with the DoAction RPC +struct ActionType { + /// Name of action + std::string type; + + /// Opaque action description + std::string description; +}; + +/// \brief Opaque selection criteria for ListFlights RPC +struct Criteria { + /// Opaque criteria expression, dependent on server implementation + std::string expression; +}; + +/// \brief An action to perform with the DoAction RPC +struct Action { + /// The action type + std::string type; + + /// The action content as a Buffer + std::shared_ptr body; +}; + +/// \brief Opaque result returned after executing an action +struct Result { + std::shared_ptr body; +}; + +/// \brief A message received after completing a DoPut stream +struct PutResult {}; + +/// \brief A request to retrieve or generate a dataset +struct FlightDescriptor { + enum DescriptorType { + UNKNOWN = 0, /// Unused + PATH = 1, /// Named path identifying a dataset + CMD = 2 /// Opaque command to generate a dataset + }; + + /// The descriptor type + DescriptorType type; + + /// Opaque value used to express a command. Should only be defined when type + /// is CMD + std::string cmd; + + /// List of strings identifying a particular dataset.
Should only be defined + /// when type is PATH + std::vector path; +}; + +/// \brief Data structure providing an opaque identifier or credential to use +/// when requesting a data stream with the DoGet RPC +struct Ticket { + std::string ticket; +}; + +/// \brief A host location (hostname and port) +struct Location { + std::string host; + int32_t port; +}; + +/// \brief A flight ticket and list of locations where the ticket can be +/// redeemed +struct FlightEndpoint { + /// Opaque ticket identifier; use with DoGet RPC + Ticket ticket; + + /// List of locations where ticket can be redeemed. If the list is empty, the + /// ticket can only be redeemed on the current service where the ticket was + /// generated + std::vector locations; +}; + +/// \brief The access coordinates for retrieval of a dataset, returned by +/// GetFlightInfo +class FlightInfo { + public: + struct Data { + std::string schema; + FlightDescriptor descriptor; + std::vector endpoints; + uint64_t total_records; + uint64_t total_bytes; + }; + + explicit FlightInfo(const Data& data) : data_(data), reconstructed_schema_(false) {} + explicit FlightInfo(Data&& data) + : data_(std::move(data)), reconstructed_schema_(false) {} + + /// Deserialize the Arrow schema of the dataset, to be passed to each call to + /// DoGet + Status GetSchema(std::shared_ptr* out) const; + + const std::string& serialized_schema() const { return data_.schema; } + + /// The descriptor associated with this flight, may not be set + const FlightDescriptor& descriptor() const { return data_.descriptor; } + + /// A list of endpoints associated with the flight (dataset). To consume the + /// whole flight, all endpoints must be consumed + const std::vector& endpoints() const { return data_.endpoints; } + + /// The total number of records (rows) in the dataset. If unknown, set to -1 + uint64_t total_records() const { return data_.total_records; } + + /// The total number of bytes in the dataset.
If unknown, set to -1 + uint64_t total_bytes() const { return data_.total_bytes; } + + private: + Data data_; + mutable std::shared_ptr schema_; + mutable bool reconstructed_schema_; +}; + +// TODO(wesm): NYI +class ARROW_EXPORT FlightPutWriter { + public: + virtual ~FlightPutWriter() = default; +}; + +/// \brief An iterator to FlightInfo instances returned by ListFlights +class ARROW_EXPORT FlightListing { + public: + virtual ~FlightListing() = default; + + /// \brief Retrieve the next FlightInfo from the iterator. Returns nullptr + /// when there are none left + /// \param[out] info a single FlightInfo + /// \return Status + virtual Status Next(std::unique_ptr* info) = 0; +}; + +/// \brief An iterator to Result instances returned by DoAction +class ARROW_EXPORT ResultStream { + public: + virtual ~ResultStream() = default; + + /// \brief Retrieve the next Result from the iterator. Returns nullptr + /// when there are none left + /// \param[out] info a single Result + /// \return Status + virtual Status Next(std::unique_ptr* info) = 0; +}; + +// \brief Create a FlightListing from a vector of FlightInfo objects. 
This can +// be iterated once, then it is consumed +class ARROW_EXPORT SimpleFlightListing : public FlightListing { + public: + explicit SimpleFlightListing(const std::vector& flights); + explicit SimpleFlightListing(std::vector&& flights); + + Status Next(std::unique_ptr* info) override; + + private: + int position_; + std::vector flights_; +}; + +class ARROW_EXPORT SimpleResultStream : public ResultStream { + public: + explicit SimpleResultStream(std::vector&& results); + Status Next(std::unique_ptr* result) override; + + private: + std::vector results_; + size_t position_; +}; + +} // namespace flight +} // namespace arrow diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc index 5eb5cd734fc13..cb375458004a0 100644 --- a/cpp/src/arrow/gpu/cuda-test.cc +++ b/cpp/src/arrow/gpu/cuda-test.cc @@ -340,7 +340,7 @@ TEST_F(TestCudaArrowIpc, BasicWriteRead) { io::BufferReader cpu_reader(host_buffer); ASSERT_OK(ipc::ReadRecordBatch(batch->schema(), &cpu_reader, &cpu_batch)); - ipc::CompareBatch(*batch, *cpu_batch); + CompareBatch(*batch, *cpu_batch); } } // namespace gpu diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index 4ee0c345e4596..d77248a378277 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -73,6 +73,8 @@ class Message::MessageImpl { const void* header() const { return message_->header(); } + int64_t body_length() const { return message_->bodyLength(); } + std::shared_ptr body() const { return body_; } std::shared_ptr metadata() const { return metadata_; } @@ -101,6 +103,8 @@ Message::~Message() {} std::shared_ptr Message::body() const { return impl_->body(); } +int64_t Message::body_length() const { return impl_->body_length(); } + std::shared_ptr Message::metadata() const { return impl_->metadata(); } Message::Type Message::type() const { return impl_->type(); } diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index d150eab26d93b..08176abad81d2 100644 --- 
a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -125,6 +125,10 @@ class ARROW_EXPORT Message { /// \return buffer is null if no body std::shared_ptr body() const; + /// \brief The expected body length according to the metadata, for + /// verification purposes + int64_t body_length() const; + /// \brief The Message type Type type() const; diff --git a/cpp/src/arrow/ipc/metadata-internal.cc b/cpp/src/arrow/ipc/metadata-internal.cc index 530262d14dc8f..0bf18f377bb2d 100644 --- a/cpp/src/arrow/ipc/metadata-internal.cc +++ b/cpp/src/arrow/ipc/metadata-internal.cc @@ -960,6 +960,8 @@ Status WriteMessage(const Buffer& message, io::OutputStream* file, int64_t start_offset; RETURN_NOT_OK(file->Tell(&start_offset)); + // TODO(wesm): Should we depend on the position of the OutputStream? See + // ARROW-3212 int32_t padded_message_length = static_cast(message.size()) + 4; const int32_t remainder = (padded_message_length + static_cast(start_offset)) % 8; diff --git a/cpp/src/arrow/ipc/metadata-internal.h b/cpp/src/arrow/ipc/metadata-internal.h index 380f3c9eb1013..730a1a598d526 100644 --- a/cpp/src/arrow/ipc/metadata-internal.h +++ b/cpp/src/arrow/ipc/metadata-internal.h @@ -106,7 +106,7 @@ Status WriteMessage(const Buffer& message, io::OutputStream* file, // Serialize arrow::Schema as a Flatbuffer // // \param[in] schema a Schema instance -// \param[inout] dictionary_memo class for tracking dictionaries and assigning +// \param[in,out] dictionary_memo class for tracking dictionaries and assigning // dictionary ids // \param[out] out the serialized arrow::Buffer // \return Status outcome diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 5ffbe6f40f1c0..92cf75b297552 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -578,6 +578,9 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { std::unique_ptr message; RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message)); + // 
TODO(wesm): this breaks integration tests, see ARROW-3256 + // DCHECK_EQ(message->body_length(), block.body_length); + io::BufferReader reader(message->body()); return ::arrow::ipc::ReadRecordBatch(*message->metadata(), schema_, &reader, batch); } @@ -596,6 +599,9 @@ class RecordBatchFileReader::RecordBatchFileReaderImpl { std::unique_ptr message; RETURN_NOT_OK(ReadMessage(block.offset, block.metadata_length, file_, &message)); + // TODO(wesm): this breaks integration tests, see ARROW-3256 + // DCHECK_EQ(message->body_length(), block.body_length); + io::BufferReader reader(message->body()); std::shared_ptr dictionary; diff --git a/cpp/src/arrow/ipc/test-common.h b/cpp/src/arrow/ipc/test-common.h index 299f050228d82..4f7de26e35e16 100644 --- a/cpp/src/arrow/ipc/test-common.h +++ b/cpp/src/arrow/ipc/test-common.h @@ -39,36 +39,6 @@ namespace arrow { namespace ipc { -static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) { - if (!lhs.Equals(rhs)) { - std::stringstream ss; - ss << "left schema: " << lhs.ToString() << std::endl - << "right schema: " << rhs.ToString() << std::endl; - FAIL() << ss.str(); - } -} - -static inline void CompareBatch(const RecordBatch& left, const RecordBatch& right) { - if (!left.schema()->Equals(*right.schema())) { - FAIL() << "Left schema: " << left.schema()->ToString() - << "\nRight schema: " << right.schema()->ToString(); - } - ASSERT_EQ(left.num_columns(), right.num_columns()) - << left.schema()->ToString() << " result: " << right.schema()->ToString(); - ASSERT_EQ(left.num_rows(), right.num_rows()); - for (int i = 0; i < left.num_columns(); ++i) { - if (!left.column(i)->Equals(right.column(i))) { - std::stringstream ss; - ss << "Idx: " << i << " Name: " << left.column_name(i); - ss << std::endl << "Left: "; - ASSERT_OK(PrettyPrint(*left.column(i), 0, &ss)); - ss << std::endl << "Right: "; - ASSERT_OK(PrettyPrint(*right.column(i), 0, &ss)); - FAIL() << ss.str(); - } - } -} - static inline void CompareArraysDetailed(int 
index, const Array& result, const Array& expected) { if (!expected.Equals(result)) { @@ -96,9 +66,9 @@ const auto kListInt32 = list(int32()); const auto kListListInt32 = list(kListInt32); Status MakeRandomInt32Array(int64_t length, bool include_nulls, MemoryPool* pool, - std::shared_ptr* out) { + std::shared_ptr* out, uint32_t seed = 0) { std::shared_ptr data; - RETURN_NOT_OK(MakeRandomInt32Buffer(length, pool, &data)); + RETURN_NOT_OK(MakeRandomBuffer(length, pool, &data, seed)); Int32Builder builder(int32(), pool); RETURN_NOT_OK(builder.Resize(length)); if (include_nulls) { @@ -195,7 +165,8 @@ Status MakeBooleanBatch(std::shared_ptr* out) { return MakeBooleanBatchSized(1000, out); } -Status MakeIntBatchSized(int length, std::shared_ptr* out) { +Status MakeIntBatchSized(int length, std::shared_ptr* out, + uint32_t seed = 0) { // Make the schema auto f0 = field("f0", int32()); auto f1 = field("f1", int32()); @@ -204,8 +175,8 @@ Status MakeIntBatchSized(int length, std::shared_ptr* out) { // Example data std::shared_ptr a0, a1; MemoryPool* pool = default_memory_pool(); - RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0)); - RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1)); + RETURN_NOT_OK(MakeRandomInt32Array(length, false, pool, &a0, seed)); + RETURN_NOT_OK(MakeRandomInt32Array(length, true, pool, &a1, seed + 1)); *out = RecordBatch::Make(schema, length, {a0, a1}); return Status::OK(); } diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index ec6c5d037e568..60ca34efb68e2 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -96,11 +96,14 @@ static inline bool NeedTruncate(int64_t offset, const Buffer* buffer, return offset != 0 || min_length < buffer->size(); } +namespace internal { + class RecordBatchSerializer : public ArrayVisitor { public: RecordBatchSerializer(MemoryPool* pool, int64_t buffer_start_offset, - int max_recursion_depth, bool allow_64bit) - : pool_(pool), + int max_recursion_depth, 
bool allow_64bit, IpcPayload* out) + : out_(out), + pool_(pool), max_recursion_depth_(max_recursion_depth), buffer_start_offset_(buffer_start_offset), allow_64bit_(allow_64bit) { @@ -125,19 +128,25 @@ class RecordBatchSerializer : public ArrayVisitor { std::shared_ptr bitmap; RETURN_NOT_OK(GetTruncatedBitmap(arr.offset(), arr.length(), arr.null_bitmap(), pool_, &bitmap)); - buffers_.push_back(bitmap); + out_->body_buffers.emplace_back(bitmap); } else { // Push a dummy zero-length buffer, not to be copied - buffers_.push_back(std::make_shared(nullptr, 0)); + out_->body_buffers.emplace_back(std::make_shared(nullptr, 0)); } return arr.Accept(this); } - Status Assemble(const RecordBatch& batch, int64_t* body_length) { + // Override this for writing dictionary metadata + virtual Status SerializeMetadata(int64_t num_rows) { + return WriteRecordBatchMessage(num_rows, out_->body_length, field_nodes_, + buffer_meta_, &out_->metadata); + } + + Status Assemble(const RecordBatch& batch) { if (field_nodes_.size() > 0) { field_nodes_.clear(); buffer_meta_.clear(); - buffers_.clear(); + out_->body_buffers.clear(); } // Perform depth-first traversal of the row-batch @@ -149,11 +158,11 @@ class RecordBatchSerializer : public ArrayVisitor { // reference. 
May be 0 or some other position in an address space int64_t offset = buffer_start_offset_; - buffer_meta_.reserve(buffers_.size()); + buffer_meta_.reserve(out_->body_buffers.size()); // Construct the buffer metadata for the record batch header - for (size_t i = 0; i < buffers_.size(); ++i) { - const Buffer* buffer = buffers_[i].get(); + for (size_t i = 0; i < out_->body_buffers.size(); ++i) { + const Buffer* buffer = out_->body_buffers[i].get(); int64_t size = 0; int64_t padding = 0; @@ -167,69 +176,15 @@ class RecordBatchSerializer : public ArrayVisitor { offset += size + padding; } - *body_length = offset - buffer_start_offset_; - DCHECK(BitUtil::IsMultipleOf8(*body_length)); - - return Status::OK(); - } - - // Override this for writing dictionary metadata - virtual Status WriteMetadataMessage(int64_t num_rows, int64_t body_length, - std::shared_ptr* out) { - return WriteRecordBatchMessage(num_rows, body_length, field_nodes_, buffer_meta_, - out); - } - - Status Write(const RecordBatch& batch, io::OutputStream* dst, int32_t* metadata_length, - int64_t* body_length) { - RETURN_NOT_OK(Assemble(batch, body_length)); - -#ifndef NDEBUG - int64_t start_position, current_position; - RETURN_NOT_OK(dst->Tell(&start_position)); -#endif + out_->body_length = offset - buffer_start_offset_; + DCHECK(BitUtil::IsMultipleOf8(out_->body_length)); // Now that we have computed the locations of all of the buffers in shared // memory, the data header can be converted to a flatbuffer and written out // // Note: The memory written here is prefixed by the size of the flatbuffer // itself as an int32_t. 
- std::shared_ptr metadata_fb; - RETURN_NOT_OK(WriteMetadataMessage(batch.num_rows(), *body_length, &metadata_fb)); - RETURN_NOT_OK(internal::WriteMessage(*metadata_fb, dst, metadata_length)); - -#ifndef NDEBUG - RETURN_NOT_OK(dst->Tell(¤t_position)); - DCHECK(BitUtil::IsMultipleOf8(current_position)); -#endif - - // Now write the buffers - for (size_t i = 0; i < buffers_.size(); ++i) { - const Buffer* buffer = buffers_[i].get(); - int64_t size = 0; - int64_t padding = 0; - - // The buffer might be null if we are handling zero row lengths. - if (buffer) { - size = buffer->size(); - padding = BitUtil::RoundUpToMultipleOf8(size) - size; - } - - if (size > 0) { - RETURN_NOT_OK(dst->Write(buffer->data(), size)); - } - - if (padding > 0) { - RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); - } - } - -#ifndef NDEBUG - RETURN_NOT_OK(dst->Tell(¤t_position)); - DCHECK(BitUtil::IsMultipleOf8(current_position)); -#endif - - return Status::OK(); + return SerializeMetadata(batch.num_rows()); } protected: @@ -251,7 +206,7 @@ class RecordBatchSerializer : public ArrayVisitor { data->size() - byte_offset); data = SliceBuffer(data, byte_offset, buffer_length); } - buffers_.push_back(data); + out_->body_buffers.emplace_back(data); return Status::OK(); } @@ -303,8 +258,8 @@ class RecordBatchSerializer : public ArrayVisitor { data = SliceBuffer(data, start_offset, slice_length); } - buffers_.push_back(value_offsets); - buffers_.push_back(data); + out_->body_buffers.emplace_back(value_offsets); + out_->body_buffers.emplace_back(data); return Status::OK(); } @@ -312,12 +267,12 @@ class RecordBatchSerializer : public ArrayVisitor { std::shared_ptr data; RETURN_NOT_OK( GetTruncatedBitmap(array.offset(), array.length(), array.values(), pool_, &data)); - buffers_.push_back(data); + out_->body_buffers.emplace_back(data); return Status::OK(); } Status Visit(const NullArray& array) override { - buffers_.push_back(nullptr); + out_->body_buffers.emplace_back(nullptr); return Status::OK(); } @@ 
-352,7 +307,7 @@ class RecordBatchSerializer : public ArrayVisitor { Status Visit(const ListArray& array) override { std::shared_ptr value_offsets; RETURN_NOT_OK(GetZeroBasedValueOffsets(array, &value_offsets)); - buffers_.push_back(value_offsets); + out_->body_buffers.emplace_back(value_offsets); --max_recursion_depth_; std::shared_ptr values = array.values(); @@ -390,7 +345,7 @@ class RecordBatchSerializer : public ArrayVisitor { std::shared_ptr type_ids; RETURN_NOT_OK(GetTruncatedBuffer( offset, length, array.type_ids(), pool_, &type_ids)); - buffers_.push_back(type_ids); + out_->body_buffers.emplace_back(type_ids); --max_recursion_depth_; if (array.mode() == UnionMode::DENSE) { @@ -449,7 +404,7 @@ class RecordBatchSerializer : public ArrayVisitor { value_offsets = shifted_offsets_buffer; } - buffers_.push_back(value_offsets); + out_->body_buffers.emplace_back(value_offsets); // Visit children and slice accordingly for (int i = 0; i < type.num_children(); ++i) { @@ -487,12 +442,14 @@ class RecordBatchSerializer : public ArrayVisitor { return array.indices()->Accept(this); } + // Destination for output buffers + IpcPayload* out_; + // In some cases, intermediate buffers may need to be allocated (with sliced arrays) MemoryPool* pool_; std::vector field_nodes_; std::vector buffer_meta_; - std::vector> buffers_; int64_t max_recursion_depth_; int64_t buffer_start_offset_; @@ -501,29 +458,79 @@ class RecordBatchSerializer : public ArrayVisitor { class DictionaryWriter : public RecordBatchSerializer { public: - using RecordBatchSerializer::RecordBatchSerializer; - - Status WriteMetadataMessage(int64_t num_rows, int64_t body_length, - std::shared_ptr* out) override { - return WriteDictionaryMessage(dictionary_id_, num_rows, body_length, field_nodes_, - buffer_meta_, out); + DictionaryWriter(int64_t dictionary_id, MemoryPool* pool, int64_t buffer_start_offset, + int max_recursion_depth, bool allow_64bit, IpcPayload* out) + : RecordBatchSerializer(pool, 
buffer_start_offset, max_recursion_depth, allow_64bit, + out), + dictionary_id_(dictionary_id) {} + + Status SerializeMetadata(int64_t num_rows) override { + return WriteDictionaryMessage(dictionary_id_, num_rows, out_->body_length, + field_nodes_, buffer_meta_, &out_->metadata); } - Status Write(int64_t dictionary_id, const std::shared_ptr& dictionary, - io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length) { - dictionary_id_ = dictionary_id; - + Status Assemble(const std::shared_ptr& dictionary) { // Make a dummy record batch. A bit tedious as we have to make a schema auto schema = arrow::schema({arrow::field("dictionary", dictionary->type())}); auto batch = RecordBatch::Make(schema, dictionary->length(), {dictionary}); - return RecordBatchSerializer::Write(*batch, dst, metadata_length, body_length); + return RecordBatchSerializer::Assemble(*batch); } private: - // TODO(wesm): Setting this in Write is a bit unclean, but it works int64_t dictionary_id_; }; +Status WriteIpcPayload(const IpcPayload& payload, io::OutputStream* dst, + int32_t* metadata_length) { +#ifndef NDEBUG + int64_t start_position, current_position; + RETURN_NOT_OK(dst->Tell(&start_position)); +#endif + + RETURN_NOT_OK(internal::WriteMessage(*payload.metadata, dst, metadata_length)); + +#ifndef NDEBUG + RETURN_NOT_OK(dst->Tell(¤t_position)); + DCHECK(BitUtil::IsMultipleOf8(current_position)); +#endif + + // Now write the buffers + for (size_t i = 0; i < payload.body_buffers.size(); ++i) { + const Buffer* buffer = payload.body_buffers[i].get(); + int64_t size = 0; + int64_t padding = 0; + + // The buffer might be null if we are handling zero row lengths. 
+ if (buffer) { + size = buffer->size(); + padding = BitUtil::RoundUpToMultipleOf8(size) - size; + } + + if (size > 0) { + RETURN_NOT_OK(dst->Write(buffer->data(), size)); + } + + if (padding > 0) { + RETURN_NOT_OK(dst->Write(kPaddingBytes, padding)); + } + } + +#ifndef NDEBUG + RETURN_NOT_OK(dst->Tell(¤t_position)); + DCHECK(BitUtil::IsMultipleOf8(current_position)); +#endif + + return Status::OK(); +} + +Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool, + IpcPayload* out) { + RecordBatchSerializer writer(pool, 0, kMaxNestingDepth, true, out); + return writer.Assemble(batch); +} + +} // namespace internal + // Adds padding bytes if necessary to ensure all memory blocks are written on // 64-byte boundaries. Status AlignStreamPosition(io::OutputStream* stream) { @@ -540,9 +547,18 @@ Status WriteRecordBatch(const RecordBatch& batch, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool, int max_recursion_depth, bool allow_64bit) { - RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth, - allow_64bit); - return writer.Write(batch, dst, metadata_length, body_length); + internal::IpcPayload payload; + internal::RecordBatchSerializer writer(pool, buffer_start_offset, max_recursion_depth, + allow_64bit, &payload); + RETURN_NOT_OK(writer.Assemble(batch)); + + // TODO(wesm): it's a rough edge that the metadata and body length here are + // computed separately + + // The body size is computed in the payload + *body_length = payload.body_length; + + return internal::WriteIpcPayload(payload, dst, metadata_length); } Status WriteRecordBatchStream(const std::vector>& batches, @@ -675,8 +691,14 @@ Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool, Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr& dictionary, int64_t buffer_start_offset, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) { - 
DictionaryWriter writer(pool, buffer_start_offset, kMaxNestingDepth, false); - return writer.Write(dictionary_id, dictionary, dst, metadata_length, body_length); + internal::IpcPayload payload; + internal::DictionaryWriter writer(dictionary_id, pool, buffer_start_offset, + kMaxNestingDepth, true, &payload); + RETURN_NOT_OK(writer.Assemble(dictionary)); + + // The body size is computed in the payload + *body_length = payload.body_length; + return internal::WriteIpcPayload(payload, dst, metadata_length); } Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size) { diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h index 6dbf29da4806a..bcf09aac47309 100644 --- a/cpp/src/arrow/ipc/writer.h +++ b/cpp/src/arrow/ipc/writer.h @@ -269,6 +269,36 @@ ARROW_EXPORT Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length, int64_t* body_length); +namespace internal { + +// These internal APIs may change without warning or deprecation + +// Intermediate data structure with metadata header plus zero or more buffers +// for the message body. 
This data can either be written out directly as an +// encapsulated IPC message or used with Flight RPCs +struct IpcPayload { + Message::Type type; + std::shared_ptr metadata; + std::vector> body_buffers; + int64_t body_length; +}; + +/// \brief Extract IPC payloads from given schema for purposes of wire +/// transport, separate from using the *StreamWriter classes +ARROW_EXPORT +Status GetDictionaryPayloads(const Schema& schema, + std::vector>* out); + +/// \brief Compute IpcPayload for the given record batch +/// \param[in] batch the RecordBatch that is being serialized +/// \param[in,out] pool for any required temporary memory allocations +/// \param[out] out the returned IpcPayload +/// \return Status +ARROW_EXPORT +Status GetRecordBatchPayload(const RecordBatch& batch, MemoryPool* pool, IpcPayload* out); + +} // namespace internal + } // namespace ipc } // namespace arrow diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 1a50a078c010d..b7179a360c0f6 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -18,7 +18,14 @@ #ifndef ARROW_TEST_UTIL_H_ #define ARROW_TEST_UTIL_H_ +#ifndef _WIN32 +#include +#include +#include +#endif + #include +#include #include #include #include @@ -27,6 +34,7 @@ #include #include #include +#include #include #include @@ -44,6 +52,11 @@ #include "arrow/util/decimal.h" #include "arrow/util/logging.h" +static inline void sleep_for(double seconds) { + std::this_thread::sleep_for( + std::chrono::nanoseconds(static_cast(seconds * 1e9))); +} + #define STRINGIFY(x) #x #define ASSERT_RAISES(ENUM, expr) \ @@ -272,6 +285,17 @@ void rand_uniform_int(int64_t n, uint32_t seed, T min_value, T max_value, U* out std::generate(out, out + n, [&d, &gen] { return static_cast(d(gen)); }); } +template +struct GenerateRandom {}; + +template +struct GenerateRandom::value>::type> { + static void Gen(int64_t length, uint32_t seed, void* out) { + rand_uniform_int(length, seed, std::numeric_limits::min(), + 
std::numeric_limits::max(), reinterpret_cast(out)); + } +}; + static inline void random_ascii(int64_t n, uint32_t seed, uint8_t* out) { rand_uniform_int(n, seed, static_cast('A'), static_cast('z'), out); } @@ -280,13 +304,13 @@ static inline int64_t CountNulls(const std::vector& valid_bytes) { return static_cast(std::count(valid_bytes.cbegin(), valid_bytes.cend(), '\0')); } -Status MakeRandomInt32Buffer(int64_t length, MemoryPool* pool, - std::shared_ptr* out, uint32_t seed = 0) { +template +Status MakeRandomBuffer(int64_t length, MemoryPool* pool, + std::shared_ptr* out, uint32_t seed = 0) { DCHECK(pool); std::shared_ptr result; - RETURN_NOT_OK(AllocateResizableBuffer(pool, sizeof(int32_t) * length, &result)); - rand_uniform_int(length, seed, 0, std::numeric_limits::max(), - reinterpret_cast(result->mutable_data())); + RETURN_NOT_OK(AllocateResizableBuffer(pool, sizeof(T) * length, &result)); + GenerateRandom::Gen(length, seed, result->mutable_data()); *out = result; return Status::OK(); } @@ -344,6 +368,15 @@ void AssertBufferEqual(const Buffer& buffer, const Buffer& expected) { ASSERT_TRUE(buffer.Equals(expected)); } +static inline void AssertSchemaEqual(const Schema& lhs, const Schema& rhs) { + if (!lhs.Equals(rhs)) { + std::stringstream ss; + ss << "left schema: " << lhs.ToString() << std::endl + << "right schema: " << rhs.ToString() << std::endl; + FAIL() << ss.str(); + } +} + void PrintColumn(const Column& col, std::stringstream* ss) { const ChunkedArray& carr = *col.data(); for (int i = 0; i < carr.num_chunks(); ++i) { @@ -456,6 +489,53 @@ Status MakeArray(const std::vector& valid_bytes, const std::vector& } \ } while (false) +static inline void CompareBatch(const RecordBatch& left, const RecordBatch& right) { + if (!left.schema()->Equals(*right.schema())) { + FAIL() << "Left schema: " << left.schema()->ToString() + << "\nRight schema: " << right.schema()->ToString(); + } + ASSERT_EQ(left.num_columns(), right.num_columns()) + << left.schema()->ToString() << 
" result: " << right.schema()->ToString(); + ASSERT_EQ(left.num_rows(), right.num_rows()); + for (int i = 0; i < left.num_columns(); ++i) { + if (!left.column(i)->Equals(right.column(i))) { + std::stringstream ss; + ss << "Idx: " << i << " Name: " << left.column_name(i); + ss << std::endl << "Left: "; + ASSERT_OK(PrettyPrint(*left.column(i), 0, &ss)); + ss << std::endl << "Right: "; + ASSERT_OK(PrettyPrint(*right.column(i), 0, &ss)); + FAIL() << ss.str(); + } + } +} + +// ---------------------------------------------------------------------- +// A RecordBatchReader for serving a sequence of in-memory record batches + +class BatchIterator : public RecordBatchReader { + public: + BatchIterator(const std::shared_ptr& schema, + const std::vector>& batches) + : schema_(schema), batches_(batches), position_(0) {} + + std::shared_ptr schema() const override { return schema_; } + + Status ReadNext(std::shared_ptr* out) override { + if (position_ >= batches_.size()) { + *out = nullptr; + } else { + *out = batches_[position_++]; + } + return Status::OK(); + } + + private: + std::shared_ptr schema_; + std::vector> batches_; + size_t position_; +}; + } // namespace arrow #endif // ARROW_TEST_UTIL_H_ diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h index 56fadca63d515..121f301d64604 100644 --- a/cpp/src/arrow/util/memory.h +++ b/cpp/src/arrow/util/memory.h @@ -59,8 +59,8 @@ void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes, std::vector> futures; for (int i = 0; i < num_threads; i++) { - futures.push_back(pool->Submit(memcpy, dst + prefix + i * chunk_size, - left + i * chunk_size, chunk_size)); + futures.emplace_back(pool->Submit(memcpy, dst + prefix + i * chunk_size, + left + i * chunk_size, chunk_size)); } memcpy(dst, src, prefix); memcpy(dst + prefix + num_threads * chunk_size, right, suffix); diff --git a/cpp/src/parquet/util/stopwatch.h b/cpp/src/arrow/util/stopwatch.h similarity index 71% rename from 
cpp/src/parquet/util/stopwatch.h rename to cpp/src/arrow/util/stopwatch.h index 68cf792fe1c23..f16c2ec4827b8 100644 --- a/cpp/src/parquet/util/stopwatch.h +++ b/cpp/src/arrow/util/stopwatch.h @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -#ifndef PARQUET_UTIL_STOPWATCH_H -#define PARQUET_UTIL_STOPWATCH_H +#pragma once #include #ifndef _MSC_VER @@ -26,27 +25,25 @@ #include #include -namespace parquet { +namespace arrow { + +inline uint64_t CurrentTime() { + timespec time; + clock_gettime(CLOCK_MONOTONIC, &time); + return 1000000000LL * time.tv_sec + time.tv_nsec; +} class StopWatch { public: StopWatch() {} - void Start() { gettimeofday(&start_time, 0); } + void Start() { start_ = CurrentTime(); } // Returns time in nanoseconds. - uint64_t Stop() { - struct timeval t_time; - gettimeofday(&t_time, 0); - - return (1000L * 1000L * 1000L * (t_time.tv_sec - start_time.tv_sec) + - (t_time.tv_usec - start_time.tv_usec)); - } + uint64_t Stop() { return CurrentTime() - start_; } private: - struct timeval start_time; + uint64_t start_; }; -} // namespace parquet - -#endif +} // namespace arrow diff --git a/cpp/src/parquet/util/CMakeLists.txt b/cpp/src/parquet/util/CMakeLists.txt index debf4b3a88cd5..72d4ca28f9b83 100644 --- a/cpp/src/parquet/util/CMakeLists.txt +++ b/cpp/src/parquet/util/CMakeLists.txt @@ -20,7 +20,6 @@ install(FILES comparison.h macros.h memory.h - stopwatch.h visibility.h DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet/util") diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index cdea520e925ca..381282bd8014f 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -29,8 +29,8 @@ BROTLI_VERSION=v0.6.0 LZ4_VERSION=v1.7.5 ZLIB_VERSION=1.2.8 ZSTD_VERSION=v1.2.0 -PROTOBUF_VERSION=v2.6.0 -GRPC_VERSION=v1.12.1 +PROTOBUF_VERSION=v3.6.1 +GRPC_VERSION=1.14.1 ORC_VERSION=1.5.1 THRIFT_VERSION=0.11.0 GLOG_VERSION=v0.3.5 diff --git a/format/Flight.proto b/format/Flight.proto new file
mode 100644 index 0000000000000..ab1d6208c2311 --- /dev/null +++ b/format/Flight.proto @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +option java_package = "org.apache.arrow.flight.impl"; +package arrow.flight.protocol; + +/* + * A flight service is an endpoint for retrieving or storing Arrow data. A + * flight service can expose one or more predefined endpoints that can be + * accessed using the Arrow Flight Protocol. Additionally, a flight service + * can expose a set of actions that are available. + */ +service FlightService { + + /* + * Handshake between client and server. Depending on the server, the + * handshake may be required to determine the token that should be used for + * future operations. Both request and response are streams to allow multiple + * roundtrips depending on auth mechanism. + */ + rpc Handshake(stream HandshakeRequest) returns (stream HandshakeResponse) {} + + /* + * Get a list of available streams given a particular criteria. Most flight + * services will expose one or more streams that are readily available for + * retrieval. This API allows listing the streams available for + * consumption. A user can also provide a criteria. The criteria can limit + * the subset of streams that can be listed via this interface. Each flight + * service allows its own definition of how to consume criteria. + */ + rpc ListFlights(Criteria) returns (stream FlightGetInfo) {} + + /* + * For a given FlightDescriptor, get information about how the flight can be + * consumed. This is a useful interface if the consumer of the interface + * already can identify the specific flight to consume. This interface can + * also allow a consumer to generate a flight stream through a specified + * descriptor. 
For example, a flight descriptor might be something that + * includes a SQL statement or a Pickled Python operation that will be + * executed. In those cases, the descriptor will not be previously available + * within the list of available streams provided by ListFlights but will be + * available for consumption for the duration defined by the specific flight + * service. + */ + rpc GetFlightInfo(FlightDescriptor) returns (FlightGetInfo) {} + + /* + * Retrieve a single stream associated with a particular descriptor + * associated with the referenced ticket. A Flight can be composed of one or + * more streams where each stream can be retrieved using a separate opaque + * ticket that the flight service uses for managing a collection of streams. + */ + rpc DoGet(Ticket) returns (stream FlightData) {} + + /* + * Push a stream to the flight service associated with a particular + * flight stream. This allows a client of a flight service to upload a stream + * of data. Depending on the particular flight service, a client consumer + * could be allowed to upload a single stream per descriptor or an unlimited + * number. (In the latter, the service might implement a 'seal' action that + * can be applied to a descriptor once all streams are uploaded.) + */ + rpc DoPut(stream FlightData) returns (PutResult) {} + + /* + * Flight services can support an arbitrary number of simple actions in + * addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut + * operations that are potentially available. DoAction allows a flight client + * to do a specific action against a flight service. An action includes + * opaque request and response objects that are specific to the type of action + * being undertaken. + */ + rpc DoAction(Action) returns (stream Result) {} + + /* + * A flight service exposes all of the available action types that it has + * along with descriptions. 
This allows different flight consumers to + * understand the capabilities of the flight service. + */ + rpc ListActions(Empty) returns (stream ActionType) {} + +} + +/* + * The request that a client provides to a server on handshake. + */ +message HandshakeRequest { + + /* + * A defined protocol version + */ + uint64 protocol_version = 1; + + /* + * Arbitrary auth/handshake info. + */ + bytes payload = 2; +} + +message HandshakeResponse { + + /* + * A defined protocol version + */ + uint64 protocol_version = 1; + + /* + * Arbitrary auth/handshake info. + */ + bytes payload = 2; +} + +/* + * A message for doing simple auth. + */ +message BasicAuth { + string username = 2; + string password = 3; +} + +message Empty {} + +/* + * Describes an available action, including both the name used for execution + * along with a short description of the purpose of the action. + */ +message ActionType { + string type = 1; + string description = 2; +} + +/* + * A service specific expression that can be used to return a limited set + * of available Arrow Flight streams. + */ +message Criteria { + bytes expression = 1; +} + +/* + * An opaque action specific for the service. + */ +message Action { + string type = 1; + bytes body = 2; +} + +/* + * An opaque result returned after executing an action. + */ +message Result { + bytes body = 1; +} + +/* + * The name or tag for a Flight. May be used as a way to retrieve or generate + * a flight or be used to expose a set of previously defined flights. + */ +message FlightDescriptor { + + /* + * Describes what type of descriptor is defined. + */ + enum DescriptorType { + + // Protobuf pattern, not used. + UNKNOWN = 0; + + /* + * A named path that identifies a dataset. A path is composed of a string + * or list of strings describing a particular dataset. This is conceptually + * similar to a path inside a filesystem. + */ + PATH = 1; + + /* + * An opaque command to generate a dataset. 
+ */ + CMD = 2; + } + + DescriptorType type = 1; + + /* + * Opaque value used to express a command. Should only be defined when + * type = CMD. + */ + bytes cmd = 2; + + /* + * List of strings identifying a particular dataset. Should only be defined + * when type = PATH. + */ + repeated string path = 3; +} + +/* + * The access coordinates for retrieval of a dataset. With a FlightGetInfo, a + * consumer is able to determine how to retrieve a dataset. + */ +message FlightGetInfo { + // schema of the dataset as described in Schema.fbs::Schema + bytes schema = 1; + + /* + * The descriptor associated with this info. + */ + FlightDescriptor flight_descriptor = 2; + + /* + * A list of endpoints associated with the flight. To consume the whole + * flight, all endpoints must be consumed. + */ + repeated FlightEndpoint endpoint = 3; + + uint64 total_records = 4; + uint64 total_bytes = 5; +} + +/* + * A particular stream or split associated with a flight. + */ +message FlightEndpoint { + + /* + * Token used to retrieve this stream. + */ + Ticket ticket = 1; + + /* + * A list of locations where this ticket can be redeemed. If the list is + * empty, the expectation is that the ticket can only be redeemed on the + * current service where the ticket was generated. + */ + repeated Location location = 2; +} + +/* + * A location where a flight service will accept retrieval of a particular + * stream given a ticket. + */ +message Location { + string host = 1; + int32 port = 2; +} + +/* + * An opaque identifier that the service can use to retrieve a particular + * portion of a stream. + */ +message Ticket { + bytes ticket = 1; +} + +/* + * A batch of Arrow data as part of a stream of batches. + */ +message FlightData { + + /* + * The descriptor of the data. 
This is only relevant when a client is + * starting a new DoPut stream. + */ + FlightDescriptor flight_descriptor = 1; + + /* + * Header for message data as described in Message.fbs::Message + */ + bytes data_header = 2; + + /* + * The actual batch of Arrow data. Preferably handled with minimal copies; + * comes last in the definition to help with sidecar patterns. + */ + bytes data_body = 1000; +} + +/** + * The response message (currently empty) associated with the submission of a + * DoPut. + */ +message PutResult {} diff --git a/integration/README.md b/integration/README.md index 0a428e6cbb769..760a175027607 100644 --- a/integration/README.md +++ b/integration/README.md @@ -70,7 +70,7 @@ Java `arrow-tool` JAR and the build path for the C++ executables: JAVA_DIR=$ARROW_HOME/java CPP_BUILD_DIR=$ARROW_HOME/cpp/build -VERSION=0.1.1-SNAPSHOT +VERSION=0.11.0-SNAPSHOT export ARROW_JAVA_INTEGRATION_JAR=$JAVA_DIR/tools/target/arrow-tools-$VERSION-jar-with-dependencies.jar export ARROW_CPP_EXE_PATH=$CPP_BUILD_DIR/debug ``` diff --git a/python/.gitignore b/python/.gitignore index fac4e99493756..3346aa62df4c7 100644 --- a/python/.gitignore +++ b/python/.gitignore @@ -42,5 +42,4 @@ pyarrow/_table_api.h manylinux1/arrow # plasma store -pyarrow/plasma_store pyarrow/plasma_store_server diff --git a/python/README.md b/python/README.md index c732e3b68b3e9..e9058624fa0fa 100644 --- a/python/README.md +++ b/python/README.md @@ -48,8 +48,8 @@ The code must pass `flake8` (available from pip or conda) or it will fail the build. Check for style errors before submitting your pull request with: ``` -flake8 pyarrow -flake8 --config=.flake8.cython pyarrow +flake8 . +flake8 --config=.flake8.cython . ``` ### Building from Source