import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.function.Consumer;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
* @author Robert Varga
*/
abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
+ private static final class IncrementSequence {
+ private long delta = 1;
+
+ long getDelta() {
+ return delta;
+ }
+
+ void incrementDelta() {
+ delta++;
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
+ private final Deque<Object> successfulRequests = new ArrayDeque<>();
private final ProxyHistory parent;
private AbstractProxyTransaction successor;
return parent.localActor();
}
+ private void incrementSequence(final long delta) {
+ sequence += delta;
+ LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
+ }
+
final long nextSequence() {
- return sequence++;
+ final long ret = sequence++;
+ LOG.debug("Transaction {} allocated sequence {}", this, ret);
+ return ret;
}
final void delete(final YangInstanceIdentifier path) {
Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
}
+ final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
+ successfulRequests.add(Verify.verifyNotNull(req));
+ }
+
+ final void recordFinishedRequest() {
+ final Object last = successfulRequests.peekLast();
+ if (last instanceof IncrementSequence) {
+ ((IncrementSequence) last).incrementDelta();
+ } else {
+ successfulRequests.addLast(new IncrementSequence());
+ }
+ }
+
/**
* Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
* being sent to the backend.
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+ // This is a terminal request, hence we do not need to record it
+ LOG.debug("Transaction {} abort completed", this);
parent.completeTransaction(this);
});
}
ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
}
+ // This is a terminal request, hence we do not need to record it
+ LOG.debug("Transaction {} directCommit completed", this);
parent.completeTransaction(this);
});
return ret;
void canCommit(final VotingFuture<?> ret) {
checkSealed();
- sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> {
+ final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
+ sendRequest(req, t -> {
if (t instanceof TransactionCanCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+
+ recordSuccessfulRequest(req);
+ LOG.debug("Transaction {} canCommit completed", this);
});
}
void preCommit(final VotingFuture<?> ret) {
checkSealed();
- sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
+ final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+ localActor());
+ sendRequest(req, t -> {
if (t instanceof TransactionPreCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+
+ recordSuccessfulRequest(req);
+ LOG.debug("Transaction {} preCommit completed", this);
});
}
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+ LOG.debug("Transaction {} doCommit completed", this);
parent.completeTransaction(this);
});
}
- void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
+ final void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
this.successor = Preconditions.checkNotNull(successor);
+
+ for (Object obj : successfulRequests) {
+ if (obj instanceof TransactionRequest) {
+ LOG.debug("Forwarding request {} to successor {}", obj, successor);
+ successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
+ } else {
+ Verify.verify(obj instanceof IncrementSequence);
+ successor.incrementSequence(((IncrementSequence) obj).getDelta());
+ }
+ }
+ LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
+ successfulRequests.clear();
}
/**