BUG-8372: fix AbstractProxyTransaction.replayMessages()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractProxyTransaction.java
index 36f9a4bccbe07e762dd8f4778ae4809fc4a2487a..83ba07b69ac8f6e3ed253d18e420315065069e4d 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
@@ -27,6 +28,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
+import org.opendaylight.controller.cluster.access.commands.AbstractLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -34,6 +36,7 @@ import org.opendaylight.controller.cluster.access.commands.TransactionCommitSucc
 import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.Request;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
@@ -199,16 +202,19 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void delete(final YangInstanceIdentifier path) {
+        checkReadWrite();
         checkNotSealed();
         doDelete(path);
     }
 
     final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkReadWrite();
         checkNotSealed();
         doMerge(path, data);
     }
 
     final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        checkReadWrite();
         checkNotSealed();
         doWrite(path, data);
     }
@@ -235,6 +241,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         // Transition user-visible state first
         final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
         Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
+        internalSeal();
+    }
+
+    final void ensureSealed() {
+        if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
+            internalSeal();
+        }
+    }
+
+    private void internalSeal() {
         doSeal();
         parent.onTransactionSealed(this);
 
@@ -247,7 +263,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
             // Propagate state and seal the successor.
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
         }
     }
 
@@ -265,6 +281,12 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return (SuccessorState) local;
     }
 
+    private void checkReadWrite() {
+        if (isSnapshotOnly()) {
+            throw new UnsupportedOperationException("Transaction " + getIdentifier() + " is a read-only snapshot");
+        }
+    }
+
     final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
         successfulRequests.add(Verify.verifyNotNull(req));
     }
@@ -295,14 +317,14 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             if (t instanceof TransactionAbortSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
-                ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+                ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
             } else {
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
 
             // This is a terminal request, hence we do not need to record it
             LOG.debug("Transaction {} abort completed", this);
-            parent.completeTransaction(this);
+            purge();
         });
     }
 
@@ -317,6 +339,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @return Future completion
      */
     final ListenableFuture<Boolean> directCommit() {
+        checkReadWrite();
         checkSealed();
 
         // Precludes startReconnect() from interfering with the fast path
@@ -327,14 +350,14 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                     if (t instanceof TransactionCommitSuccess) {
                         ret.set(Boolean.TRUE);
                     } else if (t instanceof RequestFailure) {
-                        ret.setException(((RequestFailure<?, ?>) t).getCause());
+                        ret.setException(((RequestFailure<?, ?>) t).getCause().unwrap());
                     } else {
                         ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
                     }
 
                     // This is a terminal request, hence we do not need to record it
                     LOG.debug("Transaction {} directCommit completed", this);
-                    parent.completeTransaction(this);
+                    purge();
                 });
 
                 return ret;
@@ -346,6 +369,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void canCommit(final VotingFuture<?> ret) {
+        checkReadWrite();
         checkSealed();
 
         // Precludes startReconnect() from interfering with the fast path
@@ -357,7 +381,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                     if (t instanceof TransactionCanCommitSuccess) {
                         ret.voteYes();
                     } else if (t instanceof RequestFailure) {
-                        ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+                        ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
                     } else {
                         ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
                     }
@@ -379,6 +403,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     }
 
     final void preCommit(final VotingFuture<?> ret) {
+        checkReadWrite();
         checkSealed();
 
         final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
@@ -387,29 +412,55 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             if (t instanceof TransactionPreCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
-                ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+                ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
             } else {
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
 
-            recordSuccessfulRequest(req);
-            LOG.debug("Transaction {} preCommit completed", this);
+            onPreCommitComplete(req);
         });
     }
 
+    private void onPreCommitComplete(final TransactionRequest<?> req) {
+        /*
+         * The backend has agreed that the transaction has entered PRE_COMMIT phase, meaning it will be committed
+         * to storage after the timeout completes.
+         *
+         * All state has been replicated to the backend, hence we do not need to keep it around. Retain only
+         * the precommit request, so we know which request to use for resync.
+         */
+        LOG.debug("Transaction {} preCommit completed, clearing successfulRequests", this);
+        successfulRequests.clear();
+
+        // TODO: this works, but can contain some useless state (like batched operations). Create an empty
+        //       equivalent of this request and store that.
+        recordSuccessfulRequest(req);
+    }
+
     final void doCommit(final VotingFuture<?> ret) {
+        checkReadWrite();
         checkSealed();
 
         sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
             if (t instanceof TransactionCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
-                ret.voteNo(((RequestFailure<?, ?>) t).getCause());
+                ret.voteNo(((RequestFailure<?, ?>) t).getCause().unwrap());
             } else {
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
 
             LOG.debug("Transaction {} doCommit completed", this);
+            purge();
+        });
+    }
+
+    void purge() {
+        successfulRequests.clear();
+
+        final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+        sendRequest(req, t -> {
+            LOG.debug("Transaction {} purge completed", this);
             parent.completeTransaction(this);
         });
     }
