Pass no op callback instead of null during replay
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
index 22be14036114b3e497b31570337b0cfd01b3b6b1..27f9b47cd9211386c9d43bb080aaea4b78cc3c0d 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -34,6 +35,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionCommitSucc
 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
@@ -238,6 +240,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         // Transition user-visible state first
         final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
         Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
+        internalSeal();
+    }
+
+    final void ensureSealed() {
+        if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
+            internalSeal();
+        }
+    }
+
+    private void internalSeal() {
         doSeal();
         parent.onTransactionSealed(this);
 
@@ -250,7 +262,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
             // Propagate state and seal the successor.
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
         }
     }
 
@@ -311,7 +323,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             // This is a terminal request, hence we do not need to record it
             LOG.debug("Transaction {} abort completed", this);
-            parent.completeTransaction(this);
+            purge();
         });
     }
 
@@ -344,7 +356,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
                     // This is a terminal request, hence we do not need to record it
                     LOG.debug("Transaction {} directCommit completed", this);
-                    parent.completeTransaction(this);
+                    purge();
                 });
 
                 return ret;
@@ -404,11 +416,26 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
 
-            recordSuccessfulRequest(req);
-            LOG.debug("Transaction {} preCommit completed", this);
+            onPreCommitComplete(req);
         });
     }
 
+    private void onPreCommitComplete(final TransactionRequest<?> req) {
+        /*
+         * The backend has agreed that the transaction has entered PRE_COMMIT phase, meaning it will be committed
+         * to storage after the timeout completes.
+         *
+         * All state has been replicated to the backend, hence we do not need to keep it around. Retain only
+         * the precommit request, so we know which request to use for resync.
+         */
+        LOG.debug("Transaction {} preCommit completed, clearing successfulRequests", this);
+        successfulRequests.clear();
+
+        // TODO: this works, but can contain some useless state (like batched operations). Create an empty
+        //       equivalent of this request and store that.
+        recordSuccessfulRequest(req);
+    }
+
     final void doCommit(final VotingFuture<?> ret) {
         checkReadWrite();
         checkSealed();
@@ -423,6 +450,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
+            purge();
+        });
+    }
+
+    void purge() {
+        successfulRequests.clear();
+
+        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+        sendRequest(req, t -> {
+            LOG.debug("Transaction {} purge completed", this);
             parent.completeTransaction(this);
         });
     }
@@ -454,7 +491,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         for (Object obj : successfulRequests) {
             if (obj instanceof TransactionRequest) {
                 LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
-                successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
+                successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, response -> { });
             } else {
                 Verify.verify(obj instanceof IncrementSequence);
                 successor.incrementSequence(((IncrementSequence) obj).getDelta());
@@ -486,7 +523,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
         }
     }
 
@@ -520,16 +557,15 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
     abstract boolean isSnapshotOnly();
 
-    abstract void doDelete(final YangInstanceIdentifier path);
+    abstract void doDelete(YangInstanceIdentifier path);
 
-    abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+    abstract void doMerge(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
-    abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+    abstract void doWrite(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
-    abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
+    abstract CheckedFuture<Boolean, ReadFailedException> doExists(YangInstanceIdentifier path);
 
-    abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
-            final YangInstanceIdentifier path);
+    abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
 
     abstract void doSeal();
 
@@ -565,4 +601,9 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      */
     abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
             Consumer<Response<?, ?>> callback);
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();
+    }
 }