Skip to content

Commit

Permalink
Add logs to find the issue at root
Browse files Browse the repository at this point in the history
  • Loading branch information
sakthivelmanii committed Jan 6, 2025
1 parent 21733d5 commit 4072c7c
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -469,19 +469,19 @@ public void run() {
// Those result sets will trigger initiateProduceRows() when the first results are received.
// Non-streaming result sets do not trigger this callback, and for those result sets, we
// need to eagerly start the ProduceRowsRunnable.
synchronized (monitor) {
if (state == State.STREAMING_IN_PROGRESS
|| state == State.RUNNING
|| state == State.CONSUMING) {
return;
}
if (state == State.STREAMING_INITIALIZED) {
state = State.STREAMING_IN_PROGRESS;
}
if (!initiateStreaming(AsyncResultSetImpl.this)) {
initiateProduceRows();
}
// synchronized (monitor) {
// if (state == State.STREAMING_IN_PROGRESS
// || state == State.RUNNING
// || state == State.CONSUMING) {
// return;
// }
// if (state == State.STREAMING_INITIALIZED) {
// state = State.STREAMING_IN_PROGRESS;
// }
if (!initiateStreaming(AsyncResultSetImpl.this)) {
initiateProduceRows();
}
// }
} catch (Throwable exception) {
executionException = SpannerExceptionFactory.asSpannerException(exception);
initiateProduceRows();
Expand All @@ -492,6 +492,7 @@ public void run() {
/** Sets the callback for this {@link AsyncResultSet}. */
@Override
public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
System.out.println("Inside setCallback");
synchronized (monitor) {
Preconditions.checkState(!closed, "This AsyncResultSet has been closed");
Preconditions.checkState(
Expand All @@ -510,7 +511,7 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {

private void initiateProduceRows() {
synchronized (monitor) {
if (this.state == State.STREAMING_IN_PROGRESS) {
if (this.state == State.STREAMING_INITIALIZED) {
this.state = State.RUNNING;
}
produceRowsInitiated = true;
Expand Down Expand Up @@ -660,7 +661,7 @@ public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsF
!partialResultSet.getResumeToken().isEmpty()
|| bufferIsFull
|| partialResultSet == GrpcStreamIterator.END_OF_STREAM;
if (startJobThread || state != State.STREAMING_IN_PROGRESS) {
if (startJobThread || state != State.STREAMING_INITIALIZED) {
initiateProduceRows();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ private ByteString generateTransactionName(String session) {
counter = new AtomicLong();
transactionCounters.put(session, counter);
}
System.out.printf("Generating session using Session ID %s\n", session);
return ByteString.copyFromUtf8(
String.format("%s/transactions/%d", session, counter.incrementAndGet()));
}
Expand Down Expand Up @@ -1999,6 +2000,8 @@ private void ensureMostRecentTransaction(Session session, ByteString transaction
if (index > -1) {
long id = Long.parseLong(transactionId.toStringUtf8().substring(index + 1));
if (id != counter.get()) {
System.out.printf(
"Session ID %s TransactionId %s\n", session.getName(), transactionId.toStringUtf8());
throw Status.FAILED_PRECONDITION
.withDescription(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
Expand All @@ -75,6 +79,24 @@ public Long apply(StructReader input) {
}
}

@Rule
public TestWatcher testWatcher = new TestWatcher() {
@Override
protected void succeeded(Description description) {
System.out.println("Succeeded test: " + description.getMethodName());
}

@Override
protected void failed(Throwable e, Description description) {
System.out.println("Failed test: " + description.getMethodName());
}

@Override
protected void starting(Description description) {
System.out.println("Starting test: " + description.getMethodName());
}
};

private static final ToLongTransformer TO_LONG = new ToLongTransformer();

@Parameter(0)
Expand Down Expand Up @@ -1434,6 +1456,8 @@ private void asyncRunner_withReadFunction(
runner.runAsync(
txn -> {
AsyncResultSet rs = readFunction.apply(txn);
System.out.println(rs);
System.out.println("Creating a new AsyncResultSet");
ApiFuture<Void> fut =
rs.setCallback(
queryExecutor,
Expand All @@ -1450,6 +1474,7 @@ private void asyncRunner_withReadFunction(
}
}
});
System.out.println("After setCallback");
return ApiFutures.transform(
fut, input -> counter.get(), MoreExecutors.directExecutor());
},
Expand Down Expand Up @@ -1573,6 +1598,8 @@ private void asyncTransactionManager_readAsync(
context.then(
(transaction, ignored) -> {
AsyncResultSet rs = fn.apply(transaction);
System.out.println(rs);
System.out.println("1. Creating a new AsyncResultSet");
ApiFuture<Void> fut =
rs.setCallback(
queryExecutor,
Expand Down

0 comments on commit 4072c7c

Please sign in to comment.