diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml
index d8240bc97a..02d9ac56c2 100644
--- a/.github/workflows/testing.yml
+++ b/.github/workflows/testing.yml
@@ -49,12 +49,13 @@ jobs:
- name: Setup udmis container build
if: ${{ github.event_name == 'push' }}
run: |
+ set -x
revhash=$(git rev-parse $GITHUB_REF)
IMAGE_TAG=g${revhash:0:9}
PUSH_REPO=$PUSH_REGISTRY/${{ github.repository }}
PUSH_TAG=$PUSH_REPO:$IMAGE_TAG
echo PUSH_TAG=$PUSH_TAG >> $GITHUB_ENV
- udmis/bin/container prep --no-check $PUSH_REPO
+ udmis/bin/container prep --no-check ${PUSH_REPO%/*}
echo Pushing built container as $PUSH_TAG | tee -a $GITHUB_STEP_SUMMARY
- name: Build and push docker image
if: ${{ github.event_name == 'push' }}
@@ -295,7 +296,6 @@ jobs:
cd sites/udmi_site_model/out
find . -type f | sort | xargs more | cat
- name: itemized sequencer tests
- if: ${{ always() }}
run: |
bin/test_itemized $GCP_TARGET_PROJECT
- name: itemized output logs
diff --git a/common/src/main/java/com/google/udmi/util/GeneralUtils.java b/common/src/main/java/com/google/udmi/util/GeneralUtils.java
index 26d2fb2246..14524838e5 100644
--- a/common/src/main/java/com/google/udmi/util/GeneralUtils.java
+++ b/common/src/main/java/com/google/udmi/util/GeneralUtils.java
@@ -49,8 +49,9 @@ public class GeneralUtils {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.enable(SerializationFeature.INDENT_OUTPUT)
.setDateFormat(new ISO8601DateFormat())
+ .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
- private static final ObjectMapper OBJECT_MAPPER_RAW =
+ public static final ObjectMapper OBJECT_MAPPER_RAW =
OBJECT_MAPPER.copy()
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
.enable(Feature.ALLOW_TRAILING_COMMA)
@@ -59,7 +60,7 @@ public class GeneralUtils {
.setSerializationInclusion(Include.NON_NULL);
public static final ObjectMapper OBJECT_MAPPER_STRICT =
OBJECT_MAPPER_RAW.copy()
- .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+ .enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.disable(SerializationFeature.INDENT_OUTPUT);
private static final String SEPARATOR = "\n ";
diff --git a/udmis/.idea/runConfigurations/UdmiServicePod_configured.xml b/udmis/.idea/runConfigurations/UdmiServicePod_configured.xml
index 59670b69c6..d05214369a 100644
--- a/udmis/.idea/runConfigurations/UdmiServicePod_configured.xml
+++ b/udmis/.idea/runConfigurations/UdmiServicePod_configured.xml
@@ -19,4 +19,4 @@
-
+
\ No newline at end of file
diff --git a/udmis/bin/container b/udmis/bin/container
index c85b0bfc22..ee98c296da 100755
--- a/udmis/bin/container
+++ b/udmis/bin/container
@@ -6,7 +6,7 @@ DROOT=.
function usage {
echo Error: $*
- echo Usage: $0 { prep, build, shell, run, push, deploy, update, status, logs, stop } [--no-check] [--selfie] [repo]
+ echo Usage: $0 { prep, build, shell, run, deploy, update, status, logs, stop } [--no-check] [--selfie] [repo]
echo Project: $GCP_PROJECT
echo Try starting with: $0 build
exit 1
@@ -18,8 +18,6 @@ shift || usage missing command
GCP_PROJECT=$(gcloud config get project)
REPOSITORY=gcr.io/$GCP_PROJECT
-mkdir -p tmp
-
IMAGE=udmis
if [[ $1 == "--no-check" ]]; then
@@ -60,7 +58,8 @@ current_user=$USER@$HOSTNAME
revparse=`git rev-parse HEAD`
-udmi_ref=$REPOSITORY:g${revparse:0:9}
+udmi_ver=g${revparse:0:9}
+udmi_ref=$REPOSITORY/udmi:$udmi_ver
version=`git describe || echo $udmi_ref`
@@ -68,7 +67,7 @@ RUNARGS="--rm -ti -v $PWD/var:/udmi -v $HOME/.config:/root/.config --tmpfs /tmp"
TEMPLATES=$(cd etc; ls k8s_*.yaml)
if [[ $cmd == prep || $cmd == build ]]; then
- rm -rf var tmp && mkdir -p var
+ rm -rf var tmp && mkdir -p var tmp
bin/build
build_time=`date --utc -Imin -r $LIBFILE`
cp etc/prod_pod.json var/
@@ -84,29 +83,32 @@ EOF
echo Next try: $0 build
fi
-if [[ $cmd == prep ]]; then
- true
-elif [[ $cmd == build ]]; then
+if [[ $cmd == build ]]; then
echo Building Dockerfile.$IMAGE
docker build -f Dockerfile.$IMAGE -t $IMAGE $DROOT
- echo Next try: $0 push
-elif [[ $cmd == run ]]; then
- docker run $RUNARGS $IMAGE
-elif [[ $cmd == shell ]]; then
- docker run $RUNARGS $IMAGE bash
-elif [[ $cmd == push ]]; then
- IMAGENAME=$REPOSITORY/$IMAGE
- docker tag $IMAGE $IMAGENAME
- docker push $IMAGENAME
- hash=$(docker images --digests $IMAGENAME | fgrep latest | awk '{print $3}')
- ihash=$IMAGENAME@$hash
+fi
+
+if [[ $cmd == build && $REPOSITORY != local ]]; then
+ docker tag $IMAGE $udmi_ref
+ docker push $udmi_ref
+ hash=$(docker images --digests ${udmi_ref%:*} | fgrep $udmi_ver | awk '{print $3}')
+ ihash=$udmi_ref@$hash
for file in $TEMPLATES; do
if fgrep -q @IMAGE-$IMAGE@ etc/$file; then
sed < etc/$file > tmp/$file -e "s^@IMAGE-$IMAGE@^$ihash^"
echo Updated tmp/$file with image $ihash
fi
done
- echo Next try: $0 deploy
+fi
+
+if [[ $cmd == prep ]]; then
+ echo Next try: $0 build
+elif [[ $cmd == build ]]; then
+ echo Next try: $0 '{run, deploy, update}'
+elif [[ $cmd == run ]]; then
+ docker run $RUNARGS $IMAGE
+elif [[ $cmd == shell ]]; then
+ docker run $RUNARGS $IMAGE bash
elif [[ $cmd == deploy ]]; then
kubectl apply -f tmp/k8s_pod.yaml
echo Next try: $0 status
diff --git a/udmis/bin/deploy b/udmis/bin/deploy
deleted file mode 100755
index 1735976d30..0000000000
--- a/udmis/bin/deploy
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/bin/bash -e
-
-if [[ $# != 1 ]]; then
- echo Usage: $0 project_id
- false
-fi
-
-project_id=$1
-shift
-
-ROOT=$(dirname $0)/..
-cd $ROOT
-
-bin/setup $project_id
-
-bin/container build
-bin/container push
-bin/container deploy
-sleep 10
-kubectl get pods
diff --git a/udmis/bin/deploy b/udmis/bin/deploy
new file mode 120000
index 0000000000..77c430a5ac
--- /dev/null
+++ b/udmis/bin/deploy
@@ -0,0 +1 @@
+update
\ No newline at end of file
diff --git a/udmis/bin/update b/udmis/bin/update
index d0b26ec379..fc873822e7 100755
--- a/udmis/bin/update
+++ b/udmis/bin/update
@@ -8,13 +8,13 @@ fi
project_id=$1
shift
+CMD=$(basename $0)
ROOT=$(dirname $0)/..
cd $ROOT
bin/setup $project_id
bin/container build
-bin/container push
-bin/container update
+bin/container $CMD
sleep 10
kubectl get pods
diff --git a/udmis/etc/prod_pod.json b/udmis/etc/prod_pod.json
index 3b3cfc5b93..487171373f 100644
--- a/udmis/etc/prod_pod.json
+++ b/udmis/etc/prod_pod.json
@@ -24,12 +24,13 @@
},
"clearblade-iot-core": {
"provider": "clearblade",
- "project_id": "${CLEARBLADE_PROJECT}"
+ "project_id": "${CLEARBLADE_PROJECT}",
+ "options": "distributor=stately"
},
"gcp-iot-core": {
"provider": "gcp",
"project_id": "${GCP_PROJECT}",
- "options": "${GCP_IOT_OPTIONS}"
+ "options": "${GCP_IOT_OPTIONS},distributor=stately"
}
},
"distributors": {
diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/ClearBladeIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/ClearBladeIotAccessProvider.java
index 1ace7685f0..5dcbd05eaf 100644
--- a/udmis/src/main/java/com/google/bos/udmi/service/access/ClearBladeIotAccessProvider.java
+++ b/udmis/src/main/java/com/google/bos/udmi/service/access/ClearBladeIotAccessProvider.java
@@ -7,10 +7,10 @@
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static com.google.udmi.util.GeneralUtils.ifNotNullGet;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
+import static com.google.udmi.util.GeneralUtils.ifNotTrueThen;
import static com.google.udmi.util.GeneralUtils.ifTrueThen;
import static com.google.udmi.util.GeneralUtils.isTrue;
import static com.google.udmi.util.JsonUtil.getDate;
-import static com.google.udmi.util.JsonUtil.getTimestamp;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.Optional.ofNullable;
@@ -53,10 +53,7 @@
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
import com.google.udmi.util.GeneralUtils;
-import java.time.Duration;
-import java.time.Instant;
import java.util.AbstractMap.SimpleEntry;
import java.util.Base64;
import java.util.Date;
@@ -66,7 +63,6 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -103,8 +99,11 @@ public class ClearBladeIotAccessProvider extends IotAccessBase {
* Create a new instance for interfacing with GCP IoT Core.
*/
public ClearBladeIotAccessProvider(IotAccess iotAccess) {
+ super(iotAccess);
projectId = getProjectId(iotAccess);
ifTrueThen(isEnabled(), this::fetchRegistryRegions);
+ ifNotTrueThen(isEnabled(),
+ () -> warn("Clearblade access provided disabled because project id is null or empty"));
}
private static Credential convertIot(DeviceCredential device) {
@@ -147,6 +146,27 @@ protected DeviceManagerClient getDeviceManagerClient() {
return new DeviceManagerClient();
}
+ @NotNull
+ protected Set getRegistriesForRegion(String region) {
+ try {
+ DeviceManagerClient deviceManagerClient = getDeviceManagerClient();
+ ListDeviceRegistriesRequest request = ListDeviceRegistriesRequest.Builder.newBuilder()
+ .setParent(LocationName.of(projectId, region).getLocationFullName())
+ .build();
+ ListDeviceRegistriesResponse response = deviceManagerClient.listDeviceRegistries(request);
+ requireNonNull(response, "get registries response is null");
+ List deviceRegistries = response.getDeviceRegistriesList();
+ Set registries =
+ ofNullable(deviceRegistries).orElseGet(ImmutableList::of).stream()
+ .map(registry -> registry.toBuilder().getId())
+ .collect(Collectors.toSet());
+ debug("Fetched " + registries.size() + " registries for region " + region);
+ return registries;
+ } catch (Exception e) {
+ throw new RuntimeException("While fetching registries for region " + region, e);
+ }
+ }
+
@Override
protected boolean isEnabled() {
return !isNullOrEmpty(projectId);
@@ -276,6 +296,32 @@ private CloudModel deleteDevice(String registryId, Device device) {
}
}
+ @NotNull
+ private HashMap fetchDevices(String deviceRegistryId, String gatewayId) {
+ String location = getRegistryLocation(deviceRegistryId);
+ DeviceManagerClient deviceManagerClient = getDeviceManagerClient();
+ GatewayListOptions gatewayListOptions = ifNotNullGet(gatewayId, this::getGatewayListOptions);
+ String registryFullName =
+ RegistryName.of(projectId, location, deviceRegistryId).getRegistryFullName();
+ String pageToken = null;
+ HashMap collect = new HashMap<>();
+ do {
+ DevicesListRequest request = DevicesListRequest.Builder.newBuilder().setParent(
+ registryFullName)
+ .setGatewayListOptions(gatewayListOptions)
+ .setPageToken(pageToken)
+ .build();
+ DevicesListResponse response = deviceManagerClient.listDevices(request);
+ requireNonNull(response, "DeviceRegistriesList fetch failed");
+ Map responseMap =
+ response.getDevicesList().stream().map(ClearBladeIotAccessProvider::convertToEntry)
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+ collect.putAll(responseMap);
+ pageToken = response.getNextPageToken();
+ } while (pageToken != null);
+ return collect;
+ }
+
private String getDeviceName(String registryId, String deviceId) {
return DeviceName.of(projectId, getRegistryLocation(registryId), registryId, deviceId)
.toString();
@@ -298,27 +344,6 @@ private String getProjectId(IotAccess iotAccess) {
}
}
- @NotNull
- protected Set getRegistriesForRegion(String region) {
- try {
- DeviceManagerClient deviceManagerClient = getDeviceManagerClient();
- ListDeviceRegistriesRequest request = ListDeviceRegistriesRequest.Builder.newBuilder()
- .setParent(LocationName.of(projectId, region).getLocationFullName())
- .build();
- ListDeviceRegistriesResponse response = deviceManagerClient.listDeviceRegistries(request);
- requireNonNull(response, "get registries response is null");
- List deviceRegistries = response.getDeviceRegistriesList();
- Set registries =
- ofNullable(deviceRegistries).orElseGet(ImmutableList::of).stream()
- .map(registry -> registry.toBuilder().getId())
- .collect(Collectors.toSet());
- debug("Fetched " + registries.size() + " registries for region " + region);
- return registries;
- } catch (Exception e) {
- throw new RuntimeException("While fetching registries for region " + region, e);
- }
- }
-
private String getRegistryLocation(String registry) {
return getRegistryRegion(registry);
}
@@ -362,32 +387,6 @@ private CloudModel listRegistryDevices(String deviceRegistryId, String gatewayId
}
}
- @NotNull
- private HashMap fetchDevices(String deviceRegistryId, String gatewayId) {
- String location = getRegistryLocation(deviceRegistryId);
- DeviceManagerClient deviceManagerClient = getDeviceManagerClient();
- GatewayListOptions gatewayListOptions = ifNotNullGet(gatewayId, this::getGatewayListOptions);
- String registryFullName =
- RegistryName.of(projectId, location, deviceRegistryId).getRegistryFullName();
- String pageToken = null;
- HashMap collect = new HashMap<>();
- do {
- DevicesListRequest request = DevicesListRequest.Builder.newBuilder().setParent(
- registryFullName)
- .setGatewayListOptions(gatewayListOptions)
- .setPageToken(pageToken)
- .build();
- DevicesListResponse response = deviceManagerClient.listDevices(request);
- requireNonNull(response, "DeviceRegistriesList fetch failed");
- Map responseMap =
- response.getDevicesList().stream().map(ClearBladeIotAccessProvider::convertToEntry)
- .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
- collect.putAll(responseMap);
- pageToken = response.getNextPageToken();
- } while (pageToken != null);
- return collect;
- }
-
private void unbindDevice(String registryId, String gatewayId, String proxyId) {
try {
debug(format("Unbind %s: %s from %s", registryId, proxyId, gatewayId));
diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/DynamicIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/DynamicIotAccessProvider.java
index 9c0da43cd8..10530654e4 100644
--- a/udmis/src/main/java/com/google/bos/udmi/service/access/DynamicIotAccessProvider.java
+++ b/udmis/src/main/java/com/google/bos/udmi/service/access/DynamicIotAccessProvider.java
@@ -38,6 +38,7 @@ public class DynamicIotAccessProvider extends IotAccessBase {
* Create a new instance for interfacing with multiple providers.
*/
public DynamicIotAccessProvider(IotAccess iotAccess) {
+ super(iotAccess);
providerList = Arrays.asList(iotAccess.project_id.split(","));
}
@@ -135,6 +136,11 @@ public void sendCommandBase(String registryId, String deviceId, SubFolder folder
getProviderFor(registryId).sendCommandBase(registryId, deviceId, folder, message);
}
+ @Override
+ public void updateRegistryRegions(Map regions) {
+ providers.values().forEach(provider -> provider.updateRegistryRegions(regions));
+ }
+
@Override
public void setProviderAffinity(String registryId, String deviceId, String providerId) {
if (providerId != null) {
diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/GcpIotAccessProvider.java b/udmis/src/main/java/com/google/bos/udmi/service/access/GcpIotAccessProvider.java
index b8ccc65899..2f77d9aeed 100644
--- a/udmis/src/main/java/com/google/bos/udmi/service/access/GcpIotAccessProvider.java
+++ b/udmis/src/main/java/com/google/bos/udmi/service/access/GcpIotAccessProvider.java
@@ -94,8 +94,8 @@ public class GcpIotAccessProvider extends IotAccessBase {
* TODO: Need to implement page tokens for all requisite API calls.
*/
public GcpIotAccessProvider(IotAccess iotAccess) {
- String options = variableSubstitution(iotAccess.options, null);
- if (!ENABLED_OPTION.equals(options)) {
+ super(iotAccess);
+ if (!options.containsKey(ENABLED_OPTION)) {
warn("access provider disabled, missing option '%s'", ENABLED_OPTION);
projectId = null;
cloudIotService = null;
diff --git a/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java b/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java
index fe6efafcb7..478919bfec 100644
--- a/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java
+++ b/udmis/src/main/java/com/google/bos/udmi/service/access/IotAccessBase.java
@@ -11,14 +11,17 @@
import static java.util.Objects.requireNonNull;
import static java.util.Optional.ofNullable;
+import com.google.bos.udmi.service.core.DistributorPipe;
import com.google.bos.udmi.service.core.ProcessorBase.PreviousParseException;
import com.google.bos.udmi.service.pod.ContainerBase;
+import com.google.bos.udmi.service.pod.UdmiServicePod;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
+import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -28,9 +31,11 @@
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import udmi.schema.CloudModel;
+import udmi.schema.Envelope;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.IotAccess;
import udmi.schema.IotAccess.IotProvider;
+import udmi.schema.UdmiState;
/**
* Generic interface for accessing iot device management.
@@ -53,8 +58,14 @@ public abstract class IotAccessBase extends ContainerBase {
IotProvider.GCP, GcpIotAccessProvider.class,
IotProvider.LOCAL, LocalIotAccessProvider.class
);
+ final Map options;
private CompletableFuture