Fix followerDistributedDataStore tear down
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 7e3cd02efbce2ebe6c31e600a494177bcf0a6cbf..946203b6b76aa5e2c4b4f94a849a9430f2d3fa06 100644 (file)
@@ -7,23 +7,23 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Deque;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import javax.annotation.Nonnull;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -38,7 +38,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
 import org.slf4j.Logger;
 
 /**
@@ -46,6 +47,7 @@ import org.slf4j.Logger;
  *
  * @author Thomas Pantelis
  */
+@Deprecated(since = "9.0.0", forRemoval = true)
 final class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the ShardDataTreeCohorts.
@@ -71,7 +73,7 @@ final class ShardCommitCoordinator {
     ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) {
         this.log = log;
         this.name = name;
-        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.dataTree = requireNonNull(dataTree);
     }
 
     int getCohortCacheSize() {
@@ -101,11 +103,11 @@ final class ShardCommitCoordinator {
     void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender,
             final Shard shard) {
         log.debug("{}: Readying transaction {}, client version {}", name,
-                ready.getTransactionID(), ready.getTxnClientVersion());
+                ready.getTransactionId(), ready.getTxnClientVersion());
 
-        final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+        final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
-        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
 
         if (ready.isDoImmediateCommit()) {
             cohortEntry.setDoImmediateCommit(true);
@@ -128,41 +130,42 @@ final class ShardCommitCoordinator {
      * @param batched the BatchedModifications message to process
      * @param sender the sender of the message
      */
+    @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Replay of captured failure")
     void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
-        CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
-        if (cohortEntry == null) {
-            cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionID()),
+        CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
+        if (cohortEntry == null || cohortEntry.isSealed()) {
+            cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
                 batched.getVersion());
-            cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
+            cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
         }
 
         if (log.isDebugEnabled()) {
             log.debug("{}: Applying {} batched modifications for Tx {}", name,
-                    batched.getModifications().size(), batched.getTransactionID());
+                    batched.getModifications().size(), batched.getTransactionId());
         }
 
         cohortEntry.applyModifications(batched.getModifications());
 
         if (batched.isReady()) {
             if (cohortEntry.getLastBatchedModificationsException() != null) {
-                cohortCache.remove(cohortEntry.getTransactionID());
+                cohortCache.remove(cohortEntry.getTransactionId());
                 throw cohortEntry.getLastBatchedModificationsException();
             }
 
             if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
-                cohortCache.remove(cohortEntry.getTransactionID());
+                cohortCache.remove(cohortEntry.getTransactionId());
                 throw new IllegalStateException(String.format(
                         "The total number of batched messages received %d does not match the number sent %d",
                         cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("{}: Readying Tx {}, client version {}", name,
-                        batched.getTransactionID(), batched.getVersion());
+                log.debug("{}: Readying Tx {} of {} operations, client version {}", name,
+                        batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion());
             }
 
             cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
-            cohortEntry.ready(cohortDecorator);
+            cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
 
             if (batched.isDoCommitOnReady()) {
                 cohortEntry.setReplySender(sender);
@@ -186,13 +189,14 @@ final class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      */
     void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
-        final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionID(),
-            message.getModification());
+        final TransactionIdentifier txId = message.getTransactionId();
+        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+                message.getParticipatingShardNames());
         final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
-        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
+        cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
-        log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
+        log.debug("{}: Applying local modifications for Tx {}", name, txId);
 
         if (message.isDoCommitOnReady()) {
             cohortEntry.setReplySender(sender);
@@ -203,9 +207,10 @@ final class ShardCommitCoordinator {
         }
     }
 
+    @Deprecated(since = "9.0.0", forRemoval = true)
     Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
             final int maxModificationsPerBatch) {
-        CohortEntry cohortEntry = cohortCache.remove(from.getTransactionID());
+        CohortEntry cohortEntry = cohortCache.remove(from.getTransactionId());
         if (cohortEntry == null || cohortEntry.getTransaction() == null) {
             return Collections.singletonList(from);
         }
@@ -218,7 +223,7 @@ final class ShardCommitCoordinator {
             protected BatchedModifications getModifications() {
                 if (newModifications.isEmpty()
                         || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
-                    newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
+                    newModifications.add(new BatchedModifications(from.getTransactionId(), from.getVersion()));
                 }
 
                 return newModifications.getLast();
@@ -227,16 +232,18 @@ final class ShardCommitCoordinator {
 
         BatchedModifications last = newModifications.getLast();
         last.setDoCommitOnReady(from.isDoCommitOnReady());
-        last.setReady(from.isReady());
+        if (from.isReady()) {
+            last.setReady(from.getParticipatingShardNames());
+        }
         last.setTotalMessagesSent(newModifications.size());
         return newModifications;
     }
 
     private void handleCanCommit(final CohortEntry cohortEntry) {
-        cohortEntry.canCommit(new FutureCallback<Void>() {
+        cohortEntry.canCommit(new FutureCallback<>() {
             @Override
-            public void onSuccess(final Void result) {
-                log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionID());
+            public void onSuccess(final Empty result) {
+                log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId());
 
                 if (cohortEntry.isDoImmediateCommit()) {
                     doCommit(cohortEntry);
@@ -249,10 +256,10 @@ final class ShardCommitCoordinator {
 
             @Override
             public void onFailure(final Throwable failure) {
-                log.debug("{}: An exception occurred during canCommit for {}: {}", name,
-                        cohortEntry.getTransactionID(), failure);
+                log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
+                    failure);
 
-                cohortCache.remove(cohortEntry.getTransactionID());
+                cohortCache.remove(cohortEntry.getTransactionId());
                 cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
             }
         });
@@ -274,7 +281,7 @@ final class ShardCommitCoordinator {
             // between canCommit and ready and the entry was expired from the cache or it was aborted.
             IllegalStateException ex = new IllegalStateException(
                     String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
-            log.error(ex.getMessage());
+            log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex);
             sender.tell(new Failure(ex), shard.self());
             return;
         }
@@ -285,8 +292,8 @@ final class ShardCommitCoordinator {
         handleCanCommit(cohortEntry);
     }
 
-    private void doCommit(final CohortEntry cohortEntry) {
-        log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionID());
+    void doCommit(final CohortEntry cohortEntry) {
+        log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
 
         // We perform the preCommit phase here atomically with the commit phase. This is an
         // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
@@ -301,35 +308,35 @@ final class ShardCommitCoordinator {
             @Override
             public void onFailure(final Throwable failure) {
                 log.error("{} An exception occurred while preCommitting transaction {}", name,
-                        cohortEntry.getTransactionID(), failure);
+                        cohortEntry.getTransactionId(), failure);
 
-                cohortCache.remove(cohortEntry.getTransactionID());
+                cohortCache.remove(cohortEntry.getTransactionId());
                 cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
             }
         });
     }
 
-    private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
-        log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
+    void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) {
+        log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
 
         cohortEntry.commit(new FutureCallback<UnsignedLong>() {
             @Override
             public void onSuccess(final UnsignedLong result) {
-                final TransactionIdentifier txId = cohortEntry.getTransactionID();
+                final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
                     sender);
 
-                cohortCache.remove(cohortEntry.getTransactionID());
+                cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
                     cohortEntry.getShard().self());
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
-                        cohortEntry.getTransactionID(), failure);
+                final TransactionIdentifier txId = cohortEntry.getTransactionId();
+                log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
 
-                cohortCache.remove(cohortEntry.getTransactionID());
+                cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(new Failure(failure), cohortEntry.getShard().self());
             }
         });
@@ -349,7 +356,7 @@ final class ShardCommitCoordinator {
             // or it was aborted.
             IllegalStateException ex = new IllegalStateException(
                     String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
-            log.error(ex.getMessage());
+            log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex);
             sender.tell(new Failure(ex), shard.self());
             return;
         }
@@ -368,47 +375,44 @@ final class ShardCommitCoordinator {
         log.debug("{}: Aborting transaction {}", name, transactionID);
 
         final ActorRef self = shard.getSelf();
-        try {
-            cohortEntry.abort();
+        cohortEntry.abort(new FutureCallback<>() {
+            @Override
+            public void onSuccess(final Empty result) {
+                if (sender != null) {
+                    sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+                }
+            }
 
-            shard.getShardMBean().incrementAbortTransactionsCount();
+            @Override
+            public void onFailure(final Throwable failure) {
+                log.error("{}: An exception happened during abort", name, failure);
 
-            if (sender != null) {
-                sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
+                if (sender != null) {
+                    sender.tell(new Failure(failure), self);
+                }
             }
-        } catch (Exception e) {
-            log.error("{}: An exception happened during abort", name, e);
+        });
 
-            if (sender != null) {
-                sender.tell(new Failure(e), self);
-            }
-        }
+        shard.getShardMBean().incrementAbortTransactionsCount();
     }
 
     void checkForExpiredTransactions(final long timeout, final Shard shard) {
-        Iterator<CohortEntry> iter = cohortCache.values().iterator();
-        while (iter.hasNext()) {
-            CohortEntry cohortEntry = iter.next();
-            if (cohortEntry.isFailed()) {
-                iter.remove();
-            }
-        }
+        cohortCache.values().removeIf(CohortEntry::isFailed);
     }
 
     void abortPendingTransactions(final String reason, final Shard shard) {
-        final Failure failure = new Failure(new RuntimeException(reason));
-        Collection<ShardDataTreeCohort> pending = dataTree.getAndClearPendingTransactions();
+        final var failure = new Failure(new RuntimeException(reason));
+        final var pending = dataTree.getAndClearPendingTransactions();
 
         log.debug("{}: Aborting {} pending queued transactions", name, pending.size());
 
-        for (ShardDataTreeCohort cohort : pending) {
-            CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
-            if (cohortEntry == null) {
-                continue;
-            }
-
-            if (cohortEntry.getReplySender() != null) {
-                cohortEntry.getReplySender().tell(failure, shard.self());
+        for (var cohort : pending) {
+            final var cohortEntry = cohortCache.remove(cohort.transactionId());
+            if (cohortEntry != null) {
+                final var replySender = cohortEntry.getReplySender();
+                if (replySender != null) {
+                    replySender.tell(failure, shard.self());
+                }
             }
         }
 
@@ -416,36 +420,35 @@ final class ShardCommitCoordinator {
     }
 
     Collection<?> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
-        final Collection<VersionedExternalizableMessage> messages = new ArrayList<>();
-        for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) {
-            CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier());
+        final var messages = new ArrayList<VersionedExternalizableMessage>();
+        for (var cohort : dataTree.getAndClearPendingTransactions()) {
+            final var cohortEntry = cohortCache.remove(cohort.transactionId());
             if (cohortEntry == null) {
                 continue;
             }
 
-            final Deque<BatchedModifications> newMessages = new ArrayDeque<>();
+            final var newMessages = new ArrayDeque<BatchedModifications>();
             cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
                 @Override
                 protected BatchedModifications getModifications() {
-                    final BatchedModifications lastBatch = newMessages.peekLast();
-
+                    final var lastBatch = newMessages.peekLast();
                     if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) {
                         return lastBatch;
                     }
 
                     // Allocate a new message
-                    final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionID(),
+                    final var ret = new BatchedModifications(cohortEntry.getTransactionId(),
                         cohortEntry.getClientVersion());
                     newMessages.add(ret);
                     return ret;
                 }
             });
 
-            final BatchedModifications last = newMessages.peekLast();
+            final var last = newMessages.peekLast();
             if (last != null) {
                 final boolean immediate = cohortEntry.isDoImmediateCommit();
                 last.setDoCommitOnReady(immediate);
-                last.setReady(true);
+                last.setReady(cohortEntry.getParticipatingShardNames());
                 last.setTotalMessagesSent(newMessages.size());
 
                 messages.addAll(newMessages);
@@ -454,12 +457,12 @@ final class ShardCommitCoordinator {
                     switch (cohort.getState()) {
                         case CAN_COMMIT_COMPLETE:
                         case CAN_COMMIT_PENDING:
-                            messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+                            messages.add(new CanCommitTransaction(cohortEntry.getTransactionId(),
                                 cohortEntry.getClientVersion()));
                             break;
                         case PRE_COMMIT_COMPLETE:
                         case PRE_COMMIT_PENDING:
-                            messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+                            messages.add(new CommitTransaction(cohortEntry.getTransactionId(),
                                 cohortEntry.getClientVersion()));
                             break;
                         default: