Fix a generic array warning
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 57c5b1de11fca6ad0496f9c4a3b06eb873048fdf..45fa7727e4f125e11d73658c699ecbc779715ceb 100644 (file)
@@ -10,10 +10,13 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -22,18 +25,26 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
+import java.util.concurrent.TimeoutException;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
+import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry.State;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
+import scala.concurrent.duration.Duration;
 
 /**
  * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
@@ -53,6 +64,8 @@ class ShardCommitCoordinator {
 
     private final ShardDataTree dataTree;
 
+    private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
+
     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
     // since this should only be accessed on the shard's dispatcher.
     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
@@ -72,8 +85,11 @@ class ShardCommitCoordinator {
 
     private Runnable runOnPendingTransactionsComplete;
 
-    ShardCommitCoordinator(ShardDataTree dataTree,
-            long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
+
+    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+
+    ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
+            String name) {
 
         this.queueCapacity = queueCapacity;
         this.log = log;
@@ -113,7 +129,7 @@ class ShardCommitCoordinator {
         } else {
             cohortCache.remove(cohortEntry.getTransactionID());
 
-            RuntimeException ex = new RuntimeException(
+            final RuntimeException ex = new RuntimeException(
                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
                                   " capacity %d has been reached.",
                                   name, cohortEntry.getTransactionID(), queueCapacity));
@@ -130,47 +146,30 @@ class ShardCommitCoordinator {
      * @param ready the ForwardedReadyTransaction message to process
      * @param sender the sender of the message
      * @param shard the transaction's shard actor
+     * @param schema
      */
-    void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+    void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
+            SchemaContext schema) {
         log.debug("{}: Readying transaction {}, client version {}", name,
                 ready.getTransactionID(), ready.getTxnClientVersion());
 
-        ShardDataTreeCohort cohort = ready.getTransaction().ready();
-        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort);
+        final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+        final CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, cohortRegistry, schema, ready.getTxnClientVersion());
         cohortCache.put(ready.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
             return;
         }
 
-        if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
-            // Return our actor path as we'll handle the three phase commit except if the Tx client
-            // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
-            // In that case, the subsequent 3-phase commit messages won't contain the transactionId so to
-            // maintain backwards compatibility, we create a separate cohort actor to provide the compatible behavior.
-            ActorRef replyActorPath = shard.self();
-            if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
-                log.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", name);
-                replyActorPath = shard.getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
-                        ready.getTransactionID()));
-            }
-
-            ReadyTransactionReply readyTransactionReply =
-                    new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
-                            ready.getTxnClientVersion());
-            sender.tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
-                readyTransactionReply, shard.self());
+        if(ready.isDoImmediateCommit()) {
+            cohortEntry.setDoImmediateCommit(true);
+            cohortEntry.setReplySender(sender);
+            cohortEntry.setShard(shard);
+            handleCanCommit(cohortEntry);
         } else {
-            if(ready.isDoImmediateCommit()) {
-                cohortEntry.setDoImmediateCommit(true);
-                cohortEntry.setReplySender(sender);
-                cohortEntry.setShard(shard);
-                handleCanCommit(cohortEntry);
-            } else {
-                // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
-                // front-end so send back a ReadyTransactionReply with our actor path.
-                sender.tell(readyTransactionReply(shard), shard.self());
-            }
+            // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
+            // front-end so send back a ReadyTransactionReply with our actor path.
+            sender.tell(readyTransactionReply(shard), shard.self());
         }
     }
 
@@ -184,12 +183,12 @@ class ShardCommitCoordinator {
      * @param sender the sender of the message
      * @param shard the transaction's shard actor
      */
-    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
+    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard, SchemaContext schema) {
         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
-                    dataTree.newReadWriteTransaction(batched.getTransactionID(),
-                        batched.getTransactionChainID()));
+                    dataTree.newReadWriteTransaction(batched.getTransactionID(), batched.getTransactionChainID()),
+                    cohortRegistry, schema,  batched.getVersion());
             cohortCache.put(batched.getTransactionID(), cohortEntry);
         }
 
@@ -238,16 +237,19 @@ class ShardCommitCoordinator {
 
     /**
      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
-     * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+     * been prepared beforehand by the sender and we just need to drive them through into the
+     * dataTree.
      *
      * @param message the ReadyLocalTransaction message to process
      * @param sender the sender of the message
      * @param shard the transaction's shard actor
      */
-    void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+    void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard,
+            SchemaContext schema) {
         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
                 message.getTransactionID());
