Skip to content
This repository has been archived by the owner on Mar 31, 2023. It is now read-only.

[Tracing] Support Jaeger tracing in NCM and TC #707

Merged
merged 136 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
136 commits
Select commit Hold shift + click to select a range
47caabf
Tried to make TC's create container process concurrent
zzxgzgz Aug 3, 2021
6a29451
Added back background ping
zzxgzgz Aug 3, 2021
6adc8e0
Changed fixed time sleep to a library(await) that waits until GSs are…
zzxgzgz Aug 4, 2021
fbe57f1
Try to add code to send GSV1 to ACA for testing
zzxgzgz Aug 13, 2021
62992b3
Try to add code to send GSV1 to ACA for testing
zzxgzgz Aug 13, 2021
f4df134
Sending only gsv1, to see if it works
zzxgzgz Aug 13, 2021
3402d92
Sending only gsv1, to see if it works
zzxgzgz Aug 13, 2021
1378f7a
Sending only gsv1, to see if it works
zzxgzgz Aug 13, 2021
25507e5
Enable gsv2 sending for testing.
zzxgzgz Aug 17, 2021
8918995
Enable gsv2 sending for testing.
zzxgzgz Aug 17, 2021
5fdc054
Enable gsv2 sending for testing.
zzxgzgz Aug 17, 2021
1d4d1c0
Enable gsv2 sending for testing.
zzxgzgz Aug 17, 2021
88d18bc
Test sending v1 and v2 together
zzxgzgz Aug 17, 2021
210ccda
Comment out sending GSV1 code in TC for on-demand performance testing
zzxgzgz Aug 18, 2021
c146da5
Start to change TC, and try to figure out why two hosts have differen…
zzxgzgz Sep 14, 2021
c2aaab5
Try to make port amount balanced
zzxgzgz Sep 14, 2021
32be97d
Try to add support for different number of ports on two hosts
zzxgzgz Sep 14, 2021
d2c5b50
Try to add support for different number of ports on two hosts
zzxgzgz Sep 14, 2021
556217a
Try to add support for different number of ports on two hosts
zzxgzgz Sep 14, 2021
e51bc76
Try to add support for different number of ports on two hosts
zzxgzgz Sep 14, 2021
50a13b4
Try to add support for different number of ports on two hosts
zzxgzgz Sep 14, 2021
4afbbe3
Disabling container creating and pings, to focus on testing ACA's abi…
zzxgzgz Sep 14, 2021
cf4f34f
Disabling gsv2 printout, also added time for waiting response of gRPC…
zzxgzgz Sep 20, 2021
cb97d94
Tried to add router states in TC
zzxgzgz Sep 22, 2021
f959c07
Fixed set/add function mismatches
zzxgzgz Sep 22, 2021
a17ae63
Added setRevisionNumber for router configuration
zzxgzgz Sep 22, 2021
05c17ad
Added gateway info in the subnet state
zzxgzgz Sep 22, 2021
5358b73
Make host_vdr_mac_address consistent for routers on the same host
zzxgzgz Sep 22, 2021
53f2d9f
Try to make neighbors L3, and added routers to both hosts' GSs
zzxgzgz Sep 23, 2021
3b82784
Added another subnet, so there are now two subnets
zzxgzgz Sep 23, 2021
827b0ab
Added another subnet, so there are now two subnets
zzxgzgz Sep 23, 2021
a69951d
Added another subnet, so there are now two subnets
zzxgzgz Sep 23, 2021
f604cf8
Change neighbor type to l2 for testing
zzxgzgz Sep 23, 2021
ff9626f
Enable container creating and ping for testing
zzxgzgz Oct 13, 2021
b39014c
Try to remove extra subnet/router, and focus on testing pings
zzxgzgz Oct 13, 2021
823f79f
Added more wait time for container creating
zzxgzgz Oct 13, 2021
d4b9626
Merge branch 'master' into tc_create_containers_concurrently
zzxgzgz Oct 18, 2021
3a3f124
Try to use countdown latch, instead of simply sleep
zzxgzgz Oct 18, 2021
a3da687
Changed wait() to await(); fixed typo in comment; deleted unused code
zzxgzgz Oct 18, 2021
8960b45
Removed extra files
zzxgzgz Oct 18, 2021
8b0832c
Added param to let user decide whether to create container and ping
zzxgzgz Oct 18, 2021
0ad576c
Added param to let user decide whether to create container and ping
zzxgzgz Oct 18, 2021
8bad8f4
Created a constant variable to represent the default vlan_id, 1
zzxgzgz Oct 20, 2021
39601b8
Saving code before pulling upstream master
zzxgzgz Nov 2, 2021
e7d2bdf
Merge branch 'master' into onNext_slow
zzxgzgz Nov 2, 2021
5bf1618
Added logs to furthur breakdown the time spent in doSendGoalState
zzxgzgz Nov 2, 2021
639c66a
Try to reuse StreamObserver s for gRPC request and reply, to see if i…
zzxgzgz Nov 2, 2021
1a2fa08
Added mutex.unlock() when error happens calling onNext
zzxgzgz Nov 2, 2021
c93bb12
Commented out mutexes for debugging
zzxgzgz Nov 2, 2021
aff8ac9
Reverted back stream oberver design, added more logs to breakdown the…
zzxgzgz Nov 2, 2021
415c12a
Chnaged implementation for sendGoalStates so that it doesn't wait for…
zzxgzgz Nov 3, 2021
0c55954
Tried to make warmup calls continously for 5 seconds
zzxgzgz Nov 3, 2021
0e2874d
Revert channel warmup method
zzxgzgz Nov 3, 2021
8cfb15d
Revert channel warmup method
zzxgzgz Nov 3, 2021
7d0727a
Investigate why only 10 warmup gs were sent, when 1000 should be sent
zzxgzgz Nov 3, 2021
777a6a3
Tried to use fixedThreadPool
zzxgzgz Nov 3, 2021
92dfb19
Try to make flow control window bigger
zzxgzgz Nov 4, 2021
f9208dd
Investigate why multiple warmups can't be received by ACA
zzxgzgz Nov 4, 2021
d7a7a50
Try to change default value of numberOfWarmupsPerChannel
zzxgzgz Nov 4, 2021
0e0b13c
Set different version number of warmup gs, to see if it makes a diffe…
zzxgzgz Nov 4, 2021
e452f6d
Tried to sleep after onNext for warmups
zzxgzgz Nov 4, 2021
3e5a21f
Sleep doesn't work, try to use old channel builder
zzxgzgz Nov 4, 2021
85e6810
Try to init one request and response observer for each onNext
zzxgzgz Nov 4, 2021
bea19eb
Tried to warmup each channel for five seconds
zzxgzgz Nov 4, 2021
3eb10ea
Try to warmup channels at the same time
zzxgzgz Nov 4, 2021
52af123
Added back future to future list
zzxgzgz Nov 4, 2021
83f27d3
Change back to use NettyChannelBuilder, to see if it brings better perf
zzxgzgz Nov 4, 2021
72f976e
Try to trace grpc client for pusing goalstates
zzxgzgz Nov 9, 2021
609f9bd
Investigate on why it complains service name is null/empty
zzxgzgz Nov 9, 2021
318c663
Investigate on why it complains service name is null/empty
zzxgzgz Nov 9, 2021
a7398fb
Try to get tracer from env
zzxgzgz Nov 9, 2021
e9b9306
Try to add spring.application.name
zzxgzgz Nov 9, 2021
70413cf
Use tracr resolver again
zzxgzgz Nov 9, 2021
46dbb97
Use global tracer
zzxgzgz Nov 10, 2021
19dce9d
Tried to add spans to sendGoalStates
zzxgzgz Nov 10, 2021
52680d9
Tried to add jaeger tracer/span to tc and the grpc server in NCM
zzxgzgz Nov 10, 2021
93fd820
Temperarily execute the ping using ssh
zzxgzgz Nov 10, 2021
10c2787
Try to use different way to get parent span
zzxgzgz Nov 10, 2021
0814050
Added logs to see what kind of tracer and span we got here
zzxgzgz Nov 10, 2021
4ad7610
Try to see what tracer other services have
zzxgzgz Nov 11, 2021
a3738f6
Try to get tracer from TracerResolver, and see what is in there
zzxgzgz Nov 11, 2021
4af5450
Removed duplicated dependency
zzxgzgz Nov 11, 2021
75508a0
Disabled grpc and enable rest in NCM
zzxgzgz Nov 11, 2021
2738f5b
Continue investigating the jaeger on NCM
zzxgzgz Nov 12, 2021
e7fbee5
Removed default values
zzxgzgz Nov 12, 2021
085f52a
Added spaces around = sign
zzxgzgz Nov 12, 2021
7ae558d
Added more logs
zzxgzgz Nov 12, 2021
9fa6c13
Check if all stuffs in applications.properties doesn't get reflected
zzxgzgz Nov 12, 2021
fa3c2d2
Is none of the @Value working
zzxgzgz Nov 12, 2021
2f1a09d
Changed position of @Component
zzxgzgz Nov 12, 2021
4428f6f
Try to get param from ENV
zzxgzgz Nov 12, 2021
d03391f
Try to log @Value after construction
zzxgzgz Nov 12, 2021
d9487b6
Try to log @Value after construction
zzxgzgz Nov 12, 2021
928be6d
Try to add a tracer after construction
zzxgzgz Nov 12, 2021
86e1c5a
Used configuration to mannually create and register a tracer for NCM
zzxgzgz Nov 12, 2021
81eca68
Added more spans
zzxgzgz Nov 12, 2021
49a7edd
Added span for on-demand reply
zzxgzgz Nov 12, 2021
2a56581
Changed tracer in pseudo controller, also added spans in NCM pushdown…
zzxgzgz Nov 12, 2021
8ae52f7
Changed tracer in pseudo controller, also added spans in NCM pushdown…
zzxgzgz Nov 12, 2021
49a1c74
Changed tracer in pseudo controller, also added spans in NCM pushdown…
zzxgzgz Nov 12, 2021
f33b2c1
Added some DurationStatistics
zzxgzgz Nov 13, 2021
e78c87b
Added printouts for tracer configs
zzxgzgz Nov 13, 2021
83d1f5f
Added to string
zzxgzgz Nov 13, 2021
bd81ece
testing with verbosity for interceptors
zzxgzgz Nov 16, 2021
af4ed99
testing with verbosity for interceptors
zzxgzgz Nov 16, 2021
73cd267
testing with streaming for interceptors
zzxgzgz Nov 16, 2021
7237c8c
Try to add scopes for spans
zzxgzgz Nov 16, 2021
ac35258
manually added span log for experiement
zzxgzgz Nov 16, 2021
bedee17
Added env var JAEGER_SERVICE_NAME in order to let TracerResolver.reso…
zzxgzgz Nov 17, 2021
6d8d34e
Adjusted locations of spans
zzxgzgz Nov 29, 2021
56312a1
Try to make gRPC client use the same tracer as gRPC server
zzxgzgz Nov 29, 2021
3a6dc7c
adjust scope position for ncm's gRPC client
zzxgzgz Nov 30, 2021
1f40ff4
Changed how requestGoalStates get the parent span
zzxgzgz Nov 30, 2021
760a93e
Reset parent span to null
zzxgzgz Nov 30, 2021
0921f44
Reset parent span to null
zzxgzgz Nov 30, 2021
d0ae523
Reset parent span to null
zzxgzgz Nov 30, 2021
157b73b
Reset parent span to null
zzxgzgz Nov 30, 2021
5709a6b
Corrected wrong parent spans
zzxgzgz Nov 30, 2021
483fc8c
get back parent span, since it is not the cause of the chained spans
zzxgzgz Nov 30, 2021
07090d7
set parent span to null for requestGoalStates
zzxgzgz Nov 30, 2021
cc212dc
get parent spans for requestGoalStates
zzxgzgz Nov 30, 2021
e4bee84
Investigate why all requestGoalStates related spans are connected tog…
zzxgzgz Nov 30, 2021
0a8ad33
Added parent span context printing for request goal states
zzxgzgz Nov 30, 2021
97c12ee
Changed to print parent span's tracer id
zzxgzgz Dec 1, 2021
4785a6b
Added span id printing
zzxgzgz Dec 1, 2021
838bca7
Try to move server and tracer initialization back to the constructor
zzxgzgz Dec 1, 2021
737de02
Removed un needed code
zzxgzgz Dec 1, 2021
96979b8
Try to merge master
zzxgzgz Dec 1, 2021
a7f7c38
Added back missing onNext(reply)
zzxgzgz Dec 1, 2021
7964bae
Removed unwanted files and code
zzxgzgz Dec 1, 2021
4c99f65
Try to make test controller's tracer report to the jaeger on another …
zzxgzgz Dec 1, 2021
00cee48
Added null check and removed unused code
zzxgzgz Dec 1, 2021
bd30110
Added VPC states into cache during pushGoalStatesStream
zzxgzgz Dec 4, 2021
7efc558
Tried to merge from master
zzxgzgz Dec 4, 2021
c82daab
Removed local file
zzxgzgz Dec 10, 2021
562e44e
Made span names a part of the application.properties
zzxgzgz Dec 10, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions services/network_config_manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,27 @@ Copyright(c) 2020 Futurewei Cloud
</repositories>

