Skip to content

Commit

Permalink
fixing
Browse files Browse the repository at this point in the history
Committed-by: xiaolei.zl from Dev container
  • Loading branch information
zhanglei1949 committed Oct 25, 2024
1 parent b66fab4 commit 91fcabe
Show file tree
Hide file tree
Showing 30 changed files with 268 additions and 324 deletions.
22 changes: 22 additions & 0 deletions docs/flex/interactive/development/dev_and_test.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ mvn clean package -DskipTests -Pexperimental
- `USE_PTHASH`: Indicates whether to use a perfect hash when building the vertex map.
- `OPTIMIZE_FOR_HOST`: Determines if Flex should be optimized for performance on the current machine. Note that enabling this option may result in a binary that does not run on different platforms or CPU architectures.

### Wal Writer

Interactive uses WAL (Write-Ahead Logging) to ensure data integrity. Two different WAL writers are provided for different storage backends: `LocalWalWriter` for writing WALs to local disk and `KafkaWalWriter` for persisting WALs on Kafka.

You could switch the wal writer type in the configuration. See [Configuration](./../configuration.md#service-configuration).

#### Local Wal Writer

This is the default WAL writer; no extra setup is required. Just make sure your disk has enough free space.

#### Kafka Wal Writer

You need to deploy a Kafka cluster first. For details, please refer to the [Kafka Documentation](https://kafka.apache.org/documentation/). Follow the [Quickstart](https://kafka.apache.org/quickstart) to deploy a service.

#### Performance

##### Settings

##### Producing Wals

##### Consuming Wals

## Testing

Numerous test cases have been created for Interactive, which can be referenced in the GitHub workflow [interactive.yml](https://github.com/alibaba/GraphScope/blob/main/.github/workflows/interactive.yml).
Expand Down
18 changes: 9 additions & 9 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" OFF)
option(USE_PTHASH "Whether to use pthash" OFF)
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
option(USE_STATIC_ARROW "Whether to use static arrow" ON) # Whether to link arrow statically, default is ON
option(ENABLE_KAFKA "Whether to enable kafka" ON)
option(BUILD_KAFKA_WAL_WRITER "Whether to build kafka wal writer" ON)

#print options
message(STATUS "Build test: ${BUILD_TEST}")
Expand Down Expand Up @@ -58,11 +58,10 @@ if(USE_PTHASH)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/murmurhash)
endif()

if (ENABLE_KAFKA)
if (BUILD_KAFKA_WAL_WRITER)
find_package(CppKafka)
if (CppKafka_FOUND)
message(STATUS "cppkafka found, build without third_party/cppkafka")
else ()
if (NOT CppKafka_FOUND)
message(STATUS "cppkafka not found, try to build with third_party/cppkafka")
add_subdirectory(third_party/cppkafka)
# if cppkafka/CMakeLists.txt not exits, tell user to run git submodule update --init --recursive
if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/CMakeLists.txt)
Expand All @@ -71,11 +70,12 @@ if (ENABLE_KAFKA)
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/include)
set (CppKafka_INSTALL_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/build)
list (APPEND CMAKE_PREFIX_PATH ${CppKafka_INSTALL_DIR})
# find_package (CppKafka CONFIG REQUIRED)
# alias target
add_library(CppKafka::cppkafka ALIAS cppkafka)
set(CppKafka_LIBRARIES cppkafka)
else()
include_directories(SYSTEM ${CppKafka_INCLUDE_DIRS})
set(CppKafka_LIBRARIES CppKafka::cppkafka)
endif ()
add_definitions(-DENABLE_KAFKA)
add_definitions(-DBUILD_KAFKA_WAL_WRITER)
endif()

set(DEFAULT_BUILD_TYPE "Release")
Expand Down
7 changes: 5 additions & 2 deletions flex/bin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,8 @@ add_executable(stored_procedure_runner stored_procedure_runner.cc)
target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils flex_graph_db ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES})
install_without_export_flex_target(stored_procedure_runner)