-        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort, cohortRegistry, schema,
+                DataStoreVersions.CURRENT_VERSION);
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
@@ -266,6 +268,36 @@ class ShardCommitCoordinator {
         }
     }
 
+    Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
+            final int maxModificationsPerBatch) {
+        CohortEntry cohortEntry = getAndRemoveCohortEntry(from.getTransactionID());
+        if(cohortEntry == null || cohortEntry.getTransaction() == null) {
+            return Collections.singletonList(from);
+        }
+
+        cohortEntry.applyModifications(from.getModifications());
+
+        final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+        cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
+            @Override
+            protected BatchedModifications getModifications() {
+                if(newModifications.isEmpty() ||
+                        newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+                    newModifications.add(new BatchedModifications(from.getTransactionID(),
+                            from.getVersion(), from.getTransactionChainID()));
+                }
+
+                return newModifications.getLast();
+            }
+        });
+
+        BatchedModifications last = newModifications.getLast();
+        last.setDoCommitOnReady(from.isDoCommitOnReady());
+        last.setReady(from.isReady());
+        last.setTotalMessagesSent(newModifications.size());
+        return newModifications;
+    }
+
     private void handleCanCommit(CohortEntry cohortEntry) {
         String transactionID = cohortEntry.getTransactionID();
 
@@ -291,8 +323,9 @@ class ShardCommitCoordinator {
             doCanCommit(currentCohortEntry);
         } else {
             if(log.isDebugEnabled()) {
-                log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
-                        name, queuedCohortEntries.peek().getTransactionID(), transactionID);
+                log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
+                        queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
+                                transactionID);
             }
         }
     }
@@ -340,8 +373,9 @@ class ShardCommitCoordinator {
                 }
             } else {
                 cohortEntry.getReplySender().tell(
-                        canCommit ? CanCommitTransactionReply.YES.toSerializable() :
-                            CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
+                        canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
+                            CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
+                        cohortEntry.getShard().self());
             }
         } catch (Exception e) {
             log.debug("{}: An exception occurred during canCommit", name, e);
@@ -440,7 +474,7 @@ class ShardCommitCoordinator {
             shard.getShardMBean().incrementAbortTransactionsCount();
 
             if(sender != null) {
-                sender.tell(new AbortTransactionReply().toSerializable(), self);
+                sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
             }
         } catch (Exception e) {
             log.error("{}: An exception happened during abort", name, e);
@@ -470,21 +504,81 @@ class ShardCommitCoordinator {
             return;
         }
 
+        List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+
+        log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
+
+        for(CohortEntry cohortEntry: cohortEntries) {
+            if(cohortEntry.getReplySender() != null) {
+                cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+            }
+        }
+    }
+
+    private List<CohortEntry> getAndClearPendingCohortEntries() {
         List<CohortEntry> cohortEntries = new ArrayList<>();
 
         if(currentCohortEntry != null) {
             cohortEntries.add(currentCohortEntry);
+            cohortCache.remove(currentCohortEntry.getTransactionID());
             currentCohortEntry = null;
         }
 
-        cohortEntries.addAll(queuedCohortEntries);
+        for(CohortEntry cohortEntry: queuedCohortEntries) {
+            cohortEntries.add(cohortEntry);
+            cohortCache.remove(cohortEntry.getTransactionID());
+        }
+
         queuedCohortEntries.clear();
+        return cohortEntries;
+    }
 
+    Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
+        if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        Collection<Object> messages = new ArrayList<>();
+        List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
         for(CohortEntry cohortEntry: cohortEntries) {
-            if(cohortEntry.getReplySender() != null) {
-                cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+            if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
+                continue;
+            }
+
+            final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+            cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
+                @Override
+                protected BatchedModifications getModifications() {
+                    if(newModifications.isEmpty() ||
+                            newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+                                cohortEntry.getClientVersion(), ""));
+        }
+
+                    return newModifications.getLast();
+                }
+            });
+
+            if(!newModifications.isEmpty()) {
+                BatchedModifications last = newModifications.getLast();
+                last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
+                last.setReady(true);
+                last.setTotalMessagesSent(newModifications.size());
+                messages.addAll(newModifications);
+
+                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.CAN_COMMITTED) {
+                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+                            cohortEntry.getClientVersion()));
+                }
+
+                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == State.PRE_COMMITTED) {
+                    messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+                            cohortEntry.getClientVersion()));
+                }
             }
         }