<dependencies>
zzxgzgz marked this conversation as resolved.
Show resolved Hide resolved
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-tracerresolver</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-spring-cloud-starter</artifactId>
<version>0.3.12</version>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-grpc</artifactId>
<version>0.2.3</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.36.Final</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-spring-jaeger-cloud-starter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,30 @@ free of charge, to any person obtaining a copy of this software and associated d
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoopGroup;
import io.grpc.netty.shaded.io.netty.channel.epoll.EpollSocketChannel;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.jaegertracing.Configuration;
import io.jaegertracing.internal.JaegerTracer;
import io.jaegertracing.internal.samplers.ConstSampler;
import io.lettuce.core.dynamic.annotation.Param;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.contrib.tracerresolver.TracerResolver;
import io.opentracing.util.GlobalTracer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.stream.Collectors;
import io.opentracing.Tracer;
import io.opentracing.contrib.grpc.TracingClientInterceptor;

@Service("grpcGoalStateClient")
public class GoalStateClientImpl implements GoalStateClient {
Expand All @@ -62,14 +77,16 @@ public class GoalStateClientImpl implements GoalStateClient {

private ConcurrentHashMap<String, ArrayList<GrpcChannelStub>> hostIpGrpcChannelStubMap;

private final Tracer tracer;

@DurationStatistics
public static GoalStateClientImpl getInstance(int numberOfGrpcChannelPerHost, int numberOfWarmupsPerChannel, ArrayList<String> monitorHosts) {
if (instance == null) {
instance = new GoalStateClientImpl(numberOfGrpcChannelPerHost, numberOfWarmupsPerChannel, monitorHosts);
}
return instance;
}


public GoalStateClientImpl(@Value("${grpc.number-of-channels-per-host:1}") int numberOfGrpcChannelPerHost, @Value("${grpc.number-of-warmups-per-channel:1}") int numberOfWarmupsPerChannel, @Value("")ArrayList<String> monitorHosts) {

if ((this.numberOfGrpcChannelPerHost = numberOfGrpcChannelPerHost) < 1) {
Expand All @@ -82,8 +99,8 @@ public GoalStateClientImpl(@Value("${grpc.number-of-channels-per-host:1}") int n

this.monitorHosts = monitorHosts;
logger.log(Level.FINE, "Printing out all monitorHosts");
for(String host : this.monitorHosts){
logger.log(Level.FINE, "Monitoring this host: "+ host);
for (String host : this.monitorHosts) {
logger.log(Level.FINE, "Monitoring this host: " + host);
}
logger.log(Level.FINE, "Done printing out all monitorHosts");
this.hostAgentPort = 50001;
Expand All @@ -96,20 +113,33 @@ public GoalStateClientImpl(@Value("${grpc.number-of-channels-per-host:1}") int n
new DefaultThreadFactory("grpc-thread-pool"));
//TODO: Setup a connection pool. one ACA, one client.
this.hostIpGrpcChannelStubMap = new ConcurrentHashMap();
logger.log(Level.FINE, "This instance has "+ numberOfGrpcChannelPerHost+" channels, and "+ numberOfWarmupsPerChannel+" warmups");
Configuration.SamplerConfiguration samplerConfiguration = Configuration.SamplerConfiguration.fromEnv()
.withType(ConstSampler.TYPE)
.withParam(1);
Configuration.ReporterConfiguration reporterConfiguration = Configuration.ReporterConfiguration.fromEnv()
.withLogSpans(true);


this.tracer = GlobalTracer.get();//Configuration.fromEnv().getTracer();
//TracerResolver.resolveTracer();
logger.log(Level.INFO, "[GoalStateClientImpl] Got this global tracer: "+this.tracer.toString());
logger.log(Level.FINE, "This instance has " + numberOfGrpcChannelPerHost + " channels, and " + numberOfWarmupsPerChannel + " warmups");

}

@Override
@DurationStatistics
public List<String> sendGoalStates(Map<String, HostGoalState> hostGoalStates) throws Exception {

final CountDownLatch finishLatch = new CountDownLatch(hostGoalStates.values().size());
logger.log(Level.INFO, "Host goal states size: " + hostGoalStates.values().size());
List<String> replies = new ArrayList<>();

for (HostGoalState hostGoalState : hostGoalStates.values()) {
doSendGoalState(hostGoalState, finishLatch, replies);
}


if (!finishLatch.await(1, TimeUnit.MINUTES)) {
logger.log(Level.WARNING, "Send goal states can not finish within 1 minutes");
return Arrays.asList("Send goal states can not finish within 1 minutes");
Expand All @@ -120,6 +150,7 @@ public List<String> sendGoalStates(Map<String, HostGoalState> hostGoalStates) th
return new ArrayList<>();
}

@DurationStatistics
private GrpcChannelStub getOrCreateGrpcChannel(String hostIp) {
if (!this.hostIpGrpcChannelStubMap.containsKey(hostIp)) {
this.hostIpGrpcChannelStubMap.put(hostIp, createGrpcChannelStubArrayList(hostIp));
Expand All @@ -139,20 +170,36 @@ private GrpcChannelStub getOrCreateGrpcChannel(String hostIp) {
return this.hostIpGrpcChannelStubMap.get(hostIp).get(usingChannelWithThisIndex);
}

@DurationStatistics
private ArrayList<GrpcChannelStub> createGrpcChannelStubArrayList(String hostIp) {
long start = System.currentTimeMillis();
ArrayList<GrpcChannelStub> arr = new ArrayList<>();
List<Future<Integer>> channels_warmup_future = new ArrayList<>();
for (int i = 0; i < numberOfGrpcChannelPerHost; i++) {
GrpcChannelStub channelStub = createGrpcChannelStub(hostIp);
warmUpChannelStub(channelStub, hostIp);
arr.add(channelStub);
// wait until all warmups for all channels are finished.
Future<Integer> channel_warmup_future = this.executor.submit(()->{
warmUpChannelStub(channelStub, hostIp);
arr.add(channelStub);
return 1;
});
channels_warmup_future.add(channel_warmup_future);
}
channels_warmup_future.parallelStream().filter(Objects::nonNull).map(future -> {
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
}).collect(Collectors.toList());
long end = System.currentTimeMillis();
logger.log(Level.FINE, "[createGrpcChannelStubArrayList] Created " + numberOfGrpcChannelPerHost + " gRPC channel stubs for host " + hostIp + ", elapsed Time in milli seconds: " + (end - start));
return arr;
}

// try to warmup a gRPC channel and its stub, by sending an empty GoalState`.
@DurationStatistics
void warmUpChannelStub(GrpcChannelStub channelStub, String hostIp) {
GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = channelStub.stub;

Expand Down Expand Up @@ -193,22 +240,32 @@ public void onCompleted() {
return;
}

@DurationStatistics
private GrpcChannelStub createGrpcChannelStub(String hostIp) {
// adding tracing stuffs for each channel
TracingClientInterceptor tracingClientInterceptor = TracingClientInterceptor
.newBuilder()
.withTracer(this.tracer)
.withVerbosity()
.withStreaming()
.build();


ManagedChannel channel = ManagedChannelBuilder.forAddress(hostIp, this.hostAgentPort)
.usePlaintext()
.keepAliveWithoutCalls(true)
.keepAliveTime(Long.MAX_VALUE, TimeUnit.SECONDS)
.build();
GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = GoalStateProvisionerGrpc.newStub(channel);

GoalStateProvisionerGrpc.GoalStateProvisionerStub asyncStub = GoalStateProvisionerGrpc.newStub(tracingClientInterceptor.intercept(channel));
return new GrpcChannelStub(channel, asyncStub);

}

private void doSendGoalState(HostGoalState hostGoalState, CountDownLatch finishLatch, List<String> replies) throws InterruptedException {
String hostIp = hostGoalState.getHostIp();
logger.log(Level.FINE, "Setting up a channel to ACA on: " + hostIp);
long start = System.currentTimeMillis();

long end = 0;
GrpcChannelStub channelStub = getOrCreateGrpcChannel(hostIp);
long chan_established = System.currentTimeMillis();
logger.log(Level.FINE, "[doSendGoalState] Established channel, elapsed Time in milli seconds: " + (chan_established - start));
Expand All @@ -218,6 +275,7 @@ private void doSendGoalState(HostGoalState hostGoalState, CountDownLatch finishL
logger.log(Level.FINE, "[doSendGoalState] Established stub, elapsed Time after channel established in milli seconds: " + (stub_established - chan_established));

Map<String, List<Goalstateprovisioner.GoalStateOperationReply.GoalStateOperationStatus>> result = new HashMap<>();

StreamObserver<Goalstateprovisioner.GoalStateOperationReply> responseObserver = new StreamObserver<>() {
@Override
public void onNext(Goalstateprovisioner.GoalStateOperationReply reply) {
Expand All @@ -244,10 +302,18 @@ public void onCompleted() {
};

StreamObserver<Goalstate.GoalStateV2> requestObserver = asyncStub.pushGoalStatesStream(responseObserver);
long requestObserverEstablished = System.currentTimeMillis();
logger.log(Level.FINE, "[doSendGoalState] Established RequestObserver, elapsed Time after stub established in milli seconds: " + (requestObserverEstablished - stub_established));
zzxgzgz marked this conversation as resolved.
Show resolved Hide resolved
try {
long before_get_goalState = System.currentTimeMillis();
Goalstate.GoalStateV2 goalState = hostGoalState.getGoalState();
logger.log(Level.INFO, "Sending GS to Host " + hostIp + " as follows | " + goalState.toString());
long after_get_goalState = System.currentTimeMillis();
logger.log(Level.INFO, "Sending GS with size " + goalState.getSerializedSize() + " to Host " + hostIp + " as follows | " + goalState.toString());
requestObserver.onNext(goalState);
long after_onNext = System.currentTimeMillis();
logger.log(Level.FINE, "[doSendGoalState] Get goalstatev2 from HostGoalState in milliseconds: " + (after_get_goalState - before_get_goalState));
logger.log(Level.FINE, "[doSendGoalState] Call onNext in milliseconds: " + (after_onNext - after_get_goalState));

if (hostGoalState.getGoalState().getNeighborStatesCount() == 1 && monitorHosts.contains(hostIp)) {
long sent_gs_time = System.currentTimeMillis();
// If there's only one neighbor state and it is trying to send it to aca_node_one, the IP of which is now
Expand All @@ -265,12 +331,16 @@ public void onCompleted() {
// Mark the end of requests
logger.log(Level.INFO, "Sending GS to Host " + hostIp + " is completed");

// comment out onCompleted so that the same channel/stub and keep sending next time.
end = System.currentTimeMillis();
long onNext_called = System.currentTimeMillis();
logger.log(Level.FINE, "[doSendGoalState] Whole function call took time in milliseconds: "+(end - start) +
" \nFrom established stub to onNext called, elapsed Time after channel established in milli seconds: " + (onNext_called - requestObserverEstablished));
requestObserver.onCompleted();

// shutdown(channel);
}

@DurationStatistics
private void shutdown(ManagedChannel channel) {
try {
channel.shutdown().awaitTermination(Config.SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
Expand Down
Loading