BUG-5280: fix transaction seal atomicity 31/48831/2
authorRobert Varga <rovarga@cisco.com>
Wed, 30 Nov 2016 14:07:58 +0000 (15:07 +0100)
committerRobert Varga <rovarga@cisco.com>
Wed, 30 Nov 2016 15:22:01 +0000 (16:22 +0100)
AbstractProxyTransaction.seal() indicates that the user is done
with the transaction. This transition needs to be atomically
propagated to successors on reconnect, such that the user will
always observe sealed proxies. More importantly this state
is propagated to parent ProxyHistory, where it drives the state
machine in ClientProxyHistory -- and failing to mark the successor
as sealed will wreck that.

Unfortunately an AbstractProxyTransaction does not forward all
of the state on seal(), but rather when the resulting commit
cohort initiates commit -- which means we have to perform three-way
synchronization between seal()/(can|direct)Commit/finishReconnect,
to ensure we flush state towards the backend exactly once.

To do that, we guard the methods involved with locking for split
them into fast/slow paths and add an explicit flushState() method
by which subclasses forward their current unsent state to their
successor. This solution is correct but a bit heavy-handed, so it
will be further optimized in a follow-up patch.

Change-Id: Id5f156dc18faef5b9184c3e2e3d24f7af1b18841
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java

index 803908d8c603848683d55d74d897c65bd9cd9931..7f5bec1ff6c25dd7dd0650217e327e96a82c9e5c 100644 (file)
@@ -10,15 +10,19 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 import akka.actor.ActorRef;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import akka.actor.ActorRef;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayDeque;
 import java.util.Deque;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.util.function.Consumer;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -52,6 +56,11 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
  * @author Robert Varga
  */
 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
+    /**
+     * Marker object used instead of read-type of requests, which are satisfied only once. This has a lower footprint
+     * and allows compressing multiple requests into a single entry.
+     */
+    @NotThreadSafe
     private static final class IncrementSequence {
         private long delta = 1;
 
     private static final class IncrementSequence {
         private long delta = 1;
 
@@ -64,14 +73,49 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         }
     }
 
         }
     }
 
