upgrading dependencies #151

Closed
wants to merge 21 commits
35 changes: 30 additions & 5 deletions .github/workflows/ci-integration.yml
@@ -64,21 +64,46 @@ jobs:
psutil
""" > requirements.txt
- name: Set up Python
uses: actions/setup-python@v3
uses: MatteoH2O1999/setup-python@v1
with:
python-version: '2.x'
python-version: '2.7'
cache: 'pip'
allow-build: info
cache-build: true
- name: Setup CCM private
if: ${{ matrix.cassandra.is_dse }}
env:
CLONE_RIPTANO_CCM_SECRET: ${{secrets.CLONE_RIPTANO_CCM_SECRET}}
TEST: ${{secrets.TEST}}
GB_TOKEN: ${{secrets.GB_TOKEN}}
run: |
#!/bin/bash
set -e
git clone https://${CLONE_RIPTANO_CCM_SECRET}@github.com/riptano/ccm-private.git
if [ "GB_TOKEN" == "" ]; then
echo "GB_TOKEN=NO"
else
echo "GB_TOKEN=YES"
fi
echo $TEST
if [ "$TEST" == "" ]; then
echo "envsecret=NO"
else
echo "envsecret=YES"
fi
if [ "$CLONE_RIPTANO_CCM_SECRET" == "" ]; then
echo "secretspresent=NO"
else
echo "secretspresent=YES"
fi
echo "Cloning the repo"
git clone https://${GB_TOKEN}:[email protected]/riptano/ccm-private.git
cd ccm-private
echo "pip install requirements"
pip install -r requirements.txt
echo "setup.py install"
./setup.py install
- name: Setup CCM
if: ${{ !matrix.cassandra.is_dse }}
@@ -117,7 +142,7 @@ jobs:
export CCM_CASSANDRA_VERSION=${{ matrix.cassandra.ccm_version }}
export CCM_VERSION=${{ matrix.cassandra.ccm_version }}
mvn verify -Pmedium -pl pulsar-impl \
mvn verify -Pmedium -pl sink \
-Ddsbulk.ccm.CCM_VERSION=$CCM_VERSION \
-Ddsbulk.ccm.CCM_IS_DSE=$CCM_IS_DSE \
-Ddsbulk.ccm.JAVA_HOME="$JDK8_PATH" \
@@ -146,7 +171,7 @@ jobs:
env
mvn verify -Pmedium -pl tests \
mvn verify -Pmedium -pl sink \
-Ddsbulk.ccm.CCM_VERSION=$CCM_VERSION \
-Ddsbulk.ccm.CCM_IS_DSE=$CCM_IS_DSE \
-Ddsbulk.ccm.JAVA_HOME="$JDK8_PATH" \
10 changes: 5 additions & 5 deletions pom.xml
@@ -40,12 +40,12 @@
<messaging.connectors.commons.version>1.0.15</messaging.connectors.commons.version>
<java.version>1.8</java.version>
<java.release.version>8</java.release.version>
<kafka.connect.version>2.4.0</kafka.connect.version>
<confluent.version>5.2.1</confluent.version>
<kafka.connect.version>3.7.2</kafka.connect.version>
<confluent.version>7.7.2</confluent.version>
<caffeine.version>2.6.2</caffeine.version>
<oss.driver.version>4.16.0</oss.driver.version>
<dsbulk.version>1.10.0</dsbulk.version>
<reactive-streams.version>1.0.3</reactive-streams.version>
<reactive-streams.version>1.0.4</reactive-streams.version>
<guava.version>25.1-jre</guava.version>
<slf4j.version>1.7.25</slf4j.version>
<logback.version>1.2.3</logback.version>
@@ -61,8 +61,8 @@
<max.ccm.clusters>2</max.ccm.clusters>
<commons-exec.version>1.3</commons-exec.version>
<awaitility.version>4.0.1</awaitility.version>
<netty.version>4.1.39.Final</netty.version>
<netty.tcnative.version>2.0.25.Final</netty.tcnative.version>
<netty.version>4.1.77.Final</netty.version>
<netty.tcnative.version>2.0.52.Final</netty.tcnative.version>
<metrics.version>4.0.2</metrics.version>
<lz4.version>1.6.0</lz4.version>
<snappy.version>1.1.7.2</snappy.version>
@@ -48,6 +48,7 @@
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -59,6 +60,8 @@
@ExtendWith(WiremockResolver.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Tag("medium")
// per https://github.com/datastax/dsbulk/commit/0c91fdc4aa3e867decf4f080353b3bba468ce911
@Disabled(value = "SNI Proxy image not available anymore in CI")
public class CloudSniEndToEndIT extends ITConnectorBase {

private final SNIProxyServer proxy;
@@ -85,7 +85,7 @@
@ExtendWith(StreamInterceptingExtension.class)
@ExtendWith(LogInterceptingExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SimulacronConfig(dseVersion = "5.0.8")
@SimulacronConfig
class SimpleEndToEndSimulacronIT {

private static final String INSERT_STATEMENT =
@@ -332,14 +332,7 @@ void fail_prepare_counter_table() {

ImmutableMap<String, String> props =
ImmutableMap.<String, String>builder()
.put("name", INSTANCE_NAME)
.put("contactPoints", connectorProperties.get("contactPoints"))
.put("port", connectorProperties.get("port"))
.put("loadBalancing.localDc", "dc1")
// since we upgraded to Driver 4.16.x, we need to explicitly set the protocol version
// otherwise it will try only DSE_v1 and DSE_v2 because they are not considered "BETA"
// https://github.com/datastax/java-driver/blob/4270f93277249abb513bc2abf2ff7a7c481b1d0d/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java#L163
.put("datastax-java-driver.advanced.protocol.version", "V4")
.putAll(connectorProperties)
.put("topic.mytopic.ks1.mycounter.mapping", "a=key, b=value, c=value.f2")
.build();
assertThatThrownBy(() -> task.start(props))
@@ -367,14 +360,7 @@ void fail_delete() {
.build()));
simulacron.prime(when(bad1).then(serverError("bad thing")));
Map<String, String> connProps = new HashMap<>();
connProps.put("name", INSTANCE_NAME);
connProps.put("contactPoints", hostname);
// since we upgraded to Driver 4.16.x, we need to explicitly set the protocol version
// otherwise it will try only DSE_v1 and DSE_v2 because they are not considered "BETA"
// https://github.com/datastax/java-driver/blob/4270f93277249abb513bc2abf2ff7a7c481b1d0d/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java#L163
connProps.put("datastax-java-driver.advanced.protocol.version", "V4");
connProps.put("port", port);
connProps.put("loadBalancing.localDc", "dc1");
connProps.putAll(connectorProperties);
connProps.put(
"topic.mytopic.ks1.mycounter.mapping", "a=value.bigint, b=value.text, c=value.int");

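For context on the two blocks removed above: after the upgrade to java-driver 4.16.x, protocol negotiation against Simulacron would otherwise try only DSE_V1/DSE_V2, so these tests pinned the OSS V4 protocol explicitly. A minimal sketch of that override, assuming it is now supplied once through the shared connectorProperties map rather than repeated per test (that map's construction is not shown in this diff; the class and parameter names below are illustrative, not from the PR):

// Sketch only, not part of this PR: the driver override described by the removed comments.
// "hostname" and "port" are illustrative stand-ins for the Simulacron endpoint.
import java.util.HashMap;
import java.util.Map;

final class ConnectorPropsSketch {
  static Map<String, String> baseProperties(String hostname, String port) {
    Map<String, String> props = new HashMap<>();
    props.put("contactPoints", hostname);
    props.put("port", port);
    props.put("loadBalancing.localDc", "dc1");
    // Pin the OSS protocol so negotiation does not fall back to DSE_V1/DSE_V2 only.
    props.put("datastax-java-driver.advanced.protocol.version", "V4");
    return props;
  }
}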
@@ -25,31 +25,24 @@
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.AvroSchemaUtils;
import java.io.ByteArrayInputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

/**
* Tests to validate discussion in KAF-91. These tests are intended to prove that the behaviour
@@ -161,7 +154,7 @@ public void should_keep_big_decimal_with_json_converter_decimal_format_numeric()
public void should_convert_big_decimal_to_bytes_with_avro_converter() throws Exception {

String topic = "topic";
AvroConverter converter = new AvroConverter(Mockito.mock(SchemaRegistryClient.class));
AvroConverter converter = new AvroConverter(new MockSchemaRegistryClient());
converter.configure(
Collections.singletonMap(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost"),
Expand All @@ -172,29 +165,15 @@ public void should_convert_big_decimal_to_bytes_with_avro_converter() throws Exc
new SchemaBuilder(Schema.Type.BYTES)
.name(Decimal.LOGICAL_NAME)
.parameter(Decimal.SCALE_FIELD, Integer.toString(expected.scale()))
.required()
.build();

// Root conversion operation
byte[] convertedBytes = converter.fromConnectData(topic, schema, expected);

// AvroConverter winds up adding 5 extra bytes, a "magic" byte + a 4 byte ID value, so strip
// those here. See AbstractKafkaAvroSerializer for more detail.
ByteArrayInputStream stream =
new ByteArrayInputStream(convertedBytes, 5, convertedBytes.length - 5);

org.apache.avro.Schema bytesSchema = AvroSchemaUtils.getSchema(convertedBytes);

// Confirm that we can read the contents of the connect data as a byte array (not by itself an
// impressive feat) _and_ that the bytes in this array represent the unscaled value of the
// expected BigInteger
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
DatumReader<Object> reader = new GenericDatumReader(bytesSchema);
ByteBuffer observedBytes = (ByteBuffer) reader.read(null, decoder);

BigDecimal observed =
new BigDecimal(
new BigInteger(observedBytes.array()),
Integer.parseInt(schema.parameters().get(Decimal.SCALE_FIELD)));
assertThat(expected).isEqualTo(observed);
SchemaAndValue val = converter.toConnectData(topic, convertedBytes);
assertThat(expected).isEqualTo(val.value());
}
}
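The block removed above relied on the Confluent Avro wire format: the serialized payload starts with a magic byte (0) and a 4-byte schema ID, followed by the Avro binary encoding of the datum. The rewritten test lets the converter round-trip the value via toConnectData instead, but for reference, here is a minimal sketch of the manual decoding path the old assertion used (the class and method names are illustrative, not from the PR):

// Sketch only, not part of this PR: manual decoding of the Confluent Avro wire format.
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

final class ConfluentWireFormatSketch {
  // Confluent serializers prepend a magic byte (1 byte) and a schema ID (4 bytes).
  private static final int HEADER_LENGTH = 5;

  // Strips the Confluent header and reads the remaining bytes as Avro binary data.
  // For a BYTES writer schema the returned datum is a ByteBuffer holding, in the
  // removed test, the unscaled value of the original BigDecimal.
  static Object readDatum(byte[] payload, Schema writerSchema) throws IOException {
    ByteArrayInputStream in =
        new ByteArrayInputStream(payload, HEADER_LENGTH, payload.length - HEADER_LENGTH);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    DatumReader<Object> reader = new GenericDatumReader<>(writerSchema);
    return reader.read(null, decoder);
  }
}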