import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
// At this point the successor has completed transition and is possibly visible by the user thread, which is
// still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
// Propagate state and seal the successor.
- flushState(successor);
+ final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+ if (optState.isPresent()) {
+ forwardToSuccessor(successor, optState.get(), null);
+ }
successor.predecessorSealed();
}
*/
if (SEALED.equals(prevState)) {
LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
- flushState(successor);
+ final long enqueuedTicks = parent.currentTime();
+ final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+ if (optState.isPresent()) {
+ successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks);
+ }
if (successor.markSealed()) {
- successor.sealAndSend(Optional.of(parent.currentTime()));
+ successor.sealAndSend(Optional.of(enqueuedTicks));
}
}
}
abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
@GuardedBy("this")
- abstract void flushState(AbstractProxyTransaction successor);
+ abstract java.util.Optional<ModifyTransactionRequest> flushState();
abstract TransactionRequest<?> abortRequest();
} else if (request instanceof TransactionPurgeRequest) {
LOG.debug("Forwarding purge {} to successor {}", request, successor);
successor.enqueuePurge(callback);
+ } else if (request instanceof ModifyTransactionRequest) {
+ successor.handleForwardedRequest(request, callback);
} else {
throw new IllegalArgumentException("Unhandled request" + request);
}
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
+import java.util.Optional;
import java.util.function.Consumer;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
}
@Override
- void flushState(final AbstractProxyTransaction successor) {
+ Optional<ModifyTransactionRequest> flushState() {
// No-op
+ return Optional.empty();
}
@Override
import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequestBuilder;
import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
}
@Override
- void flushState(final AbstractProxyTransaction successor) {
+ Optional<ModifyTransactionRequest> flushState() {
+ final ModifyTransactionRequestBuilder b = new ModifyTransactionRequestBuilder(getIdentifier(), localActor());
+ b.setSequence(0);
+
sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
@Override
public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
- successor.write(current().node(child), data);
+ b.addModification(new TransactionWrite(current().node(child), data));
}
@Override
public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
- successor.merge(current().node(child), data);
+ b.addModification(new TransactionMerge(current().node(child), data));
}
@Override
public void delete(final PathArgument child) {
- successor.delete(current().node(child));
+ b.addModification(new TransactionDelete(current().node(child)));
}
});
+
+ return Optional.of(b.build());
}
DataTreeSnapshot getSnapshot() {
}
@Override
- void flushState(final AbstractProxyTransaction successor) {
- if (builderBusy) {
- final ModifyTransactionRequest request = builder.build();
- builderBusy = false;
- forwardToSuccessor(successor, request, null);
+ java.util.Optional<ModifyTransactionRequest> flushState() {
+ if (!builderBusy) {
+ return java.util.Optional.empty();
}
+
+ final ModifyTransactionRequest request = builder.build();
+ builderBusy = false;
+ return java.util.Optional.of(request);
}
@Override
successor.handleForwardedRequest(request, callback);
}
- private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
if (request instanceof ModifyTransactionRequest) {
final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
final RemoteProxyTransaction successor = transactionTester.getTransaction();
doAnswer(LocalProxyTransactionTest::applyToCursorAnswer).when(modification).applyToCursor(any());
transaction.sealOnly();
- transaction.flushState(successor);
+ final TransactionRequest<?> request = transaction.flushState().get();
+ transaction.forwardToSuccessor(successor, request, null);
verify(modification).applyToCursor(any());
transactionTester.getTransaction().seal();
transactionTester.getTransaction().directCommit();