+    private enum SealState {
+        /**
+         * The user has not sealed the transaction yet.
+         */
+        OPEN,
+        /**
+         * The user has sealed the transaction, but has not issued a canCommit().
+         */
+        SEALED,
+        /**
+         * The user has sealed the transaction and has issued a canCommit().
+         */
+        FLUSHED,
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
 
     private final Deque<Object> successfulRequests = new ArrayDeque<>();
     private final ProxyHistory parent;
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
 
     private final Deque<Object> successfulRequests = new ArrayDeque<>();
     private final ProxyHistory parent;
 
+    /*
+     * Atomic state-keeping is required to synchronize the process of propagating completed transaction state towards
+     * the backend -- which may include a successor.
+     *
+     * Successor, unlike {@link AbstractProxyTransaction#seal()} is triggered from the client actor thread, which means
+     * the successor placement needs to be atomic with regard to the application thread.
+     *
+     * In the common case, the application thread performs performs the seal operations and then "immediately" sends
+     * the corresponding message. The uncommon case is when the seal and send operations race with a connect completion
+     * or timeout, when a successor is injected.
+     *
+     * This leaves the problem of needing to completely transferring state just after all queued messages are replayed
+     * after a successor was injected, so that it can be properly sealed if we are racing.
+     */
+    private volatile SealState sealed = SealState.OPEN;
+    @GuardedBy("this")
     private AbstractProxyTransaction successor;
     private AbstractProxyTransaction successor;
+    @GuardedBy("this")
+    private CountDownLatch successorLatch;
+
+    // Accessed from user thread only, which may not access this object concurrently
     private long sequence;
     private long sequence;
-    private boolean sealed;
+
 
     AbstractProxyTransaction(final ProxyHistory parent) {
         this.parent = Preconditions.checkNotNull(parent);
 
     AbstractProxyTransaction(final ProxyHistory parent) {
         this.parent = Preconditions.checkNotNull(parent);
@@ -126,18 +170,50 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * Seal this transaction before it is either committed or aborted.
      */
     final void seal() {
      * Seal this transaction before it is either committed or aborted.
      */
     final void seal() {
-        checkNotSealed();
-        doSeal();
-        sealed = true;
-        parent.onTransactionSealed(this);
+        final CountDownLatch localLatch;
+
+        synchronized (this) {
+            checkNotSealed();
+            doSeal();
+
+            // Fast path: no successor
+            if (successor == null) {
+                sealed = SealState.SEALED;
+                parent.onTransactionSealed(this);
+                return;
+            }
+
+            localLatch = successorLatch;
+        }
+
+        // Slow path: wait for the latch
+        LOG.debug("{} waiting on successor latch", getIdentifier());
+        try {
+            localLatch.await();
+        } catch (InterruptedException e) {
+            LOG.warn("{} interrupted while waiting for latch", getIdentifier());
+            throw Throwables.propagate(e);
+        }
+
+        synchronized (this) {
+            LOG.debug("{} reacquired lock", getIdentifier());
+
+            flushState(successor);
+            successor.seal();
+
+            sealed = SealState.FLUSHED;
+            parent.onTransactionSealed(this);
+        }
     }
 
     private void checkNotSealed() {
     }
 
     private void checkNotSealed() {
-        Preconditions.checkState(!sealed, "Transaction %s has already been sealed", getIdentifier());
+        Preconditions.checkState(sealed == SealState.OPEN, "Transaction %s has already been sealed", getIdentifier());
     }
 
     }
 
-    private void checkSealed() {
-        Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
+    private SealState checkSealed() {
+        final SealState local = sealed;
+        Preconditions.checkState(local != SealState.OPEN, "Transaction %s has not been sealed yet", getIdentifier());
+        return local;
     }
 
     final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
     }
 
     final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
@@ -192,45 +268,109 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @return Future completion
      */
     final ListenableFuture<Boolean> directCommit() {
      * @return Future completion
      */
     final ListenableFuture<Boolean> directCommit() {
-        checkSealed();
-
-        final SettableFuture<Boolean> ret = SettableFuture.create();
-        sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
-            if (t instanceof TransactionCommitSuccess) {
-                ret.set(Boolean.TRUE);
-            } else if (t instanceof RequestFailure) {
-                ret.setException(((RequestFailure<?, ?>) t).getCause());
-            } else {
-                ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
+        final CountDownLatch localLatch;
+
+        synchronized (this) {
+            final SealState local = checkSealed();
+
+            // Fast path: no successor asserted
+            if (successor == null) {
+                Verify.verify(local == SealState.SEALED);
+
+                final SettableFuture<Boolean> ret = SettableFuture.create();
+                sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
+                    if (t instanceof TransactionCommitSuccess) {
+                        ret.set(Boolean.TRUE);
+                    } else if (t instanceof RequestFailure) {
+                        ret.setException(((RequestFailure<?, ?>) t).getCause());
+                    } else {
+                        ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
+                    }
+
+                    // This is a terminal request, hence we do not need to record it
+                    LOG.debug("Transaction {} directCommit completed", this);
+                    parent.completeTransaction(this);
+                });
+
+                sealed = SealState.FLUSHED;
+                return ret;
             }
 
             }
 
-            // This is a terminal request, hence we do not need to record it
-            LOG.debug("Transaction {} directCommit completed", this);
-            parent.completeTransaction(this);
-        });
-        return ret;
+            // We have a successor, take its latch
+            localLatch = successorLatch;
+        }
+
+        // Slow path: we need to wait for the successor to completely propagate
+        LOG.debug("{} waiting on successor latch", getIdentifier());
+        try {
+            localLatch.await();
+        } catch (InterruptedException e) {
+            LOG.warn("{} interrupted while waiting for latch", getIdentifier());
+            throw Throwables.propagate(e);
+        }
+
+        synchronized (this) {
+            LOG.debug("{} reacquired lock", getIdentifier());
+
+            final SealState local = sealed;
+            Verify.verify(local == SealState.FLUSHED);
+
+            return successor.directCommit();
+        }
     }
 
     }
 
