Skip to content

Commit

Permalink
Rebase CASL-321 (#371)
Browse files Browse the repository at this point in the history
* Fixed array patching (#365)

* Fixed array patching
* corrected patcher test expectation

* Fix thread stuck issue + addtnl logs (#368)

* Added logs to troubleshoot thread hanging issue
* CASL-258 unreleased lock fix (#358)
* Fixed array patching (#365) (#367)
* corrected patcher test expectation
* updated version
* updated changelog
---------

Co-authored-by: Pawel Mazurek <[email protected]>

---------

Co-authored-by: Hiren Patel <[email protected]>
Co-authored-by: Pawel Mazurek <[email protected]>
  • Loading branch information
3 people authored Oct 25, 2024
1 parent acc5770 commit b07480f
Show file tree
Hide file tree
Showing 21 changed files with 487 additions and 25 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## Naksha_2.1.5

- Fixed thread hanging issue fixed by avoiding indefinite locking in `FibLinearProbeTable.java`

## Naksha_2.1.4

- Fixes:
- Patch API (POST /features) fixed to `replace` entire array instead of `append` nodes during patch operation, which otherwise prevents removal of the node even-though is desired


## Naksha_1.1.1

- Fixes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import static com.here.naksha.app.service.http.apis.ApiParams.*;
import static com.here.naksha.common.http.apis.ApiParamsConst.*;
import static com.here.naksha.lib.core.util.diff.PatcherUtils.removeAllRemoveOp;
import static com.here.naksha.lib.core.util.diff.PatcherUtils.removeAllRemoveOpExceptForList;
import static com.here.naksha.lib.core.util.storage.ResultHelper.readFeaturesFromResult;

import com.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -389,7 +389,7 @@ private List<XyzFeature> performInMemoryPatching(
if (inputFeature.getId().equals(storageFeature.getId())) {
// we found matching feature in storage, so we take patched version of the feature
final Difference difference = Patcher.getDifference(storageFeature, inputFeature);
final Difference diffNoRemoveOp = removeAllRemoveOp(difference);
final Difference diffNoRemoveOp = removeAllRemoveOpExceptForList(difference);
featureToPatch = Patcher.patch(storageFeature, diffNoRemoveOp);
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ servers:
- url: "https://naksha-v2.ext.mapcreator.here.com/"
description: "PRD"
info:
title: "Naskha Hub-API"
title: "Naksha Hub-API"
description: "Naksha Hub-API is a REST API to provide simple access to geo data."
version: "2.1.5"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,9 @@ private void incInstanceLevelUsage(String actorId, long limit) {
break;
}
// Failed, conflict, repeat
log.info(
"Concurrency conflict while incrementing instance level threadCount from {}. Will retry...",
threadCount);
}
}

Expand Down Expand Up @@ -649,6 +652,9 @@ private void incActorLevelUsage(String actorId, long limit) {
if (counter == null) {
Long existing = actorUsageMap.putIfAbsent(actorId, 1L);
if (existing != null) {
log.info(
"Concurrency conflict while initializing threadCount to 1 for actorId [{}]. Will retry...",
actorId);
continue; // Repeat, conflict with other thread
}
return;
Expand All @@ -669,6 +675,10 @@ private void incActorLevelUsage(String actorId, long limit) {
break;
}
// Failed, conflict, repeat
log.info(
"Concurrency conflict while incrementing actor level threadCount from {} for actorId [{}]. Will retry...",
counter,
actorId);
}
}

Expand All @@ -693,13 +703,20 @@ private void decActorLevelUsage(String actorId) {
log.error("Invalid actor usage value for actor: " + actorId + " value: " + current);
}
if (!actorUsageMap.remove(actorId, current)) {
log.info(
"Concurrency conflict while removing actor level threadCount for actorId [{}]. Will retry...",
actorId);
continue;
}
break;
} else if (actorUsageMap.replace(actorId, current, current - 1)) {
break;
}
// Failed, repeat, conflict with other thread
log.info(
"Concurrency conflict while decrementing actor level threadCount from {} for actorId [{}]. Will retry...",
current,
actorId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public long startNanos() {
return newValue;
}
// Conflict, two threads seem to want to update the same key the same time!
logger.info("Concurrency conflict while updating attachment map for key {}", valueClass);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,17 @@
import org.jetbrains.annotations.ApiStatus.AvailableSince;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Storage API to gain access to storages.
*/
@AvailableSince(NakshaVersion.v2_0_6)
public interface IStorage extends AutoCloseable {

Logger logger = LoggerFactory.getLogger(IStorage.class);

/**
* Initializes the storage, create the transaction table, install needed scripts and extensions.
*
Expand Down Expand Up @@ -191,10 +195,12 @@ default void close() {
try {
shutdown(null).get();
return;
} catch (InterruptedException ignore) {
} catch (InterruptedException ie) {
logger.warn("Exception while shutting down IStorage. ", ie);
} catch (Exception e) {
throw unchecked(e);
}
logger.info("Unable to shutdown IStorage. Will retry...");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Functional implementation of a recursive, thread safe, growing map, based upon <a
Expand Down Expand Up @@ -76,6 +78,8 @@ public final class FibMap {
// See: https://developer.arm.com/documentation/100941/0101/Barriers
//

private static final Logger log = LoggerFactory.getLogger(FibMap.class);

/** The empty array used by default, so that empty maps do not consume memory. */
public static final Object[] EMPTY = new Object[0];

Expand Down Expand Up @@ -343,6 +347,7 @@ public static boolean containsKey(@NotNull Object searchKey, Object @NotNull []

if (existing_key == LOCK_OBJECT) {
// Thread.yield();
log.info("Concurrency conflict while modifying a key at {}. Will retry...", i);
continue;
}

Expand All @@ -359,6 +364,7 @@ public static boolean containsKey(@NotNull Object searchKey, Object @NotNull []

if (sub_key == LOCK_OBJECT) {
// Thread.yield();
log.info("Concurrency conflict while modifying a sub_key at {}. Will retry...", j);
continue;
}
if (ILike.equals(sub_key, key)) {
Expand Down Expand Up @@ -560,6 +566,7 @@ public static long count(@Nullable Object @NotNull [] array) {

if (existing_key == LOCK_OBJECT) {
// Thread.yield();
log.info("Concurrency conflict while modifying a key at {}. Will retry...", ki);
continue;
}

Expand Down Expand Up @@ -630,6 +637,7 @@ public static long count(@Nullable Object @NotNull [] array) {
}
}
// Conflict: concurrent update.
log.info("Concurrency conflict while locking key at {}. Will retry...", ki);
continue;
}
return _put(key, key_hash, expected_value, new_value, create, segment, depth + 1, intern, conflict);
Expand All @@ -656,6 +664,7 @@ public static long count(@Nullable Object @NotNull [] array) {
}
// Race condition, another thread modifies concurrently.
// Thread.yield();
log.info("Concurrency conflict while locking key at {}. Will retry...", ki);
continue;
}

Expand Down Expand Up @@ -693,6 +702,7 @@ public static long count(@Nullable Object @NotNull [] array) {
return UNDEFINED;
}
// Race condition, another thread modifies concurrently.
log.info("Concurrency conflict while locking key at {}. Will retry...", ki);
continue;
}
// We need to create a new sub-array that must be initialized with the existing key, so we
Expand All @@ -710,6 +720,7 @@ public static long count(@Nullable Object @NotNull [] array) {
return _put(key, key_hash, expected_value, new_value, true, sub_array, depth + 1, intern, conflict);
}
// Race condition, another thread modifies concurrently.
log.info("Concurrency conflict while locking key at {}. Will retry...", ki);
continue;
}

Expand All @@ -735,6 +746,7 @@ public static long count(@Nullable Object @NotNull [] array) {
return UNDEFINED;
}
// Race condition, another thread modified concurrently.
log.info("Race condition while modifying key. Will retry...");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,33 @@
import java.util.Map.Entry;

public class PatcherUtils {
public static Difference removeAllRemoveOp(Difference difference) {
public static Difference removeAllRemoveOpExceptForList(Difference difference) {
if (difference instanceof RemoveOp) {
return null;
} else if (difference instanceof ListDiff) {
final ListDiff listdiff = (ListDiff) difference;
final Iterator<Difference> iterator = listdiff.iterator();
while (iterator.hasNext()) {
Difference next = iterator.next();
if (next == null) continue;
next = removeAllRemoveOp(next);
if (next == null) iterator.remove();
}
return listdiff;
} else if (difference instanceof MapDiff) {
final MapDiff mapdiff = (MapDiff) difference;
final Iterator<Entry<Object, Difference>> iterator =
mapdiff.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Object, Difference> next = iterator.next();
next.setValue(removeAllRemoveOp(next.getValue()));
next.setValue(removeAllRemoveOpExceptForList(next.getValue()));
if (next.getValue() == null) iterator.remove();
}
return mapdiff;
}
// NOTE - we avoid removal of nodes for ListDiff, to ensure List is always replaced during patch and not
// appended
/*else if (difference instanceof ListDiff) {
final ListDiff listdiff = (ListDiff) difference;
final Iterator<Difference> iterator = listdiff.iterator();
while (iterator.hasNext()) {
Difference next = iterator.next();
if (next == null) continue;
next = removeAllRemoveOpExceptForList(next);
if (next == null) iterator.remove();
}
return listdiff;
}*/
return difference;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ ENTRY execute(final @NotNull FibSetOp op, final @NotNull KEY key, final @NotNull
if (raw instanceof Reference<?>) {
Reference<?> ref = (Reference<?>) raw;
entry = (ENTRY) ref.get();
if (entry == null && lock.tryLock()) {
if (entry == null && (lock.isHeldByCurrentThread() || lock.tryLock())) {
locked = true;
array[i] = null;
SIZE.getAndAdd(fibSet, -1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.jetbrains.annotations.ApiStatus.AvailableSince;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A recursive, thread safe, weak/soft/strong referencing set, based upon <a
Expand All @@ -54,6 +56,8 @@
@SuppressWarnings({"rawtypes", "unused"})
public class FibSet<KEY, ENTRY extends FibEntry<KEY>> {

private static final Logger logger = LoggerFactory.getLogger(FibSet.class);

/**
* The empty array used by default, so that empty maps do not consume memory.
*/
Expand Down Expand Up @@ -452,6 +456,9 @@ ENTRY _execute(
if (raw_entry == null) {
if (!ARRAY.compareAndSet(array, index, ref, null)) {
// Race condition, another thread modified the array slot.
logger.info(
"Concurrency conflict while initializing array value at index {}. Will retry...",
index);
continue;
}
SIZE.getAndAdd(this, -1L);
Expand Down Expand Up @@ -487,6 +494,7 @@ ENTRY _execute(
return (ENTRY) entry;
}
// Race condition, other thread updated the reference concurrently.
logger.info("Concurrency conflict while setting array value at index {}. Will retry...", index);
continue;
}
assert op == REMOVE;
Expand All @@ -495,6 +503,7 @@ ENTRY _execute(
return (ENTRY) entry;
}
// Race condition, other thread updated the reference concurrently.
logger.info("Concurrency conflict while nullifying array value at index {}. Will retry...", index);
continue;
}

Expand All @@ -516,6 +525,7 @@ ENTRY _execute(
return _execute(op, key, key_hash, refType, sub_array, depth + 1);
}
// Race condition, another thread modified concurrently.
logger.info("Concurrency conflict while setting array value at index {}. Will retry...", index);
continue;
}

Expand All @@ -529,6 +539,7 @@ ENTRY _execute(
return new_entry;
}
// Race condition, another thread modified concurrently.
logger.info("Concurrency conflict while setting array value at index {}. Will retry...", index);
continue;
}

Expand All @@ -546,6 +557,7 @@ ENTRY _execute(
return new_entry;
}
// Race condition, another thread modified concurrently.
logger.info("Concurrency conflict while initializing array value at index {}. Will retry...", index);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
import java.nio.ByteOrder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class JsonFieldBool<OBJECT> extends JsonField<OBJECT, Boolean> {

private static final Logger logger = LoggerFactory.getLogger(JsonFieldBool.class);

JsonFieldBool(
@NotNull JsonClass<OBJECT> jsonClass,
@NotNull Field javaField,
Expand Down Expand Up @@ -94,6 +98,7 @@ public boolean _compareAndSwap(@NotNull OBJECT object, Boolean expected, Boolean
return true;
}
// We need to loop, because possibly some code modified bytes we're not interested in.
logger.info("Concurrency conflict while setting value at offset {}. Will retry...", offset);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
import java.nio.ByteOrder;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class JsonFieldByte<OBJECT> extends JsonField<OBJECT, Byte> {

private static final Logger logger = LoggerFactory.getLogger(JsonFieldByte.class);

JsonFieldByte(
@NotNull JsonClass<OBJECT> jsonClass,
@NotNull Field javaField,
Expand Down Expand Up @@ -100,6 +104,7 @@ public boolean _compareAndSwap(@NotNull OBJECT object, Byte expected, Byte value
return true;
}
// We need to loop, because possibly some code modified bytes we're not interested in.
logger.info("Concurrency conflict while setting value at offset {}. Will retry...", offset);
}
}

Expand Down
Loading

0 comments on commit b07480f

Please sign in to comment.