Skip to content

Commit

Permalink
ARROW-3146: [C++] Prototype Flight RPC client and server implementations
Browse files Browse the repository at this point in the history
This is a partial C++ implementation of the Flight RPC system initially proposed by Jacques in ARROW-249. As in Java, I had to dig into gRPC and Protocol Buffers internals to ensure that

* On write, memory is only copied once into the outgoing gRPC buffer
* On read, no memory is copied

The way that I tricked gRPC into circumventing the built-in protobuf serde paths might look a bit hacky, but after digging around in the library a bunch I've convinced myself that it's the best and perhaps only way to accomplish this. Luckily, the message that's being serialized/deserialized is pretty opaque to the rest of the gRPC system, and it's controlled by the `SerializationTraits<T>` class. So you can take a gRPC stream reader and make it create any kind of type you want, even if the input data is a protocol buffer.

Some things that won't be addressed in this patch, as scope is too large:

* gRPC build toolchain issues (this is rather complex, I will create follow-up issues)
* Security / encryption, and authentication issues. I have only implemented an insecure server
* Integration with Travis CI
* Python bindings

API is preliminary and I expect to be the subject of iteration to make general and fast over the next several months.

Author: Wes McKinney <[email protected]>

Closes apache#2547 from wesm/flight-cpp-prototype and squashes the following commits:

64bcdea <Wes McKinney> Initial Arrow Flight C++ implementation
  • Loading branch information
wesm committed Sep 20, 2018
1 parent 35b5bce commit db0ef22
Show file tree
Hide file tree
Showing 44 changed files with 3,520 additions and 235 deletions.
4 changes: 4 additions & 0 deletions ci/travis_before_script_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ if [ $ARROW_TRAVIS_COVERAGE == "1" ]; then
CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_GENERATE_COVERAGE=ON"
fi

if [ $ARROW_TRAVIS_VERBOSE == "1" ]; then
CMAKE_COMMON_FLAGS="$CMAKE_COMMON_FLAGS -DARROW_VERBOSE_THIRDPARTY_BUILD=ON"
fi

if [ $TRAVIS_OS_NAME == "linux" ]; then
cmake $CMAKE_COMMON_FLAGS \
$CMAKE_LINUX_FLAGS \
Expand Down
2 changes: 1 addition & 1 deletion ci/travis_script_cpp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh

pushd $CPP_BUILD_DIR

ctest -j2 --output-on-failure -L unittest
PATH=$ARROW_BUILD_TYPE:$PATH ctest -j2 --output-on-failure -L unittest

popd

Expand Down
36 changes: 17 additions & 19 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ Pass multiple labels by dividing with semicolons")
"Compile with extra error context (line numbers, code)"
OFF)

option(ARROW_FLIGHT
"Build the Arrow Flight RPC System (requires GRPC, Protocol Buffers)"
OFF)

option(ARROW_IPC
"Build the Arrow IPC extensions"
ON)
Expand Down Expand Up @@ -240,10 +244,6 @@ Pass multiple labels by dividing with semicolons")
"Build with zstd compression"
ON)

option(ARROW_WITH_GRPC
"Build with GRPC"
OFF)

option(ARROW_GENERATE_COVERAGE
"Build with C++ code coverage enabled"
OFF)
Expand Down Expand Up @@ -634,14 +634,6 @@ if (ARROW_WITH_ZSTD)
SET(ARROW_STATIC_LINK_LIBS zstd_static ${ARROW_STATIC_LINK_LIBS})
endif()

if (ARROW_WITH_GRPC)
SET(ARROW_STATIC_LINK_LIBS
grpc_grp
grpc_grpc
grpc_grpcpp
${ARROW_STATIC_LINK_LIBS})
endif()

