BUG-8618: rework AbstractProxyTransaction.flushState()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
index 07b89e09230949da6c4849b3fb5dc03d4c3c36d8..51f528150d22a717034cc4281618591645cca92f 100644 (file)
@@ -11,7 +11,6 @@ import akka.actor.ActorRef;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
 import com.google.common.base.Verify;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.CheckedFuture;
@@ -32,6 +31,7 @@ import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
 import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
+import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -138,7 +138,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                 latch.await();
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while waiting for latch of {}", successor);
-                throw Throwables.propagate(e);
+                throw new RuntimeException(e);
             }
             return successor;
         }
@@ -351,7 +351,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         // At this point the successor has completed transition and is possibly visible by the user thread, which is
         // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
         // Propagate state and seal the successor.
-        flushState(successor);
+        final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+        if (optState.isPresent()) {
+            forwardToSuccessor(successor, optState.get(), null);
+        }
         successor.predecessorSealed();
     }
 
@@ -364,7 +367,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void sealOnly() {
         parent.onTransactionSealed(this);
         final boolean success = STATE_UPDATER.compareAndSet(this, OPEN, SEALED);
-        Verify.verify(success, "Attempted to replay seal on {}", this);
+        Verify.verify(success, "Attempted to replay seal on %s", this);
     }
 
     /**
@@ -719,9 +722,13 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
          */
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
-            flushState(successor);
+            final long enqueuedTicks = parent.currentTime();
+            final java.util.Optional<ModifyTransactionRequest> optState = flushState();
+            if (optState.isPresent()) {
+                successor.handleReplayedRemoteRequest(optState.get(), null, enqueuedTicks);
+            }
             if (successor.markSealed()) {
-                successor.sealAndSend(Optional.of(parent.currentTime()));
+                successor.sealAndSend(Optional.of(enqueuedTicks));
             }
         }
     }
@@ -795,7 +802,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
 
     @GuardedBy("this")
-    abstract void flushState(AbstractProxyTransaction successor);
+    abstract java.util.Optional<ModifyTransactionRequest> flushState();
 
     abstract TransactionRequest<?> abortRequest();