Skip to content

Commit

Permalink
[FLINK-32975][state] Enhance equals() for all MapState's iterator (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
jiexray authored and masteryhx committed Aug 31, 2023
1 parent 360b97a commit aa8d93e
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -3591,6 +3592,40 @@ public void testMapStateIteratorArbitraryAccess() throws Exception {
}
}

/** Verify that iterator of {@link MapState} compares on the content. */
@Test
public void testMapStateEntryCompare() throws Exception {
MapStateDescriptor<Integer, Long> stateDesc1 =
new MapStateDescriptor<>("map-state-1", Integer.class, Long.class);
MapStateDescriptor<Integer, Long> stateDesc2 =
new MapStateDescriptor<>("map-state-2", Integer.class, Long.class);

CheckpointableKeyedStateBackend<Integer> backend =
createKeyedBackend(IntSerializer.INSTANCE);

try {
MapState<Integer, Long> state1 =
backend.getPartitionedState(
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDesc1);
MapState<Integer, Long> state2 =
backend.getPartitionedState(
VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDesc2);

Map.Entry<Integer, Long> expectedEntry = new AbstractMap.SimpleEntry<>(0, 10L);
backend.setCurrentKey(1);
state1.put(expectedEntry.getKey(), expectedEntry.getValue());
state2.put(expectedEntry.getKey(), expectedEntry.getValue());

assertEquals(state1.entries().iterator().next(), expectedEntry);
assertEquals(state2.entries().iterator().next(), expectedEntry);

assertEquals(state1.entries().iterator().next(), state2.entries().iterator().next());
} finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}

/** Verify that {@link ValueStateDescriptor} allows {@code null} as default. */
@Test
public void testValueStateNullAsDefaultValue() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

/**
* Delegated partitioned {@link MapState} that forwards changes to {@link StateChange} upon {@link
Expand Down Expand Up @@ -83,6 +84,16 @@ public UV setValue(UV value) {
}
return oldValue;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Map.Entry)) {
return false;
}
Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
return Objects.equals(entry.getKey(), e.getKey())
&& Objects.equals(entry.getValue(), e.getValue());
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkArgument;

Expand Down Expand Up @@ -537,6 +538,15 @@ public UV setValue(UV value) {

return oldValue;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Map.Entry)) {
return false;
}
Map.Entry<?, ?> e = (Map.Entry<?, ?>) o;
return Objects.equals(getKey(), e.getKey()) && Objects.equals(getValue(), e.getValue());
}
}

/** An auxiliary utility to scan all entries under the given key. */
Expand Down

0 comments on commit aa8d93e

Please sign in to comment.