if (ARROW_ORC)
SET(ARROW_STATIC_LINK_LIBS
${ARROW_STATIC_LINK_LIBS}
Expand All @@ -657,11 +649,13 @@ if (ARROW_STATIC_LINK_LIBS)
add_dependencies(arrow_dependencies ${ARROW_STATIC_LINK_LIBS})
endif()

set(ARROW_BENCHMARK_LINK_LIBS
arrow_static
arrow_benchmark_main
gtest
${ARROW_STATIC_LINK_LIBS})
if (ARROW_BUILD_BENCHMARKS)
set(ARROW_BENCHMARK_LINK_LIBS
arrow_static
arrow_benchmark_main
gtest
${ARROW_STATIC_LINK_LIBS})
endif()

set(ARROW_SHARED_PRIVATE_LINK_LIBS
${ARROW_STATIC_LINK_LIBS}
Expand All @@ -683,8 +677,8 @@ endif()
set(ARROW_MIN_TEST_LIBS
arrow_static
${ARROW_STATIC_LINK_LIBS}
gtest
gtest_main)
gtest_main
gtest)

if(NOT MSVC)
set(ARROW_MIN_TEST_LIBS
Expand Down Expand Up @@ -743,6 +737,10 @@ endif()

add_subdirectory(src/arrow)

if(ARROW_FLIGHT)
add_subdirectory(src/arrow/flight)
endif()

if(ARROW_PYTHON)
add_subdirectory(src/arrow/python)
endif()
Expand Down
6 changes: 2 additions & 4 deletions cpp/build-support/lint_cpp_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ def lint_file(path):
'arrow/util/macros.h',
'arrow/python/iterators.h',
'arrow/util/parallel.h',
'arrow/io/hdfs-internal.h',
'parquet/arrow/test-util.h',
'parquet/encoding-internal.h',
'parquet/test-util.h'
'test',
'internal'
]

try:
Expand Down
1 change: 1 addition & 0 deletions cpp/cmake_modules/FindProtobuf.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ else()
endif()

mark_as_advanced (
PROTOBUF_EXECUTABLE
PROTOBUF_INCLUDE_DIR
PROTOBUF_LIBS
PROTOBUF_STATIC_LIB
Expand Down
116 changes: 67 additions & 49 deletions cpp/cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ if (NOT "$ENV{ARROW_BUILD_TOOLCHAIN}" STREQUAL "")
set(GTEST_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
endif()
set(JEMALLOC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
set(GRPC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
set(LZ4_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
# orc disabled as it's not in conda-forge (but in Anaconda with an incompatible ABI)
# set(ORC_HOME "$ENV{ARROW_BUILD_TOOLCHAIN}")
Expand Down Expand Up @@ -210,7 +211,7 @@ if (DEFINED ENV{ARROW_PROTOBUF_URL})
set(PROTOBUF_SOURCE_URL "$ENV{ARROW_PROTOBUF_URL}")
else()
string(SUBSTRING ${PROTOBUF_VERSION} 1 -1 STRIPPED_PROTOBUF_VERSION) # strip the leading `v`
set(PROTOBUF_SOURCE_URL "https://github.com/google/protobuf/releases/download/${PROTOBUF_VERSION}/protobuf-${STRIPPED_PROTOBUF_VERSION}.tar.gz")
set(PROTOBUF_SOURCE_URL "https://github.com/protocolbuffers/protobuf/releases/download/${PROTOBUF_VERSION}/protobuf-all-${STRIPPED_PROTOBUF_VERSION}.tar.gz")
endif()

set(RAPIDJSON_SOURCE_MD5 "badd12c511e081fec6c89c43a7027bce")
Expand Down Expand Up @@ -1009,54 +1010,11 @@ if (ARROW_WITH_ZSTD)
endif()
endif()

if (ARROW_WITH_GRPC)
# ----------------------------------------------------------------------
# GRPC
if ("${GRPC_HOME}" STREQUAL "")
set(GRPC_VENDORED 1)
set(GRPC_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep-prefix/src/grpc_ep-build")
set(GRPC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep/src/grpc_ep-install")
set(GRPC_HOME "${GRPC_PREFIX}")
set(GRPC_INCLUDE_DIR "${GRPC_PREFIX}/include")
set(GRPC_STATIC_LIBRARY_GPR "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}gpr${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GRPC_STATIC_LIBRARY_GRPC "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GRPC_STATIC_LIBRARY_GRPCPP "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc++${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GRPC_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
"-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}"
"-DCMAKE_C_FLAGS=${EP_C_FLAGS}"
-DCMAKE_INSTALL_PREFIX=${GRPC_PREFIX}
-DBUILD_SHARED_LIBS=OFF)

ExternalProject_Add(grpc_ep
GIT_REPOSITORY "https://github.com/grpc/grpc"
GIT_TAG ${GRPC_VERSION}
BUILD_BYPRODUCTS "${GRPC_STATIC_LIBRARY_GPR}" "${GRPC_STATIC_LIBRARY_GRPC}" "${GRPC_STATIC_LIBRARY_GRPCPP}"
${GRPC_BUILD_BYPRODUCTS}
${EP_LOG_OPTIONS}
CMAKE_ARGS ${GRPC_CMAKE_ARGS}
${EP_LOG_OPTIONS})
else()
find_package(gRPC CONFIG REQUIRED)
set(GRPC_VENDORED 0)
endif()

include_directories(SYSTEM ${GRPC_INCLUDE_DIR})
ADD_THIRDPARTY_LIB(grpc_grp
STATIC_LIB ${GRPC_STATIC_LIBRARY_GPR})
ADD_THIRDPARTY_LIB(grpc_grpc
STATIC_LIB ${GRPC_STATIC_LIBRARY_GRPC})
ADD_THIRDPARTY_LIB(grpc_grpcpp
STATIC_LIB ${GRPC_STATIC_LIBRARY_GRPCPP})

if (GRPC_VENDORED)
add_dependencies(grpc_grp grpc_ep)
add_dependencies(grpc_grpc grpc_ep)
add_dependencies(grpc_grpcpp grpc_ep)
endif()

endif()
# ----------------------------------------------------------------------
# Protocol Buffers (required for ORC and Flight libraries)

if (ARROW_ORC)
if (ARROW_ORC OR ARROW_FLIGHT)
# protobuf
if ("${PROTOBUF_HOME}" STREQUAL "")
set (PROTOBUF_PREFIX "${THIRDPARTY_DIR}/protobuf_ep-install")
Expand Down Expand Up @@ -1089,9 +1047,69 @@ if (ARROW_ORC)
if (PROTOBUF_VENDORED)
add_dependencies (protobuf protobuf_ep)
endif ()
endif()

# orc
# ----------------------------------------------------------------------
# Dependencies for Arrow Flight RPC

if (ARROW_FLIGHT)
if ("${GRPC_HOME}" STREQUAL "")
set(GRPC_VENDORED 1)
set(GRPC_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep-prefix/src/grpc_ep-build")
set(GRPC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/grpc_ep/src/grpc_ep-install")
set(GRPC_HOME "${GRPC_PREFIX}")
set(GRPC_INCLUDE_DIR "${GRPC_PREFIX}/include")
set(GRPC_STATIC_LIBRARY_GPR "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}gpr${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GRPC_STATIC_LIBRARY_GRPC "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpc${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GRPC_STATIC_LIBRARY_GRPCPP "${GRPC_BUILD_DIR}/${CMAKE_CFG_INTDIR}/${CMAKE_STATIC_LIBRARY_PREFIX}grpcpp${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(GRPC_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
"-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}"
"-DCMAKE_C_FLAGS=${EP_C_FLAGS}"
-DCMAKE_INSTALL_PREFIX=${GRPC_PREFIX}
-DBUILD_SHARED_LIBS=OFF)

ExternalProject_Add(grpc_ep
GIT_REPOSITORY "https://github.com/grpc/grpc"
GIT_TAG ${GRPC_VERSION}
BUILD_BYPRODUCTS "${GRPC_STATIC_LIBRARY_GPR}" "${GRPC_STATIC_LIBRARY_GRPC}" "${GRPC_STATIC_LIBRARY_GRPCPP}"
${GRPC_BUILD_BYPRODUCTS}
${EP_LOG_OPTIONS}
CMAKE_ARGS ${GRPC_CMAKE_ARGS}
${EP_LOG_OPTIONS})
include_directories(SYSTEM ${GRPC_INCLUDE_DIR})
else()
find_package(gRPC CONFIG REQUIRED)
set(GRPC_VENDORED 0)
endif()

get_property(GPR_STATIC_LIB TARGET gRPC::gpr PROPERTY LOCATION)
ADD_THIRDPARTY_LIB(grpc_gpr
STATIC_LIB ${GPR_STATIC_LIB})

get_property(GRPC_STATIC_LIB TARGET gRPC::grpc_unsecure PROPERTY LOCATION)
ADD_THIRDPARTY_LIB(grpc_grpc
STATIC_LIB ${GRPC_STATIC_LIB})

get_property(GRPCPP_STATIC_LIB TARGET gRPC::grpc++_unsecure PROPERTY LOCATION)
ADD_THIRDPARTY_LIB(grpc_grpcpp
STATIC_LIB ${GRPCPP_STATIC_LIB})

get_property(GRPC_ADDRESS_SORTING_STATIC_LIB
TARGET gRPC::address_sorting PROPERTY LOCATION)
ADD_THIRDPARTY_LIB(grpc_address_sorting
STATIC_LIB ${GRPC_ADDRESS_SORTING_STATIC_LIB})

# XXX(wesm): relying on vendored c-ares provided by gRPC for the time being
get_property(CARES_STATIC_LIB TARGET c-ares::cares_static PROPERTY LOCATION)
ADD_THIRDPARTY_LIB(cares
STATIC_LIB ${CARES_STATIC_LIB})
endif()

# ----------------------------------------------------------------------
# Apache ORC

if (ARROW_ORC)
# orc
if ("${ORC_HOME}" STREQUAL "")
set(ORC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/orc_ep-install")
set(ORC_HOME "${ORC_PREFIX}")
Expand Down Expand Up @@ -1274,10 +1292,10 @@ endif()

endif() # ARROW_HIVESERVER2

if (ARROW_USE_GLOG)
# ----------------------------------------------------------------------
# GLOG

if (ARROW_USE_GLOG)
if("${GLOG_HOME}" STREQUAL "")
set(GLOG_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/glog_ep-prefix/src/glog_ep")
set(GLOG_INCLUDE_DIR "${GLOG_BUILD_DIR}/include")
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ Status Buffer::FromString(const std::string& data, std::shared_ptr<Buffer>* out)
return FromString(data, default_memory_pool(), out);
}

std::string Buffer::ToString() const {
return std::string(reinterpret_cast<const char*>(data_), static_cast<size_t>(size_));
}

void Buffer::CheckMutable() const { DCHECK(is_mutable()) << "buffer not mutable"; }

/// A Buffer whose lifetime is tied to a particular MemoryPool
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,11 @@ class ARROW_EXPORT Buffer {
static_cast<int64_t>(sizeof(T) * data.size()));
} // namespace arrow

/// \brief Copy buffer contents into a new std::string
/// \return std::string
/// \note Can throw std::bad_alloc if buffer is large
std::string ToString() const;

int64_t capacity() const { return capacity_; }
const uint8_t* data() const { return data_; }

Expand Down
9 changes: 5 additions & 4 deletions cpp/src/arrow/dbi/hiveserver2/thrift/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ function(HS2_THRIFT_GEN VAR)
# All the output files we can determine based on filename.
# - Does not include .skeleton.cpp files
# - Does not include java output files
set(OUTPUT_BE_FILE "${GEN_DIR}/${FIL_WE}_types.cpp")
set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_types.h")
set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_constants.cpp")
set(OUTPUT_BE_FILE ${OUTPUT_BE_FILE} " ${GEN_DIR}/${FIL_WE}_constants.h")
set(OUTPUT_BE_FILE
"${GEN_DIR}/${FIL_WE}_types.cpp"
"${GEN_DIR}/${FIL_WE}_types.h"
"${GEN_DIR}/${FIL_WE}_constants.cpp"
"${GEN_DIR}/${FIL_WE}_constants.h")
list(APPEND ${VAR} ${OUTPUT_BE_FILE})

# BeeswaxService thrift generation
Expand Down
Loading

0 comments on commit db0ef22

Please sign in to comment.