+    final void canCommit(final VotingFuture<?> ret) {
+        final CountDownLatch localLatch;
 
 
-    void canCommit(final VotingFuture<?> ret) {
-        checkSealed();
+        synchronized (this) {
+            final SealState local = checkSealed();
 
 
-        final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
-        sendRequest(req, t -> {
-            if (t instanceof TransactionCanCommitSuccess) {
-                ret.voteYes();
-            } else if (t instanceof RequestFailure) {
-                ret.voteNo(((RequestFailure<?, ?>) t).getCause());
-            } else {
-                ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+            // Fast path: no successor asserted
+            if (successor == null) {
+                Verify.verify(local == SealState.SEALED);
+
+                final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
+                sendRequest(req, t -> {
+                    if (t instanceof TransactionCanCommitSuccess) {
+                        ret.voteYes();
+                    } else if (t instanceof RequestFailure) {
+                        ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+                    } else {
+                        ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
+                    }
+
+                    recordSuccessfulRequest(req);
+                    LOG.debug("Transaction {} canCommit completed", this);
+                });
+
+                sealed = SealState.FLUSHED;
+                return;
             }
 
             }
 
-            recordSuccessfulRequest(req);
-            LOG.debug("Transaction {} canCommit completed", this);
-        });
+            // We have a successor, take its latch
+            localLatch = successorLatch;
+        }
+
+        // Slow path: we need to wait for the successor to completely propagate
+        LOG.debug("{} waiting on successor latch", getIdentifier());
+        try {
+            localLatch.await();
+        } catch (InterruptedException e) {
+            LOG.warn("{} interrupted while waiting for latch", getIdentifier());
+            throw Throwables.propagate(e);
+        }
+
+        synchronized (this) {
+            LOG.debug("{} reacquired lock", getIdentifier());
+
+            final SealState local = sealed;
+            Verify.verify(local == SealState.FLUSHED);
+
+            successor.canCommit(ret);
+        }
     }
 
     }
 
-    void preCommit(final VotingFuture<?> ret) {
+    final void preCommit(final VotingFuture<?> ret) {
         checkSealed();
 
         final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
         checkSealed();
 
         final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
@@ -266,7 +406,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         });
     }
 
         });
     }
 
-    final void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
+    final synchronized void startReconnect(final AbstractProxyTransaction successor) {
+        Preconditions.checkState(this.successor == null);
         this.successor = Preconditions.checkNotNull(successor);
 
         for (Object obj : successfulRequests) {
         this.successor = Preconditions.checkNotNull(successor);
 
         for (Object obj : successfulRequests) {
@@ -280,6 +421,29 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         }
         LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
         successfulRequests.clear();
         }
         LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
         successfulRequests.clear();
+
+        /*
+         * Before releasing the lock we need to make sure that a call to seal() blocks until we have completed
+         * finishConnect().
+         */
+        successorLatch = new CountDownLatch(1);
+    }
+
+    final synchronized void finishReconnect() {
+        Preconditions.checkState(successorLatch != null);
+
+        if (sealed == SealState.SEALED) {
+            /*
+             * If this proxy is in the 'sealed, have not sent canCommit' state. If so, we need to forward current
+             * leftover state to the successor now.
+             */
+            flushState(successor);
+            successor.seal();
+            sealed = SealState.FLUSHED;
+        }
+
+        // All done, release the latch, unblocking seal() and canCommit()
+        successorLatch.countDown();
     }
 
     /**
     }
 
     /**
@@ -290,8 +454,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @param callback Original callback
      * @throws RequestException when the request is unhandled by the successor
      */
      * @param callback Original callback
      * @throws RequestException when the request is unhandled by the successor
      */
-    final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
-            throws RequestException {
+    final synchronized void replayRequest(final TransactionRequest<?> request,
+            final Consumer<Response<?, ?>> callback) {
         Preconditions.checkState(successor != null, "%s does not have a successor set", this);
 
         if (successor instanceof LocalProxyTransaction) {
         Preconditions.checkState(successor != null, "%s does not have a successor set", this);
 
         if (successor instanceof LocalProxyTransaction) {
@@ -318,6 +482,9 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
     abstract void doAbort();
 
 
     abstract void doAbort();
 
+    @GuardedBy("this")
+    abstract void flushState(AbstractProxyTransaction successor);
+
     abstract TransactionRequest<?> commitRequest(boolean coordinated);
 
     /**
     abstract TransactionRequest<?> commitRequest(boolean coordinated);
 
     /**
@@ -338,11 +505,11 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * Replay a request originating in this proxy to a successor remote proxy.
      */
     abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
      * Replay a request originating in this proxy to a successor remote proxy.
      */
     abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
-            Consumer<Response<?, ?>> callback) throws RequestException;
+            Consumer<Response<?, ?>> callback);
 
     /**
      * Replay a request originating in this proxy to a successor local proxy.
      */
     abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
 
     /**
      * Replay a request originating in this proxy to a successor local proxy.
      */
     abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
-            Consumer<Response<?, ?>> callback) throws RequestException;
+            Consumer<Response<?, ?>> callback);
 }
 }