+
+        return messages;
     }
 
     /**
@@ -542,9 +636,9 @@ class ShardCommitCoordinator {
     private void maybeProcessNextCohortEntry() {
         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
         // clean out expired entries.
-        Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
+        final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
         while(iter.hasNext()) {
-            CohortEntry next = iter.next();
+            final CohortEntry next = iter.next();
             if(next.isReadyToCommit()) {
                 if(currentCohortEntry == null) {
                     if(log.isDebugEnabled()) {
@@ -595,7 +689,19 @@ class ShardCommitCoordinator {
         this.cohortDecorator = cohortDecorator;
     }
 
+   void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
+        cohortRegistry.process(sender, message);
+    }
+
     static class CohortEntry {
+        enum State {
+            PENDING,
+            CAN_COMMITTED,
+            PRE_COMMITTED,
+            COMMITTED,
+            ABORTED
+    }
+
         private final String transactionID;
         private ShardDataTreeCohort cohort;
         private final ReadWriteShardDataTreeTransaction transaction;
@@ -605,17 +711,25 @@ class ShardCommitCoordinator {
         private boolean doImmediateCommit;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
         private int totalBatchedModificationsReceived;
-        private boolean aborted;
+        private State state = State.PENDING;
+        private final short clientVersion;
+        private final CompositeDataTreeCohort userCohorts;
 
-        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
+        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction,
+                DataTreeCohortActorRegistry cohortRegistry, SchemaContext schema, short clientVersion) {
             this.transaction = Preconditions.checkNotNull(transaction);
             this.transactionID = transactionID;
+            this.clientVersion = clientVersion;
+            this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
         }
 
-        CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
+        CohortEntry(String transactionID, ShardDataTreeCohort cohort, DataTreeCohortActorRegistry cohortRegistry,
+                SchemaContext schema, short clientVersion) {
             this.transactionID = transactionID;
             this.cohort = cohort;
             this.transaction = null;
+            this.clientVersion = clientVersion;
+            this.userCohorts = new CompositeDataTreeCohort(cohortRegistry, transactionID, schema, COMMIT_STEP_TIMEOUT);
         }
 
         void updateLastAccessTime() {
@@ -627,10 +741,26 @@ class ShardCommitCoordinator {
             return transactionID;
         }
 
+        short getClientVersion() {
+            return clientVersion;
+        }
+
+        State getState() {
+            return state;
+        }
+
         DataTreeCandidate getCandidate() {
             return cohort.getCandidate();
         }
 
+        DataTreeModification getDataTreeModification() {
+            return cohort.getDataTreeModification();
+        }
+
+        ReadWriteShardDataTreeTransaction getTransaction() {
+            return transaction;
+        }
+
         int getTotalBatchedModificationsReceived() {
             return totalBatchedModificationsReceived;
         }
@@ -654,6 +784,8 @@ class ShardCommitCoordinator {
         }
 
         boolean canCommit() throws InterruptedException, ExecutionException {
+            state = State.CAN_COMMITTED;
+
             // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
             // about possibly accessing our state on a different thread outside of our dispatcher.
             // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
@@ -662,17 +794,25 @@ class ShardCommitCoordinator {
             return cohort.canCommit().get();
         }
 
-        void preCommit() throws InterruptedException, ExecutionException {
+
+
+        void preCommit() throws InterruptedException, ExecutionException, TimeoutException {
+            state = State.PRE_COMMITTED;
             cohort.preCommit().get();
+            userCohorts.canCommit(cohort.getCandidate());
+            userCohorts.preCommit();
         }
 
-        void commit() throws InterruptedException, ExecutionException {
+        void commit() throws InterruptedException, ExecutionException, TimeoutException {
+            state = State.COMMITTED;
             cohort.commit().get();
+            userCohorts.commit();
         }
 
-        void abort() throws InterruptedException, ExecutionException {
-            aborted = true;
+        void abort() throws InterruptedException, ExecutionException, TimeoutException {
+            state = State.ABORTED;
             cohort.abort().get();
+            userCohorts.abort();
         }
 
         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
@@ -722,12 +862,12 @@ class ShardCommitCoordinator {
 
 
         boolean isAborted() {
-            return aborted;
+            return state == State.ABORTED;
         }
 
         @Override
         public String toString() {
-            StringBuilder builder = new StringBuilder();
+            final StringBuilder builder = new StringBuilder();
             builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
                     .append(doImmediateCommit).append("]");
             return builder.toString();