Skip to content

Commit

Permalink
Merge branch 'master' into documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Dec 11, 2024
2 parents 8da8b55 + 7ac6204 commit 2bf4556
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 63 deletions.
2 changes: 1 addition & 1 deletion bin/container
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ fi
if [[ -n $build ]]; then
echo Cleaning old images...
images=$(docker images | fgrep $REPOSITORY | awk '{print $1":"$2}' | fgrep -v '<none>') || true
[[ -n $images ]] && docker rmi $images
[[ -n $images ]] && docker rmi $images || true

echo Building Dockerfile.$target
echo docker build -f Dockerfile.$target -t $target .
Expand Down
32 changes: 18 additions & 14 deletions udmif/api/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class DynamicIotAccessProvider extends IotAccessBase {

private static final long INDEX_ORDERING_MULTIPLIER_MS = 10000L;
public static final String PROVIDER_KEY_FORMAT = "%s/%s";
private final Map<String, String> registryProviders = new ConcurrentHashMap<>();
private final List<String> providerList;
private final Map<String, IotAccessProvider> providers = new HashMap<>();
Expand All @@ -59,13 +60,16 @@ private String determineProvider(String registryId) {
}

private IotAccessProvider getProviderFor(Envelope envelope) {
return getProviderFor(envelope.deviceRegistryId);
return getProviderFor(envelope.deviceRegistryId, envelope.deviceId);
}

private IotAccessProvider getProviderFor(String registryId) {
String providerKey = registryProviders.computeIfAbsent(registryId, this::determineProvider);
private IotAccessProvider getProviderFor(String registryId, String deviceId) {
String entryKey = getProviderKey(registryId, deviceId);
String providerKey =
registryProviders.computeIfAbsent(entryKey, k -> determineProvider(registryId));
IotAccessProvider provider = getProviders().get(providerKey);
return requireNonNull(provider, "Could not determine provider for " + registryId);
return requireNonNull(provider,
format("Could not determine provider for %s from %s", providerKey, entryKey));
}

private Map<String, IotAccessProvider> getProviders() {
Expand Down Expand Up @@ -102,27 +106,27 @@ public void activate() {

@Override
public Entry<Long, String> fetchConfig(String registryId, String deviceId) {
return getProviderFor(registryId).fetchConfig(registryId, deviceId);
return getProviderFor(registryId, deviceId).fetchConfig(registryId, deviceId);
}

@Override
public CloudModel fetchDevice(String registryId, String deviceId) {
return getProviderFor(registryId).fetchDevice(registryId, deviceId);
return getProviderFor(registryId, deviceId).fetchDevice(registryId, deviceId);
}

@Override
public String fetchRegistryMetadata(String registryId, String metadataKey) {
return getProviderFor(registryId).fetchRegistryMetadata(registryId, metadataKey);
return getProviderFor(registryId, null).fetchRegistryMetadata(registryId, metadataKey);
}

@Override
public String fetchState(String registryId, String deviceId) {
return getProviderFor(registryId).fetchState(registryId, deviceId);
return getProviderFor(registryId, deviceId).fetchState(registryId, deviceId);
}

@Override
public void saveState(String registryId, String deviceId, String stateBlob) {
getProviderFor(registryId).saveState(registryId, deviceId, stateBlob);
getProviderFor(registryId, deviceId).saveState(registryId, deviceId, stateBlob);
}

@Override
Expand All @@ -143,19 +147,19 @@ public boolean isEnabled() {

@Override
public CloudModel listDevices(String registryId, Consumer<Integer> progress) {
return getProviderFor(registryId).listDevices(registryId, progress);
return getProviderFor(registryId, null).listDevices(registryId, progress);
}

@Override
public CloudModel modelDevice(String registryId, String deviceId, CloudModel cloudModel) {
debug("%s iot device %s/%s, %s %s", cloudModel.operation, registryId, deviceId,
cloudModel.blocked, cloudModel.num_id);
return getProviderFor(registryId).modelDevice(registryId, deviceId, cloudModel);
return getProviderFor(registryId, deviceId).modelDevice(registryId, deviceId, cloudModel);
}

@Override
public CloudModel modelRegistry(String registryId, String deviceId, CloudModel cloudModel) {
return getProviderFor(registryId).modelRegistry(registryId, deviceId, cloudModel);
return getProviderFor(registryId, deviceId).modelRegistry(registryId, deviceId, cloudModel);
}

@Override
Expand All @@ -174,15 +178,20 @@ public void setProviderAffinity(String registryId, String deviceId, String provi
if (providerId != null) {
int index = providerId.indexOf(Common.SOURCE_SEPARATOR);
String affinity = providerId.substring(0, index < 0 ? providerId.length() : index);
String previous = registryProviders.put(registryId, affinity);
if (!providerId.equals(previous)) {
debug(format("Switching registry affinity for %s from %s -> %s", registryId, previous,
String providerKey = getProviderKey(registryId, deviceId);
String previous = registryProviders.put(providerKey, affinity);
if (!affinity.equals(previous)) {
debug(format("Switched registry affinity for %s from %s -> %s", providerKey, previous,
affinity));
}
}
super.setProviderAffinity(registryId, deviceId, providerId);
}

private static String getProviderKey(String registryId, String deviceId) {
return format(PROVIDER_KEY_FORMAT, registryId, deviceId);
}

@Override
public String updateConfig(Envelope envelope, String config, Long version) {
throw new RuntimeException("Shouldn't be called for dynamic provider");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ private void processException(Envelope reflection, Map<String, Object> objectMap
}

private void processReflection(Envelope reflection, Envelope env, Object payload) {
debug("Processing reflection %s/%s %s %s", env.subType, env.subFolder,
isoConvert(env.publishTime), env.transactionId);
debug("Processing reflection %s/%s %s %s for %s", env.subType, env.subFolder,
isoConvert(env.publishTime), env.transactionId, env.deviceId);
CloudModel result = getReflectionResult(env, payload);
ifNotNullThen(result, r -> {
debug("Return reflection result %s %s %s", r.operation, env.subType, env.transactionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,10 @@ private synchronized void setReflectorState() {
map.put(SubFolder.UDMI.value(), udmiState);

if (isInstallValid) {
debug("Sending UDMI reflector state: " + stringifyTerse(udmiState.setup));
debug(format("Sending UDMI reflector state to %s: %s", registryId,
stringifyTerse(udmiState.setup)));
} else {
info("Sending UDMI reflector state: " + stringify(map));
info(format("Sending UDMI reflector state to %s: %s", registryId, stringify(map)));
}

publishStats.update();
Expand Down Expand Up @@ -546,6 +547,7 @@ public void activate() {
try {
// Some publishers are shared, while others are unique, so handle accordingly.
if (pubCounts.get(publisher).getAndIncrement() > 0) {
setReflectorState();
ifTrueThen(pubLatches.get(publisher).getCount() > 0,
() -> System.err.println("Waiting for the other shoe to drop..."));
pubLatches.get(publisher).await(CONFIG_TIMEOUT_SEC, TimeUnit.SECONDS);
Expand Down
Loading

0 comments on commit 2bf4556

Please sign in to comment.