Skip to content

Commit

Permalink
Fix issue #45394: Handle single-line JSON without line ending
Browse files Browse the repository at this point in the history
  • Loading branch information
JOBIN-SABU committed Feb 6, 2025
1 parent 0556905 commit 6cd73d5
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 6 deletions.
43 changes: 43 additions & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -760,3 +760,46 @@ config_summary_message()
if(${ARROW_BUILD_CONFIG_SUMMARY_JSON})
config_summary_json()
endif()

# Project name
project(ArrowJson)

# Add your source files and libraries
add_subdirectory(src)

# Add Google Test
add_subdirectory(${CMAKE_SOURCE_DIR}/googletest)

# Include directories
include_directories(${CMAKE_SOURCE_DIR}/src)

# Add the test directory
add_subdirectory(tests)

# Enable testing
enable_testing()

# Add custom target for tags (if needed)
if(UNIX)
add_custom_target(tags
etags
--members
--declarations
`find
${CMAKE_CURRENT_SOURCE_DIR}/src
-name
\\*.cc
-or
-name
\\*.hh
-or
-name
\\*.cpp
-or
-name
\\*.h
-or
-name
\\*.hpp`)
endif()

43 changes: 37 additions & 6 deletions cpp/src/arrow/dataset/file_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/dataset/file_json.h"

#include <algorithm>
#include <iostream>
#include <unordered_set>
#include <vector>

Expand Down Expand Up @@ -108,16 +109,46 @@ class JsonFragmentScanner : public FragmentScanner {
parse_options.unexpected_field_behavior = json::UnexpectedFieldBehavior::Ignore;

int64_t block_size = format_options.read_options.block_size;
if (block_size <= 0) {
return Status::Invalid("Block size must be positive");
}

auto num_batches =
static_cast<int>(bit_util::CeilDiv(inspected.num_bytes, block_size));
if (num_batches < 0) {
return Status::Invalid("Number of batches calculation overflowed");
}

auto future = json::StreamingReader::MakeAsync(
inspected.stream, format_options.read_options, parse_options,
io::default_io_context(), cpu_executor);
return future.Then([num_batches, block_size](const ReaderPtr& reader)
-> Result<std::shared_ptr<FragmentScanner>> {
return std::make_shared<JsonFragmentScanner>(reader, num_batches, block_size);
});
inspected.stream, format_options.read_options, parse_options,
io::default_io_context(), cpu_executor);

// ✅ Fix: Handle Single-Line JSON Case
return future.Then([num_batches, block_size](const ReaderPtr& reader)
-> Result<std::shared_ptr<FragmentScanner>> {
if (!reader) {
return Status::Invalid("Failed to create JSON Streaming Reader.");
}

// Check if the input stream has only one JSON object and no newline
// Check if the input stream has only one JSON object and no newline
auto stream_data = inspected.stream->Read();
if (!stream_data.ok()) {
// Handle the error appropriately
return Status::IOError("Failed to read from the input stream");
}

std::string json_content = stream_data->ToString();
if (!json_content.empty() && json_content.back() != '\n') {
json_content += '\n'; // Append a newline to fix the issue
}

// Create a new InputStream with fixed content
inspected.stream = std::make_shared<io::BufferReader>(Buffer::FromString(json_content));

return std::make_shared<JsonFragmentScanner>(reader, num_batches, block_size);
});

}

private:
Expand Down
13 changes: 13 additions & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Add test source files
set(TEST_SOURCES
test_file_json.cc
)

# Create test executable
add_executable(ArrowJsonTests ${TEST_SOURCES})

# Link Google Test and your project libraries
target_link_libraries(ArrowJsonTests gtest gtest_main arrow_dataset arrow_io)

# Add tests
add_test(NAME ArrowJsonTests COMMAND ArrowJsonTests)
81 changes: 81 additions & 0 deletions cpp/tests/test_file_json.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#include <gtest/gtest.h>
#include "arrow/dataset/file_json.h"
#include "arrow/io/memory.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_util.h"

using namespace arrow;
using namespace arrow::dataset;

class JsonFragmentScannerTest : public ::testing::Test {
protected:
void SetUp() override {
// Set up necessary objects and state for the tests
}

void TearDown() override {
// Clean up after tests
}
};

TEST_F(JsonFragmentScannerTest, InvalidBlockSize) {
FragmentScanRequest scan_request;
JsonFragmentScanOptions format_options;
JsonInspectedFragment inspected;
Executor* cpu_executor = nullptr;

format_options.read_options.block_size = -1; // Invalid block size

auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor);
ASSERT_FALSE(result.ok());
ASSERT_EQ(result.status().code(), StatusCode::Invalid);
ASSERT_EQ(result.status().message(), "Block size must be positive");
}

TEST_F(JsonFragmentScannerTest, ValidBlockSize) {
FragmentScanRequest scan_request;
JsonFragmentScanOptions format_options;
JsonInspectedFragment inspected;
Executor* cpu_executor = nullptr;

format_options.read_options.block_size = 1024; // Valid block size
inspected.num_bytes = 2048;

auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor);
ASSERT_TRUE(result.ok());
}

TEST_F(JsonFragmentScannerTest, SingleLineJson) {
FragmentScanRequest scan_request;
JsonFragmentScanOptions format_options;
JsonInspectedFragment inspected;
Executor* cpu_executor = nullptr;

format_options.read_options.block_size = 1024;
inspected.num_bytes = 1024;

// Create a single-line JSON input stream
std::string json_content = R"({"key": "value"})";
inspected.stream = std::make_shared<arrow::io::BufferReader>(json_content);

auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor);
ASSERT_TRUE(result.ok());
}

TEST_F(JsonFragmentScannerTest, MultiLineJson) {
FragmentScanRequest scan_request;
JsonFragmentScanOptions format_options;
JsonInspectedFragment inspected;
Executor* cpu_executor = nullptr;

format_options.read_options.block_size = 1024;
inspected.num_bytes = 2048;

// Create a multi-line JSON input stream
std::string json_content = R"({"key1": "value1"}
{"key2": "value2"})";
inspected.stream = std::make_shared<arrow::io::BufferReader>(json_content);

auto result = JsonFragmentScanner::Make(scan_request, format_options, inspected, cpu_executor);
ASSERT_TRUE(result.ok());
}
1 change: 1 addition & 0 deletions test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"field": 1}

0 comments on commit 6cd73d5

Please sign in to comment.