@@ -441,7 +492,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         for (Object obj : successfulRequests) {
             if (obj instanceof TransactionRequest) {
                 LOG.debug("Forwarding successful request {} to successor {}", obj, successor);
-                successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
+                successor.replay((TransactionRequest<?>) obj, response -> { });
             } else {
                 Verify.verify(obj instanceof IncrementSequence);
                 successor.incrementSequence(((IncrementSequence) obj).getDelta());
@@ -458,8 +509,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
 
             if (getIdentifier().equals(req.getTarget())) {
                 Verify.verify(req instanceof TransactionRequest, "Unhandled request %s", req);
-                LOG.debug("Forwarding queued request{} to successor {}", req, successor);
-                successor.handleForwardedRemoteRequest((TransactionRequest<?>) req, e.getCallback());
+                LOG.debug("Forwarding queued request {} to successor {}", req, successor);
+                successor.replay((TransactionRequest<?>) req, e.getCallback());
                 it.remove();
             }
         }
@@ -473,7 +524,25 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
+        }
+    }
+
+    /**
+     * Invoked from {@link #replayMessages(AbstractProxyTransaction, Iterable)} to have successor adopt an in-flight
+     * request.
+     *
+     * <p>
+     * Note: this method is invoked by the predecessor on the successor.
+     *
+     * @param request Request which needs to be forwarded
+     * @param callback Callback to be invoked once the request completes
+     */
+    private void replay(TransactionRequest<?> request, Consumer<Response<?, ?>> callback) {
+        if (request instanceof AbstractLocalTransactionRequest) {
+            handleForwardedLocalRequest((AbstractLocalTransactionRequest<?>) request, callback);
+        } else {
+            handleForwardedRemoteRequest(request, callback);
         }
     }
 
@@ -493,7 +562,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @param request Request to be forwarded
      * @param callback Original callback
      */
-    final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+    final void forwardRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
         final AbstractProxyTransaction successor = getSuccessorState().getSuccessor();
 
         if (successor instanceof LocalProxyTransaction) {
@@ -505,16 +574,17 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         }
     }
 
-    abstract void doDelete(final YangInstanceIdentifier path);
+    abstract boolean isSnapshotOnly();
+
+    abstract void doDelete(YangInstanceIdentifier path);
 
-    abstract void doMerge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+    abstract void doMerge(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
-    abstract void doWrite(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data);
+    abstract void doWrite(YangInstanceIdentifier path, NormalizedNode<?, ?> data);
 
-    abstract CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path);
+    abstract CheckedFuture<Boolean, ReadFailedException> doExists(YangInstanceIdentifier path);
 
-    abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(
-            final YangInstanceIdentifier path);
+    abstract CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(YangInstanceIdentifier path);
 
     abstract void doSeal();
 
@@ -526,9 +596,19 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     abstract TransactionRequest<?> commitRequest(boolean coordinated);
 
     /**
-     * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is
-     * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all
-     * operations are packaged in the message.
+     * Replay a request originating in this proxy to a successor remote proxy.
+     */
+    abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
+            Consumer<Response<?, ?>> callback);
+
+    /**
+     * Replay a request originating in this proxy to a successor local proxy.
+     */
+    abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
+            Consumer<Response<?, ?>> callback);
+
+    /**
+     * Invoked from {@link LocalProxyTransaction} when it replays its successful requests to its successor.
      *
      * <p>
      * Note: this method is invoked by the predecessor on the successor.
@@ -536,18 +616,23 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
      * @param request Request which needs to be forwarded
      * @param callback Callback to be invoked once the request completes
      */
-    abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
+    abstract void handleForwardedLocalRequest(AbstractLocalTransactionRequest<?> request,
             @Nullable Consumer<Response<?, ?>> callback);
 
     /**
-     * Replay a request originating in this proxy to a successor remote proxy.
+     * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor.
+     *
+     * <p>
+     * Note: this method is invoked by the predecessor on the successor.
+     *
+     * @param request Request which needs to be forwarded
+     * @param callback Callback to be invoked once the request completes
      */
-    abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
-            Consumer<Response<?, ?>> callback);
+    abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
+            @Nullable Consumer<Response<?, ?>> callback);
 
-    /**
-     * Replay a request originating in this proxy to a successor local proxy.
-     */
-    abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
-            Consumer<Response<?, ?>> callback);
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();
+    }
 }