index 61f83c2db1e8172c9374665bfe41eba3e2e13d3d..e9941179c7aa7c599435defda9b2f6a9165ff862 100644 (file)
@@ -31,7 +31,6 @@ import org.opendaylight.controller.cluster.access.commands.TransactionModificati
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor;
@@ -40,7 +39,6 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
@@ -69,7 +67,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
     private final TransactionIdentifier identifier;
 
     private CursorAwareDataTreeModification modification;
     private final TransactionIdentifier identifier;
 
     private CursorAwareDataTreeModification modification;
-    private CursorAwareDataTreeSnapshot sealedModification;
+    private CursorAwareDataTreeModification sealedModification;
 
     LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
         final CursorAwareDataTreeModification modification) {
 
     LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
         final CursorAwareDataTreeModification modification) {
@@ -137,6 +135,26 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
         sealedModification = modification;
     }
 
         sealedModification = modification;
     }
 
+    @Override
+    void flushState(final AbstractProxyTransaction successor) {
+        sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() {
+            @Override
+            public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+                successor.write(current().node(child), data);
+            }
+
+            @Override
+            public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+                successor.merge(current().node(child), data);
+            }
+
+            @Override
+            public void delete(final PathArgument child) {
+                successor.delete(current().node(child));
+            }
+        });
+    }
+
     DataTreeSnapshot getSnapshot() {
         Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier);
         return sealedModification;
     DataTreeSnapshot getSnapshot() {
         Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier);
         return sealedModification;
@@ -205,7 +223,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
 
     @Override
     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) throws RequestException {
+            final Consumer<Response<?, ?>> callback) {
         if (request instanceof CommitLocalTransactionRequest) {
             final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
             final DataTreeModification mod = req.getModification();
         if (request instanceof CommitLocalTransactionRequest) {
             final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request;
             final DataTreeModification mod = req.getModification();
@@ -242,7 +260,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
 
     @Override
     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) throws RequestException {
+            final Consumer<Response<?, ?>> callback) {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof CommitLocalTransactionRequest) {
         if (request instanceof AbortLocalTransactionRequest) {
             successor.sendAbort(request, callback);
         } else if (request instanceof CommitLocalTransactionRequest) {
index 5257c6c7f6299bcfd85e08f67711256dbe906c63..07fcbebad01a263d38a0bfabe839e8b02832ffe2 100644 (file)
@@ -200,7 +200,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
                 LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
                 LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
-                t.replaySuccessfulRequests(newProxy);
+                t.startReconnect(newProxy);
             }
         }
 
             }
         }
 
@@ -208,6 +208,11 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         ProxyHistory finishReconnect() {
             final ProxyHistory ret = Verify.verifyNotNull(successor);
         @Override
         ProxyHistory finishReconnect() {
             final ProxyHistory ret = Verify.verifyNotNull(successor);
+
+            for (AbstractProxyTransaction t : proxies.values()) {
+                t.finishReconnect();
+            }
+
             LOG.debug("Finished reconnecting proxy history {}", this);
             lock.unlock();
             return ret;
             LOG.debug("Finished reconnecting proxy history {}", this);
             lock.unlock();
             return ret;
index 175483855f14009a0f86ecc7107c07870bbaa76c..783096b7bfe5b7064bd5e7e7303725e63c7fe08e 100644 (file)
@@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitR
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
-import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
@@ -261,14 +260,22 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         // No-op
     }
 
         // No-op
     }
 
+    @Override
+    void flushState(final AbstractProxyTransaction successor) {
+        if (builderBusy) {
+            final ModifyTransactionRequest request = builder.build();
+            builderBusy = false;
+            successor.handleForwardedRemoteRequest(request, null);
+        }
+    }
+
     @Override
     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
     @Override
     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) throws RequestException {
+            final Consumer<Response<?, ?>> callback) {
         successor.handleForwardedRequest(request, callback);
     }
 
         successor.handleForwardedRequest(request, callback);
     }
 
-    private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
-            throws RequestException {
+    private void handleForwardedRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         if (request instanceof ModifyTransactionRequest) {
             final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
 
         if (request instanceof ModifyTransactionRequest) {
             final ModifyTransactionRequest req = (ModifyTransactionRequest) request;
 
@@ -316,7 +323,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
 
     @Override
     void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest<?> request,
-            final Consumer<Response<?, ?>> callback) throws RequestException {
+            final Consumer<Response<?, ?>> callback) {
         successor.handleForwardedRemoteRequest(request, callback);
     }
 }
         successor.handleForwardedRemoteRequest(request, callback);
     }
 }