WIP: [Feat] Iceberg writer #113

Open

shubham19may wants to merge 23 commits into master from feat/iceberg-writer

Commits (23)

1eaff9c - Fixed merge conflict (shubham19may, Feb 24, 2025)
6300803 - Refactored code a bit to make it more clean (shubham19may, Feb 26, 2025)
c175ebe - Fixed merge conflict (shubham19may, Feb 24, 2025)
32861cf - Refactored code a bit to make it more clean (shubham19may, Feb 26, 2025)
d2f532a - Made RPC server per stream and added batching logic to only push data… (shubham19may, Feb 26, 2025)
8f41dc5 - Fixed merge conflicts (shubham19may, Feb 26, 2025)
00071bc - Increased the RPC deadline window (shubham19may, Feb 26, 2025)
a414e5d - Added the normalisation (shubham19may, Feb 27, 2025)
f3b580e - Iceberg-writer code integrated in Olake (shubham19may, Mar 3, 2025)
9e6f9d0 - Updated the target path (shubham19may, Mar 3, 2025)
dd13ed9 - Commiting legal notices (shubham19may, Mar 3, 2025)
8838a94 - Merge conflict fix (shubham19may, Mar 3, 2025)
fc1b595 - Fixed release file (shubham19may, Mar 3, 2025)
cd19a86 - Upgraded java version in github ci (shubham19may, Mar 3, 2025)
1efd5a8 - Fixing java build by downgrading java version (shubham19may, Mar 4, 2025)
fdda0d9 - Temporary login for jar figuring out part (shubham19may, Mar 4, 2025)
cbb1254 - Setup the java process logs for getting the iceberg logs in golang logs (shubham19may, Mar 5, 2025)
403202b - Added readme for the java-iceberg code (shubham19may, Mar 6, 2025)
b3e7fda - Removed retry logic for Iceberg and added a lock based commit (shubham19may, Mar 6, 2025)
1aba145 - Added Iceberg writer readme for glue configuration (shubham19may, Mar 6, 2025)
20a231a - Removed debezium dependencies and classes/tests (shubham19may, Mar 6, 2025)
eeb7794 - Merge branch 'master' into feat/iceberg-writer (shubham19may, Mar 7, 2025)
b9fa0b0 - Added faster ingestion for Iceberg java code (shubham19may, Mar 7, 2025)
8 changes: 8 additions & 0 deletions .github/workflows/build-and-release-driver.yml
@@ -23,6 +23,14 @@ jobs:
    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Set up JDK 17
        uses: actions/setup-java@v3
        with:
          java-version: '17'
          distribution: 'temurin'
          cache: maven

      - name: Set Driver
        run: echo "DRIVER=${{ env.DRIVER }}" >> $GITHUB_ENV
      - name: Set VERSION
5 changes: 5 additions & 0 deletions .gitignore
@@ -6,3 +6,8 @@ __debug_bin*
go.sum
go.work.sum
**/examples
debezium-server-iceberg-sink-0.0.1-SNAPSHOT.jar
writers/iceberg/debezium-server-iceberg-sink/target/
writers/iceberg/debezium-server-iceberg-sink/target/**/
debezium-server-iceberg-sink.jar
local-releaser.sh
10 changes: 9 additions & 1 deletion Dockerfile
@@ -12,9 +12,17 @@ RUN go build -o /olake main.go
# Final Runtime Stage
FROM alpine:3.18

# Install Java 17 instead of Java 11
RUN apk add --no-cache openjdk17

# Copy the binary from the build stage
COPY --from=base /olake /home/olake

# Copy the pre-built JAR file from Maven
# First try to copy from the source location (works after Maven build)
COPY writers/iceberg/debezium-server-iceberg-sink/target/debezium-server-iceberg-sink-0.0.1-SNAPSHOT.jar /home/debezium-server-iceberg-sink.jar


ARG DRIVER_VERSION=dev
ARG DRIVER_NAME=olake
# Metadata
@@ -25,4 +33,4 @@ LABEL io.eggwhite.name=olake/source-${DRIVER_NAME}
WORKDIR /home

# Entrypoint
ENTRYPOINT ["./olake"]
ENTRYPOINT ["./olake"]
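
Note: the COPY step above expects the sink JAR to already exist under writers/iceberg/debezium-server-iceberg-sink/target/, so the Maven build has to run on the host before the image is built. A minimal sketch of the local build order (the image tag and --build-arg values are illustrative assumptions, not part of this diff):

    mvn -f writers/iceberg/debezium-server-iceberg-sink/pom.xml clean package -Dmaven.test.skip=true
    docker build --build-arg DRIVER_NAME=mongodb --build-arg DRIVER_VERSION=dev -t olake-source-mongodb .
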
84 changes: 84 additions & 0 deletions build.sh
@@ -6,6 +6,61 @@ function fail() {

joined_arguments=""

# Function to check and build the Java JAR file for iceberg if needed
function check_and_build_jar() {
  local connector="$1"

  # Only do this for iceberg destination
  if [[ "$connector" != "iceberg" ]]; then
    return 0
  fi

  echo "============================== Checking for Iceberg JAR file =============================="

  # Check if the JAR exists in the base directory
  if [ -f "debezium-server-iceberg-sink.jar" ]; then
    echo "JAR file found in base directory."
    return 0
  fi

  # Check in the target directory
  if [ -f "writers/iceberg/debezium-server-iceberg-sink/target/debezium-server-iceberg-sink-0.0.1-SNAPSHOT.jar" ]; then
    echo "JAR file found in target directory, copying to base directory..."
    cp writers/iceberg/debezium-server-iceberg-sink/target/debezium-server-iceberg-sink-0.0.1-SNAPSHOT.jar ./debezium-server-iceberg-sink.jar
    return 0
  fi

  # If JAR not found, build it
  echo "Iceberg JAR file not found. Building with Maven..."

  # Store current directory
  local current_dir=$(pwd)

  # Navigate to the Maven project directory
  if [ -d "writers/iceberg/debezium-server-iceberg-sink" ]; then
    cd writers/iceberg/debezium-server-iceberg-sink
  elif [ -d "writers/iceberg" ]; then
    cd writers/iceberg
  else
    fail "Cannot find Iceberg Maven project directory."
  fi

  # Build with Maven
  mvn clean package -Dmaven.test.skip=true || fail "Maven build failed"

  # Return to original directory
  cd "$current_dir"

  # Copy the JAR file to the base directory
  if [ -f "writers/iceberg/debezium-server-iceberg-sink/target/debezium-server-iceberg-sink-0.0.1-SNAPSHOT.jar" ]; then
    cp writers/iceberg/debezium-server-iceberg-sink/target/debezium-server-iceberg-sink-0.0.1-SNAPSHOT.jar ./debezium-server-iceberg-sink.jar
  else
    fail "Maven build completed but could not find the JAR file."
  fi

  echo "============================== JAR file built and copied to base directory =============================="
}

function build_and_run() {
  local connector="$1"
  if [[ $2 == "driver" ]]; then
@@ -15,6 +70,35 @@ function build_and_run() {
  else
    fail "The argument does not have a recognized prefix."
  fi

  # Check if writer.json is specified in the arguments
  local writer_file=""
  local using_iceberg=false

  # Parse the arguments to find the writer.json file path
  local previous_arg=""
  for arg in $joined_arguments; do
    if [[ "$previous_arg" == "--destination" || "$previous_arg" == "-d" ]]; then
      writer_file="$arg"
      break
    fi
    previous_arg="$arg"
  done

  # If writer file was found, check if it contains iceberg
  if [[ -n "$writer_file" && -f "$writer_file" ]]; then
    echo "Checking writer file: $writer_file for iceberg destination..."
    if grep -qi "iceberg" "$writer_file"; then
      echo "Iceberg destination detected in writer file."
      using_iceberg=true
    fi
  fi

  # If using iceberg, check and potentially build the JAR
  if [[ "$using_iceberg" == true ]]; then
    check_and_build_jar "iceberg"
  fi

  cd $path || fail "Failed to navigate to path: $path"
  go mod tidy
  go build -ldflags="-w -s -X constants/constants.version=${GIT_VERSION} -X constants/constants.commitsha=${GIT_COMMITSHA} -X constants/constants.releasechannel=${RELEASE_CHANNEL}" -o olake main.go || fail "build failed"
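
Note: the destination detection above is deliberately loose: it is a case-insensitive grep for the string "iceberg" in whatever file is passed via --destination / -d, so any writer config that mentions Iceberg triggers check_and_build_jar. A minimal sketch of such a writer file (the field names are illustrative assumptions, not the actual writer schema; only the presence of "iceberg" matters to this check):

    {
      "type": "ICEBERG",
      "writer": { "catalog_type": "glue" }
    }
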
1 change: 1 addition & 0 deletions connector.go
@@ -6,6 +6,7 @@ import (
"github.com/datazip-inc/olake/logger"
protocol "github.com/datazip-inc/olake/protocol"
"github.com/datazip-inc/olake/safego"
_ "github.com/datazip-inc/olake/writers/iceberg" // registering local parquet writer
_ "github.com/datazip-inc/olake/writers/parquet" // registering local parquet writer
)

3 changes: 2 additions & 1 deletion drivers/mongodb/internal/backfill.go
@@ -22,6 +22,7 @@ import (
)

func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) error {
stream.Self().BackfillInProcess = true
collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name())
chunks := stream.GetStateChunks()
backfillCtx := context.TODO()
@@ -99,7 +100,7 @@ func (m *Mongo) backfill(stream protocol.Stream, pool *protocol.WriterPool) erro
}

handleObjectID(doc)
exit, err := insert.Insert(types.CreateRawRecord(utils.GetKeysHash(doc, constants.MongoPrimaryID), doc, 0))
exit, err := insert.Insert(types.CreateRawRecord(utils.GetKeysHash(doc, constants.MongoPrimaryID), doc, 0, "r", time.Unix(0, 0).UnixNano()))
if err != nil {
return fmt.Errorf("failed to finish backfill chunk: %s", err)
}
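
Note: with this change, backfilled rows are tagged as snapshot reads: the two new CreateRawRecord arguments set the operation type to "r" and the source timestamp to time.Unix(0, 0).UnixNano(), i.e. zero, so downstream writers can tell backfill records apart from live CDC events (which carry "c"/"u"/"d" and the cluster time, see cdc.go below).
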
27 changes: 21 additions & 6 deletions drivers/mongodb/internal/cdc.go
@@ -10,14 +10,16 @@ import (
"github.com/datazip-inc/olake/types"
"github.com/datazip-inc/olake/utils"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
)

type CDCDocument struct {
OperationType string `json:"operationType"`
FullDocument map[string]any `json:"fullDocument"`
OperationType string `json:"operationType"`
FullDocument map[string]any `json:"fullDocument"`
ClusterTime primitive.Timestamp `json:"clusterTime"`
}

func (m *Mongo) RunChangeStream(pool *protocol.WriterPool, streams ...protocol.Stream) error {
@@ -91,12 +93,25 @@ func (m *Mongo) changeStreamSync(stream protocol.Stream, pool *protocol.WriterPo
if err := cursor.Decode(&record); err != nil {
return fmt.Errorf("error while decoding: %s", err)
}
handleObjectID(record.FullDocument)

// TODO: Handle Deleted documents (Good First Issue)
if record.FullDocument != nil {
record.FullDocument["cdc_type"] = record.OperationType
// Map MongoDB operation types to Debezium format
opType := "c" // default to create
switch record.OperationType {
case "update":
opType = "u"
case "delete":
opType = "d"
}
handleObjectID(record.FullDocument)
rawRecord := types.CreateRawRecord(utils.GetKeysHash(record.FullDocument, constants.MongoPrimaryID), record.FullDocument, 0)

rawRecord := types.CreateRawRecord(
utils.GetKeysHash(record.FullDocument, constants.MongoPrimaryID),
record.FullDocument,
0,
opType,
int64(record.ClusterTime.T)*1000,
)
exit, err := insert.Insert(rawRecord)
if err != nil {
return err
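
For illustration only (not part of the diff), a self-contained sketch of the same operation-type mapping, plus the cluster-time conversion: primitive.Timestamp.T holds whole seconds, so multiplying by 1000 produces the millisecond value handed to CreateRawRecord. The helper name and sample value below are assumptions made for this sketch.

    package main

    import "fmt"

    // mapOperationType mirrors the switch in changeStreamSync: MongoDB change
    // stream operation types collapse into Debezium-style one-letter codes.
    func mapOperationType(op string) string {
        switch op {
        case "update":
            return "u"
        case "delete":
            return "d"
        default:
            return "c" // inserts (and anything unrecognised) default to create
        }
    }

    func main() {
        var clusterSeconds uint32 = 1741305600           // example change-stream cluster time (seconds)
        fmt.Println(mapOperationType("update"))           // u
        fmt.Println(int64(clusterSeconds) * 1000)         // 1741305600000, the millisecond timestamp
    }
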
11 changes: 10 additions & 1 deletion go.mod
@@ -16,6 +16,9 @@ require (
github.com/stretchr/testify v1.9.0
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20241021075129-b732d2ac9c9b
go.mongodb.org/mongo-driver v1.17.2
google.golang.org/grpc v1.69.0
google.golang.org/protobuf v1.36.4
sigs.k8s.io/yaml v1.3.0
)

@@ -44,13 +47,14 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

@@ -69,3 +73,8 @@ require (
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace (
google.golang.org/genproto => google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80
google.golang.org/genproto/googleapis/rpc => google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80
)
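
Note: the replace block pins google.golang.org/genproto and its googleapis/rpc submodule to the same 2024-01-23 revision; presumably this keeps the new grpc/protobuf requirements above from resolving to conflicting genproto versions (the usual ambiguous-import problem after the genproto module split).
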
78 changes: 78 additions & 0 deletions logger/logger.go
@@ -1,6 +1,7 @@
package logger

import (
"bufio"
"context"
"encoding/json"
"fmt"
@@ -10,6 +11,7 @@
"path/filepath"
"runtime"
"strings"
"sync"
"time"

"github.com/rs/zerolog"
@@ -222,3 +224,79 @@

logger = zerolog.New(multiwriter).With().Timestamp().Logger()
}

// ProcessOutputReader is a struct that manages reading output from a process
// and forwarding it to the logger
type ProcessOutputReader struct {
	Name      string // Name to identify the process in logs
	IsError   bool   // Whether this reader handles error output
	reader    *bufio.Scanner
	closeFn   func() error
	closeOnce sync.Once
}

// NewProcessOutputReader creates a new ProcessOutputReader for a given process
// name is a prefix to identify the process in logs
// isError determines whether to log as Error (true) or Info (false)
// returns the reader and a write end that should be connected to the process output
func NewProcessOutputReader(name string, isError bool) (*ProcessOutputReader, *os.File, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return nil, nil, fmt.Errorf("failed to create pipe: %v", err)
	}

	reader := &ProcessOutputReader{
		Name:    name,
		IsError: isError,
		reader:  bufio.NewScanner(r),
		closeFn: r.Close,
	}

	return reader, w, nil
}

// StartReading starts reading from the process output in a goroutine
// and logging each line with the appropriate log level
func (p *ProcessOutputReader) StartReading() {
	go func() {
		defer p.Close()
		for p.reader.Scan() {
			if p.IsError {
				Error(fmt.Sprintf("[%s] %s", p.Name, p.reader.Text()))
			} else {
				Info(fmt.Sprintf("[%s] %s", p.Name, p.reader.Text()))
			}
		}
	}()
}

// Close closes the reader
func (p *ProcessOutputReader) Close() {
	p.closeOnce.Do(func() {
		if p.closeFn != nil {
			p.closeFn() // golangci-lint (errcheck) flags this line on the PR: the error return value is not checked
		}
	})
}

// SetupProcessOutputCapture creates stdout and stderr readers for a process
// and returns write-ends that should be connected to the process stdout and stderr
func SetupProcessOutputCapture(processName string) (*ProcessOutputReader, *ProcessOutputReader, *os.File, *os.File, error) {
	// Setup stdout reader
	stdoutReader, stdoutWriter, err := NewProcessOutputReader(processName, false)
	if err != nil {
		return nil, nil, nil, nil, fmt.Errorf("failed to create stdout reader: %v", err)
	}

	// Setup stderr reader
	stderrReader, stderrWriter, err := NewProcessOutputReader(processName, true)
	if err != nil {
		stdoutReader.Close()
		if stdoutWriter != nil {
			stdoutWriter.Close()
		}
		return nil, nil, nil, nil, fmt.Errorf("failed to create stderr reader: %v", err)
	}

	return stdoutReader, stderrReader, stdoutWriter, stderrWriter, nil
}
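
A minimal usage sketch (not part of this diff) showing how these readers would typically be wired to the Java sink process; the process name, jar path, and function name are assumptions for illustration:

    package main

    import (
        "os/exec"

        "github.com/datazip-inc/olake/logger"
    )

    // startIcebergSink launches the Java sink and forwards its stdout/stderr
    // into the Go logs via the ProcessOutputReader goroutines.
    func startIcebergSink() (*exec.Cmd, error) {
        stdoutReader, stderrReader, stdoutW, stderrW, err := logger.SetupProcessOutputCapture("iceberg-java")
        if err != nil {
            return nil, err
        }

        cmd := exec.Command("java", "-jar", "/home/debezium-server-iceberg-sink.jar")
        cmd.Stdout = stdoutW // the child writes into the pipes...
        cmd.Stderr = stderrW
        stdoutReader.StartReading() // ...and these goroutines re-log every line
        stderrReader.StartReading()

        if err := cmd.Start(); err != nil {
            return nil, err
        }
        // The child now owns the write ends; close the parent's copies so the
        // scanners see EOF once the process exits.
        stdoutW.Close()
        stderrW.Close()
        return cmd, nil
    }
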
32 changes: 30 additions & 2 deletions release-tool.sh
@@ -10,7 +10,31 @@ function chalk() {
elif [[ $color == "green" ]]; then
color_code=2
fi
echo -e "$(tput setaf $color_code)${text}$(tput sgr0)"
# Check if TERM is set before using tput
if [[ -n "$TERM" ]]; then
echo -e "$(tput setaf $color_code)${text}$(tput sgr0)"
else
# Fallback if TERM is not set
if [[ $color == "red" ]]; then
echo -e "\033[31m${text}\033[0m"
elif [[ $color == "green" ]]; then
echo -e "\033[32m${text}\033[0m"
else
echo -e "${text}"
fi
fi
}

# Function to build the Java project with Maven
function build_java_project() {
  echo "Building Java project with Maven..."
  # Change to the directory containing the POM file
  cd writers/iceberg/debezium-server-iceberg-sink || fail "Failed to change to Maven project directory"
  echo "Building Maven project in $(pwd)"
  mvn clean package -Dmaven.test.skip=true || fail "Maven build failed"
  # Return to the original directory
  cd - || fail "Failed to return to original directory"
  echo "$(chalk green "✅ Java project successfully built")"
}

# Function to fail with a message
@@ -113,4 +137,8 @@ chalk green "=== Release version: $VERSION ==="
connector=$DRIVER
type="source"

release "$VERSION" "$platform" "$CURRENT_BRANCH"

# Build Java project
build_java_project

release "$VERSION" "$platform" "$CURRENT_BRANCH"