Skip to content

Commit

Permalink
Instant conversion
Browse files Browse the repository at this point in the history
Signed-off-by: Santhosh Gandhe <[email protected]>
  • Loading branch information
san81 committed Oct 25, 2024
1 parent ec98d4c commit 38a6890
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private void createPartition(List<ItemInfo> itemInfoList, EnhancedSourceCoordina
SaasWorkerProgressState state = new SaasWorkerProgressState();
state.setKeyAttributes(itemInfo.getKeyAttributes());
state.setItemIds(itemIds);
state.setExportStartTime(System.currentTimeMillis());
state.setExportStartTime(Instant.now());
state.setLoadedItems(itemInfoList.size());
SaasSourcePartition sourcePartition = new SaasSourcePartition(state, partitionKey);
coordinator.createPartition(sourcePartition);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.dataprepper.plugins.source.source_crawler.util.CustomInstantDeserializer;

import java.io.IOException;
import java.time.Instant;

@Setter
Expand All @@ -19,18 +16,11 @@ public class LeaderProgressState {
private boolean initialized = false;

@JsonProperty("last_poll_time")
@JsonDeserialize(using = InstantDeserializer.class)
@JsonDeserialize(using = CustomInstantDeserializer.class)
private Instant lastPollTime;

public LeaderProgressState(@JsonProperty("last_poll_time") final Instant lastPollTime) {
this.lastPollTime = lastPollTime;
}
}

class InstantDeserializer extends JsonDeserializer<Instant> {
@Override
public Instant deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
long millis = p.getLongValue();
return Instant.ofEpochMilli(millis);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import lombok.Getter;
import lombok.Setter;
import org.opensearch.dataprepper.plugins.source.source_crawler.util.CustomInstantDeserializer;

import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -24,7 +27,8 @@ public class SaasWorkerProgressState {
private int loadedItems;

@JsonProperty("exportStartTime")
private long exportStartTime;
@JsonDeserialize(using = CustomInstantDeserializer.class)
private Instant exportStartTime;

private Map<String, String> keyAttributes = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.util;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;

import java.io.IOException;
import java.time.Instant;

public class CustomInstantDeserializer extends JsonDeserializer<Instant> {
@Override
public Instant deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
long millis = p.getLongValue();
return Instant.ofEpochMilli(millis);
}
}
Original file line number Diff line number Diff line change
@@ -1,34 +1,44 @@
package org.opensearch.dataprepper.plugins.source.source_crawler.coordination.state;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.junit.jupiter.api.Test;

import java.time.Instant;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class SaasWorkerProgressStateTest {

private static final ObjectMapper objectMapper = new ObjectMapper();
private static final ObjectMapper objectMapper = new ObjectMapper(new JsonFactory())
.registerModule(new JavaTimeModule());

@Test
void testDefaultValues() throws JsonProcessingException {
String state = "{}";
SaasWorkerProgressState workderProgressState = objectMapper.readValue(state, SaasWorkerProgressState.class);
assert workderProgressState.getTotalItems() == 0;
assert workderProgressState.getLoadedItems() == 0;
assert workderProgressState.getKeyAttributes() != null;
assert workderProgressState.getKeyAttributes().isEmpty();
assert workderProgressState.getExportStartTime() == 0;
assert workderProgressState.getItemIds() == null;
assertEquals(0, workderProgressState.getTotalItems());
assertEquals(0, workderProgressState.getLoadedItems());
assertNotNull(workderProgressState.getKeyAttributes());
assertTrue(workderProgressState.getKeyAttributes().isEmpty());
assertNull(workderProgressState.getExportStartTime());
assertNull(workderProgressState.getItemIds());
}

@Test
void testInitializedValues() throws JsonProcessingException {
String state = "{\"keyAttributes\":{\"project\":\"project-1\"},\"totalItems\":10,\"loadedItems\":20,\"exportStartTime\":1729391235717,\"itemIds\":[\"GTMS-25\",\"GTMS-24\"]}";
SaasWorkerProgressState workderProgressState = objectMapper.readValue(state, SaasWorkerProgressState.class);
assert workderProgressState.getTotalItems() == 10;
assert workderProgressState.getLoadedItems() == 20;
assert workderProgressState.getKeyAttributes() != null;
assert workderProgressState.getExportStartTime() == 1729391235717L;
assert workderProgressState.getItemIds() != null;
assert workderProgressState.getItemIds().size() == 2;
assertEquals(10, workderProgressState.getTotalItems());
assertEquals(20, workderProgressState.getLoadedItems());
assertNotNull(workderProgressState.getKeyAttributes());
assertEquals(workderProgressState.getExportStartTime(), Instant.ofEpochMilli(1729391235717L));
assertNotNull(workderProgressState.getItemIds());
assertEquals(2,workderProgressState.getItemIds().size());
}
}

0 comments on commit 38a6890

Please sign in to comment.