Pass no op callback instead of null during replay
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
index 36f9a4bccbe07e762dd8f4778ae4809fc4a2487a..27f9b47cd9211386c9d43bb080aaea4b78cc3c0d 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -34,6 +35,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionCommitSucc
 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
@@ -199,16 +201,19 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void delete(final YangInstanceIdentifier path) {
+        checkReadWrite();
         checkNotSealed();
         doDelete(path);
     }
 
     final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkReadWrite();
         checkNotSealed();
         doMerge(path, data);
     }
 
     final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkReadWrite();
         checkNotSealed();
         doWrite(path, data);
     }
@@ -235,6 +240,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         // Transition user-visible state first
         final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
         Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
+        internalSeal();
+    }
+
+    final void ensureSealed() {
+        if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
+            internalSeal();
+        }
+    }
+
+    private void internalSeal() {
         doSeal();
         parent.onTransactionSealed(this);
 
@@ -247,7 +262,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
             // Propagate state and seal the successor.
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
         }
     }
 
@@ -265,6 +280,12 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return (SuccessorState) local;
     }
 
+    private void checkReadWrite() {
+        if (isSnapshotOnly()) {
+            throw new UnsupportedOperationException("Transaction " + getIdentifier() + " is a read-only snapshot");
+        }
+    }
+
     final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
         successfulRequests.add(Verify.verifyNotNull(req));
     }
@@ -302,7 +323,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             // This is a terminal request, hence we do not need to record it
             LOG.debug("Transaction {} abort completed", this);
-            parent.completeTransaction(this);
+            purge();
         });
     }
 
@@ -317,6 +338,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @return Future completion
      */
     final ListenableFuture<Boolean> directCommit() {
+        checkReadWrite();
         checkSealed();
 
         // Precludes startReconnect() from interfering with the fast path
@@ -334,7 +356,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
                     // This is a terminal request, hence we do not need to record it
                     LOG.debug("Transaction {} directCommit completed", this);
-                    parent.completeTransaction(this);
+                    purge();
                 });
 
                 return ret;
@@ -346,6 +368,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void canCommit(final VotingFuture<?> ret) {
+        checkReadWrite();
         checkSealed();
 
         // Precludes startReconnect() from interfering with the fast path
@@ -379,6 +402,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void preCommit(final VotingFuture<?> ret) {
+        checkReadWrite();
         checkSealed();
 
         final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
@@ -392,12 +416,28 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
 
-            recordSuccessfulRequest(req);
-            LOG.debug("Transaction {} preCommit completed", this);
+            onPreCommitComplete(req);
         });
     }
 
+    private void onPreCommitComplete(final TransactionRequest<?> req) {
+        /*
+         * The backend has agreed that the transaction has entered PRE_COMMIT phase, meaning it will be committed
+         * to storage after the timeout completes.
+         *
+         * All state has been replicated to the backend, hence we do not need to keep it around. Retain only
+         * the precommit request, so we know which request to use for resync.
+         */
+        LOG.debug("Transaction {} preCommit completed, clearing successfulRequests", this);
+        successfulRequests.clear();
+
+        // TODO: this works, but can contain some useless state (like batched operations). Create an empty
+        //       equivalent of this request and store that.
+        recordSuccessfulRequest(req);
+    }
+
     final void doCommit(final VotingFuture<?> ret) {
+        checkReadWrite();
         checkSealed();
 
         sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
@@ -410,6 +450,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
+            purge();
+        });
+    }
+
+    void purge() {
+        successfulRequests.clear();
+
+        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+        sendRequest(req, t -> {
+            LOG.debug("Transaction {} purge completed", this);
             parent.completeTransaction(this);
         });
     }
@@ -441,7 +491,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         for (Object obj : successfulRequests) {
             if (obj instanceof TransactionRequest) {
                 LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
-                successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
+                successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, response -> { });
             } else {
                 Verify.verify(obj instanceof IncrementSequence);
                 successor.incrementSequence(((IncrementSequence) obj).getDelta());
@@ -473,7 +523,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
         }
     }
 
@@ -505,16 +555,17 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         }
     }
 
-    abstract void doDelete(final YangInstanceIdentifier path);
+    abstract boolean isSnapshotOnly();
 
-    abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+    abstract void doDelete(YangInstanceIdentifier path);
 
-    abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+    abstract void doMerge(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
-    abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
+    abstract void doWrite(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
-    abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
-            final YangInstanceIdentifier path);
+    abstract CheckedFuture<Boolean, ReadFailedException> doExists(YangInstanceIdentifier path);
+
+    abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
 
     abstract void doSeal();
 
@@ -550,4 +601,9 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      */
     abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
             Consumer<Response<?, ?>> callback);
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();
+    }
 }