Skip to content

Commit

Permalink
[Improve][E2E] Improve Druid E2E Case (#8077)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 authored Nov 18, 2024
1 parent 81cf10e commit 3608ef8
Showing 1 changed file with 51 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.awaitility.Awaitility.given;

@Slf4j
@DisabledOnContainer(
value = {TestContainerId.SPARK_2_4},
Expand Down Expand Up @@ -94,27 +97,28 @@ public void tearDown() throws Exception {
public void testDruidSink(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob("/fakesource_to_druid.conf");
Assertions.assertEquals(0, execResult.getExitCode());
while (true) {
String responseBody = getSelectResponse(DATASOURCE);
String expectedDataRow1 =
"\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
String expectedDataRow2 =
"\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
String expectedDataRow3 =
"\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
String expectedDataRow4 =
"\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";

if (!responseBody.contains("errorMessage")) {
// Check sink data
Assertions.assertEquals(responseBody.contains(expectedDataRow1), true);
Assertions.assertEquals(responseBody.contains(expectedDataRow2), true);
Assertions.assertEquals(responseBody.contains(expectedDataRow3), true);
Assertions.assertEquals(responseBody.contains(expectedDataRow4), true);
break;
}
Thread.sleep(1000);
}

given().ignoreExceptions()
.await()
.atLeast(100L, TimeUnit.MILLISECONDS)
.atMost(400L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
String responseBody = getSelectResponse(DATASOURCE);
String expectedDataRow1 =
"\"c_boolean\":\"true\",\"c_timestamp\":\"2020-02-02T02:02:02\",\"c_string\":\"NEW\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":3,\"c_bigint\":4,\"c_float\":4.3,\"c_double\":5.3,\"c_decimal\":6.3";
String expectedDataRow2 =
"\"c_boolean\":\"false\",\"c_timestamp\":\"2012-12-21T12:34:56\",\"c_string\":\"AAA\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":333,\"c_bigint\":323232,\"c_float\":3.1,\"c_double\":9.33333,\"c_decimal\":99999.99999999";
String expectedDataRow3 =
"\"c_boolean\":\"true\",\"c_timestamp\":\"2016-03-12T11:29:33\",\"c_string\":\"BBB\",\"c_tinyint\":1,\"c_smallint\":2,\"c_int\":672,\"c_bigint\":546782,\"c_float\":7.9,\"c_double\":6.88888,\"c_decimal\":88888.45623489";
String expectedDataRow4 =
"\"c_boolean\":\"false\",\"c_timestamp\":\"2014-04-28T09:13:27\",\"c_string\":\"CCC\",\"c_tinyint\":1,\"c_smallint\":1,\"c_int\":271,\"c_bigint\":683221,\"c_float\":4.8,\"c_double\":4.45271,\"c_decimal\":79277.68219012";
Assertions.assertFalse(responseBody.contains("errorMessage"));
Assertions.assertTrue(responseBody.contains(expectedDataRow1));
Assertions.assertTrue(responseBody.contains(expectedDataRow2));
Assertions.assertTrue(responseBody.contains(expectedDataRow3));
Assertions.assertTrue(responseBody.contains(expectedDataRow4));
});
}

@DisabledOnContainer(
Expand All @@ -127,28 +131,32 @@ public void testDruidMultiSink(TestContainer container) throws Exception {
container.executeJob("/fakesource_to_druid_with_multi.conf");
Assertions.assertEquals(0, execResult.getExitCode());
// Check multi sink table 1
while (true) {
String responseBody = getSelectResponse(MULTI_DATASOURCE_1);
String expectedDataRow =
"\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3,\"val_string\":\"NEW\"";

if (!responseBody.contains("errorMessage")) {
Assertions.assertEquals(responseBody.contains(expectedDataRow), true);
break;
}
Thread.sleep(1000);
}
given().ignoreExceptions()
.await()
.atLeast(100L, TimeUnit.MILLISECONDS)
.atMost(400L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
String responseBody = getSelectResponse(MULTI_DATASOURCE_1);
String expectedDataRow =
"\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3,\"val_string\":\"NEW\"";
Assertions.assertFalse(responseBody.contains("errorMessage"));
Assertions.assertTrue(responseBody.contains(expectedDataRow));
});

// Check multi sink table 2
while (true) {
String responseBody = getSelectResponse(MULTI_DATASOURCE_2);
String expectedDataRow =
"\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3";
if (!responseBody.contains("errorMessage")) {
Assertions.assertEquals(responseBody.contains(expectedDataRow), true);
break;
}
Thread.sleep(1000);
}
given().ignoreExceptions()
.await()
.atLeast(100L, TimeUnit.MILLISECONDS)
.atMost(400L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
String responseBody = getSelectResponse(MULTI_DATASOURCE_2);
String expectedDataRow =
"\"id\":1,\"val_bool\":\"true\",\"val_tinyint\":1,\"val_smallint\":2,\"val_int\":3,\"val_bigint\":4,\"val_float\":4.3,\"val_double\":5.3,\"val_decimal\":6.3";
Assertions.assertFalse(responseBody.contains("errorMessage"));
Assertions.assertTrue(responseBody.contains(expectedDataRow));
});
}

private void changeCoordinatorURLConf(String resourceFilePath) throws UnknownHostException {
Expand Down Expand Up @@ -184,8 +192,7 @@ private String getSelectResponse(String datasource) throws IOException {
entity.setContentType("application/json");
request.setEntity(entity);
HttpResponse response = client.execute(request);
String responseBody = EntityUtils.toString(response.getEntity());
return responseBody;
return EntityUtils.toString(response.getEntity());
}
}
}

0 comments on commit 3608ef8

Please sign in to comment.