add_executable(wal_consumer wal_consumer.cc)
target_link_libraries(wal_consumer PUBLIC CppKafka::cppkafka ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db)
if (BUILD_KAFKA_WAL_WRITER)
add_executable(wal_consumer wal_consumer.cc)
target_link_libraries(wal_consumer PUBLIC ${CppKafka_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db)
install_without_export_flex_target(wal_consumer)
endif()
44 changes: 20 additions & 24 deletions flex/bin/wal_consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* limitations under the License.
*/

#ifdef BUILD_KAFKA_WAL_WRITER

#include <csignal>
#include <filesystem>
#include <iostream>
Expand All @@ -34,12 +36,11 @@ class WalSender {
static constexpr int32_t CONNECTION_TIMEOUT = 10;
static constexpr int32_t READ_TIMEOUT = 60;
static constexpr int32_t WRITE_TIMEOUT = 60;
WalSender(const std::string& host, int port, const std::string& graph_id)
: host_(host), port_(port), client_(host_, port_), graph_id_(graph_id) {
WalSender(const std::string& host, int port, const std::string& dst_url)
: host_(host), port_(port), client_(host_, port_), req_url_(dst_url) {
client_.set_connection_timeout(CONNECTION_TIMEOUT);
client_.set_read_timeout(READ_TIMEOUT);
client_.set_write_timeout(WRITE_TIMEOUT);
req_url_ = "/v1/graph/" + graph_id_ + "/wal";
}

void send(const std::string& payload) {
Expand All @@ -57,7 +58,6 @@ class WalSender {
std::string host_;
int port_;
httplib::Client client_;
std::string graph_id_;
std::string req_url_;
};

Expand All @@ -68,23 +68,25 @@ namespace bpo = boost::program_options;
int main(int argc, char** argv) {
std::string brokers;
std::string topic_name;
std::string graph_id;
std::string group_id;
std::string engine_url;
std::string engine_host;
std::string req_url;
int32_t engine_port;

bpo::options_description desc("Usage:");
desc.add_options()("help", "Display help message")(
"kafka-brokers,b", bpo::value<std::string>(&brokers)->required(),
"Kafka brokers list")(
"graph-id,i", bpo::value<std::string>(&graph_id)->required(), "graph_id")(
"url,u", bpo::value<std::string>(&req_url)->required(), "req_url")(
"group-id,g",
bpo::value<std::string>(&group_id)->default_value("interactive_group"),
"Kafka group id")("engine-url,u",
bpo::value<std::string>(&engine_url)->required(),
"Kafka group id")("engine-host,h",
bpo::value<std::string>(&engine_host)->required(),
"Engine URL")(
"engine-port,p", bpo::value<int32_t>(&engine_port)->required(),
"Engine port");
"Engine port")("topic,t",
bpo::value<std::string>(&topic_name)->required(),
"Kafka topic name");

google::InitGoogleLogging(argv[0]);
FLAGS_logtostderr = true;
Expand All @@ -110,14 +112,11 @@ int main(int argc, char** argv) {
{"group.id", group_id},
// Disable auto commit
{"enable.auto.commit", false}};

topic_name = "graph_" + graph_id + "_wal";
// Create the consumer
gs::WalSender sender(vm["engine-url"].as<std::string>(),
vm["engine-port"].as<int32_t>(), graph_id);
gs::WalConsumer consumer(config, topic_name, 1);

// signal(SIGINT, [](int) { consumer.terminate(); });
vm["engine-port"].as<int32_t>(),
vm["req-url"].as<std::string>());
gs::KafkaWalConsumer consumer(config, topic_name, 1);

if (vm.count("help")) {
std::cout << desc << std::endl;
Expand All @@ -126,16 +125,13 @@ int main(int argc, char** argv) {

while (true) {
auto msg = consumer.poll();
if (msg.first == std::numeric_limits<uint32_t>::max()) {
continue;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
LOG(INFO) << "Received message: <" << msg.first << " -> " << msg.second
<< ">";
sender.send(msg.second);
LOG(INFO) << "Received message: -> " << msg << ">";
sender.send(msg);
}

LOG(INFO) << "Consuming messages from topic " << topic_name;

return 0;
}
}

#endif
4 changes: 2 additions & 2 deletions flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURREN
target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(flex_graph_db flex_plan_proto runtime_adhoc)
install_flex_target(flex_graph_db)
if (ENABLE_KAFKA)
target_link_libraries(flex_graph_db CppKafka::cppkafka)
if (BUILD_KAFKA_WAL_WRITER)
target_link_libraries(flex_graph_db ${CppKafka_LIBRARIES})
endif()

install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/compact_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void CompactTransaction::Commit() {
header->timestamp = timestamp_;
header->type = 1;

logger_.append(timestamp_, arc_.GetBuffer(), arc_.GetSize());
logger_.append(arc_.GetBuffer(), arc_.GetSize());
arc_.Clear();

LOG(INFO) << "before compact - " << timestamp_;
Expand Down
28 changes: 16 additions & 12 deletions flex/engines/graph_db/database/graph_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ void GraphDB::GetAppInfo(Encoder& output) {

static void IngestWalRange(SessionLocalContext* contexts,
MutablePropertyFragment& graph,
const WalContentUnit* insert_wal_list, uint32_t from,
const IWalsParser& parser, uint32_t from,
uint32_t to, int thread_num) {
LOG(INFO) << "IngestWalRange, from " << from << ", to " << to;
std::atomic<uint32_t> cur_ts(from);
Expand All @@ -362,7 +362,7 @@ static void IngestWalRange(SessionLocalContext* contexts,
if (got_ts >= to) {
break;
}
const auto& unit = insert_wal_list[got_ts];
const auto& unit = parser.get_insert_wal(got_ts);
InsertTransaction::IngestWal(graph, got_ts, unit.ptr, unit.size,
alloc);
if (got_ts % 1000000 == 0) {
Expand All @@ -387,13 +387,12 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path,
for (const auto& entry : std::filesystem::directory_iterator(wal_dir_path)) {
wals.push_back(entry.path().string());
}
WalsParser parser(wals);
LocalWalsParser parser(wals);
uint32_t from_ts = 1;
for (auto& update_wal : parser.update_wals()) {
uint32_t to_ts = update_wal.timestamp;
if (from_ts < to_ts) {
IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
to_ts, thread_num);
IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num);
}
if (update_wal.size == 0) {
graph_.Compact(update_wal.timestamp);
Expand All @@ -405,26 +404,26 @@ void GraphDB::ingestWalsFromLocalFiles(const std::string& wal_dir_path,
from_ts = to_ts + 1;
}
if (from_ts <= parser.last_ts()) {
IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
parser.last_ts() + 1, thread_num);
IngestWalRange(contexts_, graph_, parser, from_ts, parser.last_ts() + 1,
thread_num);
}
version_manager_.init_ts(parser.last_ts(), thread_num);
}

#ifdef BUILD_KAFKA_WAL_WRITER
void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers,
const std::string& kafka_topic,
const std::string& work_dir, int thread_num) {
cppkafka::Configuration config = {{"metadata.broker.list", kafka_brokers},
{"group.id", "primary_group"},
// Disable auto commit
{"enable.auto.commit", false}};
KafkaWalParser parser(config, kafka_topic);
KafkaWalsParser parser(config, kafka_topic);
uint32_t from_ts = 1;
for (auto& update_wal : parser.update_wals()) {
uint32_t to_ts = update_wal.timestamp;
if (from_ts < to_ts) {
IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
to_ts, thread_num);
IngestWalRange(contexts_, graph_, parser, from_ts, to_ts, thread_num);
}
if (update_wal.size == 0) {
graph_.Compact(update_wal.timestamp);
Expand All @@ -436,12 +435,13 @@ void GraphDB::ingestWalsFromKafka(const std::string& kafka_brokers,
from_ts = to_ts + 1;
}
if (from_ts <= parser.last_ts()) {
IngestWalRange(contexts_, graph_, parser.insert_wal_list(), from_ts,
parser.last_ts() + 1, thread_num);
IngestWalRange(contexts_, graph_, parser, from_ts, parser.last_ts() + 1,
thread_num);
}

version_manager_.init_ts(parser.last_ts(), thread_num);
}
#endif

void GraphDB::initApps(
const std::unordered_map<std::string, std::pair<std::string, uint8_t>>&
Expand Down Expand Up @@ -508,6 +508,7 @@ void GraphDB::openWalAndCreateContexts(const std::string& data_dir,
contexts_[i].logger->open(wal_dir_path, i);
}
} else if (config.wal_writer_type == IWalWriter::WalWriterType::kKafka) {
#ifdef BUILD_KAFKA_WAL_WRITER
for (int i = 0; i < thread_num_; ++i) {
new (&contexts_[i]) SessionLocalContext(
*this, data_dir, i, allocator_strategy,
Expand All @@ -520,6 +521,9 @@ void GraphDB::openWalAndCreateContexts(const std::string& data_dir,
for (int i = 0; i < thread_num_; ++i) {
contexts_[i].logger->open(config.kafka_topic, i);
}
#else
LOG(FATAL) << "Kafka wal writer is not enabled in this build";
#endif
} else {
LOG(FATAL) << "Unsupported wal writer type: "
<< static_cast<int>(config.wal_writer_type);
Expand Down
2 changes: 2 additions & 0 deletions flex/engines/graph_db/database/graph_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,11 @@ class GraphDB {
void ingestWalsFromLocalFiles(const std::string& wal_dir,
const std::string& work_dir, int thread_num);

#ifdef BUILD_KAFKA_WAL_WRITER
void ingestWalsFromKafka(const std::string& kafka_brokers,
const std::string& kafka_topic,
const std::string& work_dir, int thread_num);
#endif

void initApps(
const std::unordered_map<std::string, std::pair<std::string, uint8_t>>&
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/graph_db_operations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Result<std::string> GraphDBOperations::CreateVertex(
for (auto& edge_insert : input_json["edge_request"].GetArray()) {
edge_data.push_back(inputEdge(edge_insert, schema, session));
}
LOG(INFO) << "CreateVertex edge_data: " << edge_data.size();
} catch (std::exception& e) {
return Result<std::string>(
gs::Status(StatusCode::INVALID_SCHEMA,
Expand Down Expand Up @@ -452,7 +453,6 @@ Status GraphDBOperations::singleInsertVertex(
"Fail to create edge; All inserts are rollbacked");
}
}
// LOG(INFO) << "Commit singleInsertVertex";
txnWrite.Commit();
} catch (std::exception& e) {
return Status(StatusCode::INVALID_SCHEMA, e.what());
Expand Down
19 changes: 7 additions & 12 deletions flex/engines/graph_db/database/graph_db_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,17 +251,17 @@ AppBase* GraphDBSession::GetApp(int type) {

#undef likely // likely

Result<std::string> GraphDBSession::IngestWals(const std::string_view& input) {
if (input.size() < sizeof(WalHeader)) {
Result<std::string> GraphDBSession::IngestWals(const char* input, size_t len) {
if (len < sizeof(WalHeader)) {
LOG(ERROR) << "Invalid wal content";
return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal content");
}
const WalHeader* header = reinterpret_cast<const WalHeader*>(input.data());
if (header->length + sizeof(WalHeader) != input.size()) {
const WalHeader* header = reinterpret_cast<const WalHeader*>(input);
if (header->length + sizeof(WalHeader) != len) {
LOG(ERROR) << "Invalid wal content";
return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal content");
}
return deserialize_and_apply_wal(header, input.data(), header->length);
return deserialize_and_apply_wal(header, input, header->length);
}

// Quite similar to the InsertTransation::Commit();
Expand All @@ -278,7 +278,7 @@ Result<std::string> GraphDBSession::deserialize_and_apply_insert_wal(
LOG(INFO) << "Applying insert wal with timestamp: " << ts
<< ", length: " << length
<< ", logger type: " << static_cast<int>(logger_.type());
logger_.append(ts, data, length);
logger_.append(data, length);

InsertTransaction::IngestWal(db_.graph(), ts,
const_cast<char*>(data) + sizeof(WalHeader),
Expand All @@ -296,7 +296,7 @@ Result<std::string> GraphDBSession::deserialize_and_apply_update_wal(
db_.version_manager_.revert_update_timestamp(ts);
return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal timestamp");
}
logger_.append(ts, data, length);
logger_.append(data, length);
UpdateTransaction::IngestWal(db_.graph(), work_dir_, ts,
const_cast<char*>(data), length, alloc_);
db_.version_manager_.release_update_timestamp(ts);
Expand All @@ -310,13 +310,8 @@ Result<std::string> GraphDBSession::deserialize_and_apply_wal(
return Status::OK();
}
if (header->type == 0) {
// insert_transaction
// single_edge_insert_transaction
// single_vertex_insert_transaction
return deserialize_and_apply_insert_wal(header, data, length);
} else if (header->type == 1) {
// compact_transaction
// update_transaction
return deserialize_and_apply_update_wal(header, data, length);
} else {
return Status(StatusCode::INVALID_ARGUMENT, "Invalid wal type");
Expand Down
Loading

0 comments on commit 91fcabe

Please sign in to comment.