Skip to content

Commit

Permalink
[FLINK-35865][base] Support Byte and Short in ObjectUtils (#3481)
Browse files Browse the repository at this point in the history
  • Loading branch information
GOODBOY008 authored Jul 23, 2024
1 parent fcb4cd8 commit c5f391c
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public static void addJar(StreamExecutionEnvironment env, Collection<URL> jarUrl
Stream.concat(previousJars.stream(), jarUrls.stream().map(URL::toString))
.distinct()
.collect(Collectors.toList());
LOG.info("pipeline.jars is " + String.join(",", currentJars));
LOG.info("pipeline.jars is {}", String.join(",", currentJars));
configuration.set(PipelineOptions.JARS, currentJars);
} catch (Exception e) {
throw new RuntimeException("Failed to add JAR to Flink execution environment", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,19 @@ public class ObjectUtils {
* will throw {@link ArithmeticException} if number overflows.
*/
public static Object plus(Object number, int augend) throws ArithmeticException {
if (number instanceof Integer) {
if (number instanceof Byte) {
int result = Math.addExact((Byte) number, augend);
if (result < Byte.MIN_VALUE || result > Byte.MAX_VALUE) {
throw new ArithmeticException("byte overflow");
}
return (byte) result;
} else if (number instanceof Short) {
int result = Math.addExact((Short) number, augend);
if (result < Short.MIN_VALUE || result > Short.MAX_VALUE) {
throw new ArithmeticException("short overflow");
}
return (short) result;
} else if (number instanceof Integer) {
return Math.addExact((Integer) number, augend);
} else if (number instanceof Long) {
return Math.addExact((Long) number, augend);
Expand All @@ -53,7 +65,13 @@ public static BigDecimal minus(Object minuend, Object subtrahend) {
minuend.getClass().getSimpleName(),
subtrahend.getClass().getSimpleName()));
}
if (minuend instanceof Integer) {
if (minuend instanceof Byte) {
return BigDecimal.valueOf((byte) minuend)
.subtract(BigDecimal.valueOf((byte) subtrahend));
} else if (minuend instanceof Short) {
return BigDecimal.valueOf((short) minuend)
.subtract(BigDecimal.valueOf((short) subtrahend));
} else if (minuend instanceof Integer) {
return BigDecimal.valueOf((int) minuend).subtract(BigDecimal.valueOf((int) subtrahend));
} else if (minuend instanceof Long) {
return BigDecimal.valueOf((long) minuend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -49,7 +50,6 @@
import java.util.Map;
import java.util.Objects;

import static java.math.BigDecimal.ROUND_CEILING;
import static org.apache.flink.cdc.connectors.base.utils.ObjectUtils.doubleCompare;

/**
Expand Down Expand Up @@ -351,7 +351,9 @@ private double calculateDistributionFactor(
// factor = (max - min + 1) / rowCount
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
double distributionFactor =
subRowCnt.divide(new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
subRowCnt
.divide(new BigDecimal(approximateRowCnt), 4, RoundingMode.CEILING)
.doubleValue();
LOG.info(
"The distribution factor of table {} is {} according to the min split key {}, max split key {} and approximate row count {}",
tableId,
Expand Down

0 comments on commit c5f391c

Please sign in to comment.