Instead of directly forwarding state use ModifyTransactionRequest
to encapsulate state and forward it separately to the successor.
This eliminates sendRequest() from replay path, ensuring the replay
thread is not blocked.
Change-Id: Ice86791d417b7487b9d3b1df06341dd028cde7f8
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit
c525e5f25b951daa28d0cbde237ba3040b68f99f)
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
// At this point the successor has completed transition and is possibly visible by the user thread, which is
// still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
// Propagate state and seal the successor.
// At this point the successor has completed transition and is possibly visible by the user thread, which is
// still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
// Propagate state and seal the successor.
+ final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+ if (optState.isPresent()) {
+ forwardToSuccessor(successor, optState.get(), null);
+ }
successor.predecessorSealed();
}
successor.predecessorSealed();
}
*/
if (SEALED.equals(prevState)) {
LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
*/
if (SEALED.equals(prevState)) {
LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
+ final long enqueuedTicks = parent.currentTime();
+ final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+ if (optState.isPresent()) {
+ successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks);
+ }
if (successor.markSealed()) {
if (successor.markSealed()) {
- successor.sealAndSend(Optional.of(parent.currentTime()));
+ successor.sealAndSend(Optional.of(enqueuedTicks));
abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
@GuardedBy("this")
abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
@GuardedBy("this")
- abstract void flushState(AbstractProxyTransaction successor);
+ abstract java.util.Optional<ModifyTransactionRequest> flushState();
abstract TransactionRequest<?> abortRequest();
abstract TransactionRequest<?> abortRequest();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
successor.enqueuePurge(callback);
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
successor.enqueuePurge(callback);
+ } else if (request instanceof ModifyTransactionRequest) {
+ successor.handleForwardedRequest(request, callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
- void flushState(final AbstractProxyTransaction successor) {
+ Optional<ModifyTransactionRequest> flushState() {
+ return Optional.empty();
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
- void flushState(final AbstractProxyTransaction successor) {
+ Optional<ModifyTransactionRequest> flushState() {
+ final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(getIdentifier(), localActor());
+ b.setSequence(0);
+
sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
@Override
public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
@Override
public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
- successor.write(current().node(child), data);
+ b.addModification(new TransactionWrite(current().node(child), data));
}
@Override
public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
}
@Override
public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
- successor.merge(current().node(child), data);
+ b.addModification(new TransactionMerge(current().node(child), data));
}
@Override
public void delete(final PathArgument child) {
}
@Override
public void delete(final PathArgument child) {
- successor.delete(current().node(child));
+ b.addModification(new TransactionDelete(current().node(child)));
+
+ return Optional.of(b.build());
}
DataTreeSnapshot getSnapshot() {
}
DataTreeSnapshot getSnapshot() {
- void flushState(final AbstractProxyTransaction successor) {
- if (builderBusy) {
- final ModifyTransactionRequest request = builder.build();
- builderBusy = false;
- forwardToSuccessor(successor, request, null);
+ java.util.Optional<ModifyTransactionRequest> flushState() {
+ if (!builderBusy) {
+ return java.util.Optional.empty();
+
+ final ModifyTransactionRequest request = builder.build();
+ builderBusy = false;
+ return java.util.Optional.of(request);
successor.handleForwardedRequest(request, callback);
}
successor.handleForwardedRequest(request, callback);
}
- private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
if (request instanceof ModifyTransactionRequest) {
final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
final RemoteProxyTransaction successor = transactionTester.getTransaction();
doAnswer(LocalProxyTransactionTest::applyToCursorAnswer).when(modification).applyToCursor(any());
transaction.sealOnly();
final RemoteProxyTransaction successor = transactionTester.getTransaction();
doAnswer(LocalProxyTransactionTest::applyToCursorAnswer).when(modification).applyToCursor(any());
transaction.sealOnly();
- transaction.flushState(successor);
+ final TransactionRequest<?> request = transaction.flushState().get();
+ transaction.forwardToSuccessor(successor, request, null);
verify(modification).applyToCursor(any());
transactionTester.getTransaction().seal();
transactionTester.getTransaction().directCommit();
verify(modification).applyToCursor(any());
transactionTester.getTransaction().seal();
transactionTester.getTransaction().directCommit();