BUG-5280: refactor CohortEntry
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 76131e257b893a1671f74f01b1e508e2689ca731..eb0c04dbbd86eaaabde73326baf1b35086073ce1 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -23,18 +22,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataTreeCohortActorRegistry.CohortRegistryCommand;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
+import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.concepts.Identifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 
 /**
@@ -42,19 +43,21 @@ import org.slf4j.Logger;
  *
  * @author Thomas Pantelis
  */
-class ShardCommitCoordinator {
+final class ShardCommitCoordinator {
 
     // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     public interface CohortDecorator {
-        ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
+        ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
     }
 
-    private final Map<String, CohortEntry> cohortCache = new HashMap<>();
+    private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
 
     private CohortEntry currentCohortEntry;
 
     private final ShardDataTree dataTree;
 
+    private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
+
     // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
     // since this should only be accessed on the shard's dispatcher.
     private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
@@ -74,8 +77,8 @@ class ShardCommitCoordinator {
 
     private Runnable runOnPendingTransactionsComplete;
 
-    ShardCommitCoordinator(ShardDataTree dataTree,
-            long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log, String name) {
+    ShardCommitCoordinator(ShardDataTree dataTree, long cacheExpiryTimeoutInMillis, int queueCapacity, Logger log,
+            String name) {
 
         this.queueCapacity = queueCapacity;
         this.log = log;
@@ -115,7 +118,7 @@ class ShardCommitCoordinator {
         } else {
             cohortCache.remove(cohortEntry.getTransactionID());
 
-            RuntimeException ex = new RuntimeException(
+            final RuntimeException ex = new RuntimeException(
                     String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
                                   " capacity %d has been reached.",
                                   name, cohortEntry.getTransactionID(), queueCapacity));
@@ -132,14 +135,17 @@ class ShardCommitCoordinator {
      * @param ready the ForwardedReadyTransaction message to process
      * @param sender the sender of the message
      * @param shard the transaction's shard actor
+     * @param schema
      */
-    void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard) {
+    void handleForwardedReadyTransaction(ForwardedReadyTransaction ready, ActorRef sender, Shard shard,
+            SchemaContext schema) {
         log.debug("{}: Readying transaction {}, client version {}", name,
                 ready.getTransactionID(), ready.getTxnClientVersion());
 
-        ShardDataTreeCohort cohort = ready.getTransaction().ready();
-        CohortEntry cohortEntry = new CohortEntry(ready.getTransactionID(), cohort, ready.getTxnClientVersion());
-        cohortCache.put(ready.getTransactionID(), cohortEntry);
+        final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+        final CohortEntry cohortEntry = CohortEntry.createReady(ready.getTransactionID(), cohort, cohortRegistry,
+            schema, ready.getTxnClientVersion());
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
             return;
@@ -165,15 +171,14 @@ class ShardCommitCoordinator {
      *
      * @param batched the BatchedModifications message to process
      * @param sender the sender of the message
-     * @param shard the transaction's shard actor
      */
     void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard) {
         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
         if(cohortEntry == null) {
-            cohortEntry = new CohortEntry(batched.getTransactionID(),
-                    dataTree.newReadWriteTransaction(batched.getTransactionID(),
-                        batched.getTransactionChainID()), batched.getVersion());
-            cohortCache.put(batched.getTransactionID(), cohortEntry);
+            cohortEntry = CohortEntry.createOpen(batched.getTransactionID(),
+                    dataTree.newReadWriteTransaction(batched.getTransactionID()),
+                    cohortRegistry, dataTree.getSchemaContext(), batched.getVersion());
+            cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         }
 
         if(log.isDebugEnabled()) {
@@ -221,7 +226,8 @@ class ShardCommitCoordinator {
 
     /**
      * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
-     * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+     * been prepared beforehand by the sender and we just need to drive them through into the
+     * dataTree.
      *
      * @param message the ReadyLocalTransaction message to process
      * @param sender the sender of the message
@@ -230,9 +236,9 @@ class ShardCommitCoordinator {
     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
         final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
                 message.getTransactionID());
-        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort,
-                DataStoreVersions.CURRENT_VERSION);
-        cohortCache.put(message.getTransactionID(), cohortEntry);
+        final CohortEntry cohortEntry = CohortEntry.createReady(message.getTransactionID(), cohort, cohortRegistry,
+            dataTree.getSchemaContext(), DataStoreVersions.CURRENT_VERSION);
+        cohortCache.put(cohortEntry.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
 
         if(!queueCohortEntry(cohortEntry, sender, shard)) {
@@ -265,8 +271,7 @@ class ShardCommitCoordinator {
             protected BatchedModifications getModifications() {
                 if(newModifications.isEmpty() ||
                         newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
-                    newModifications.add(new BatchedModifications(from.getTransactionID(),
-                            from.getVersion(), from.getTransactionChainID()));
+                    newModifications.add(new BatchedModifications(from.getTransactionID(), from.getVersion()));
                 }
 
                 return newModifications.getLast();
@@ -281,8 +286,6 @@ class ShardCommitCoordinator {
     }
 
     private void handleCanCommit(CohortEntry cohortEntry) {
-        String transactionID = cohortEntry.getTransactionID();
-
         cohortEntry.updateLastAccessTime();
 
         if(currentCohortEntry != null) {
@@ -291,7 +294,7 @@ class ShardCommitCoordinator {
 
             if(log.isDebugEnabled()) {
                 log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
-                        name, currentCohortEntry.getTransactionID(), transactionID);
+                        name, currentCohortEntry.getTransactionID(), cohortEntry.getTransactionID());
             }
 
             return;
@@ -305,8 +308,9 @@ class ShardCommitCoordinator {
             doCanCommit(currentCohortEntry);
         } else {
             if(log.isDebugEnabled()) {
-                log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
-                        name, queuedCohortEntries.peek().getTransactionID(), transactionID);
+                log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now", name,
+                        queuedCohortEntries.peek() != null ? queuedCohortEntries.peek().getTransactionID() : "???",
+                                cohortEntry.getTransactionID());
             }
         }
     }
@@ -318,7 +322,7 @@ class ShardCommitCoordinator {
      * @param sender the actor to which to send the response
      * @param shard the transaction's shard actor
      */
-    void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
+    void handleCanCommit(Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Lookup the cohort entry that was cached previously (or should have been) by
         // transactionReady (via the ForwardedReadyTransaction message).
         final CohortEntry cohortEntry = cohortCache.get(transactionID);
@@ -353,7 +357,6 @@ class ShardCommitCoordinator {
                                 "Can Commit failed, no detailed cause available.")), cohortEntry.getShard().self());
                 }
             } else {
-                // FIXME - use caller's version
                 cohortEntry.getReplySender().tell(
                         canCommit ? CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable() :
                             CanCommitTransactionReply.no(cohortEntry.getClientVersion()).toSerializable(),
@@ -413,7 +416,7 @@ class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      * @return true if the transaction was successfully prepared, false otherwise.
      */
-    boolean handleCommit(final String transactionID, final ActorRef sender, final Shard shard) {
+    boolean handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
         // this transaction.
         final CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
@@ -432,7 +435,7 @@ class ShardCommitCoordinator {
         return doCommit(cohortEntry);
     }
 
-    void handleAbort(final String transactionID, final ActorRef sender, final Shard shard) {
+    void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
         CohortEntry cohortEntry = getCohortEntryIfCurrent(transactionID);
         if(cohortEntry != null) {
             // We don't remove the cached cohort entry here (ie pass false) in case the Tx was
@@ -486,21 +489,81 @@ class ShardCommitCoordinator {
             return;
         }
 
+        List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
+
+        log.debug("{}: Aborting {} pending queued transactions", name, cohortEntries.size());
+
+        for(CohortEntry cohortEntry: cohortEntries) {
+            if(cohortEntry.getReplySender() != null) {
+                cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+            }
+        }
+    }
+
+    private List<CohortEntry> getAndClearPendingCohortEntries() {
         List<CohortEntry> cohortEntries = new ArrayList<>();
 
         if(currentCohortEntry != null) {
             cohortEntries.add(currentCohortEntry);
+            cohortCache.remove(currentCohortEntry.getTransactionID());
             currentCohortEntry = null;
         }
 
-        cohortEntries.addAll(queuedCohortEntries);
+        for(CohortEntry cohortEntry: queuedCohortEntries) {
+            cohortEntries.add(cohortEntry);
+            cohortCache.remove(cohortEntry.getTransactionID());
+        }
+
         queuedCohortEntries.clear();
+        return cohortEntries;
+    }
 
+    Collection<Object> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
+        if(currentCohortEntry == null && queuedCohortEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        Collection<Object> messages = new ArrayList<>();
+        List<CohortEntry> cohortEntries = getAndClearPendingCohortEntries();
         for(CohortEntry cohortEntry: cohortEntries) {
-            if(cohortEntry.getReplySender() != null) {
-                cohortEntry.getReplySender().tell(new Failure(new RuntimeException(reason)), shard.self());
+            if(cohortEntry.isExpired(cacheExpiryTimeoutInMillis) || cohortEntry.isAborted()) {
+                continue;
+            }
+
+            final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
+            cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
+                @Override
+                protected BatchedModifications getModifications() {
+                    if(newModifications.isEmpty() ||
+                            newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
+                        newModifications.add(new BatchedModifications(cohortEntry.getTransactionID(),
+                                cohortEntry.getClientVersion()));
+        }
+
+                    return newModifications.getLast();
+                }
+            });
+
+            if(!newModifications.isEmpty()) {
+                BatchedModifications last = newModifications.getLast();
+                last.setDoCommitOnReady(cohortEntry.isDoImmediateCommit());
+                last.setReady(true);
+                last.setTotalMessagesSent(newModifications.size());
+                messages.addAll(newModifications);
+
+                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.CAN_COMMITTED) {
+                    messages.add(new CanCommitTransaction(cohortEntry.getTransactionID(),
+                            cohortEntry.getClientVersion()));
+                }
+
+                if(!cohortEntry.isDoImmediateCommit() && cohortEntry.getState() == CohortEntry.State.PRE_COMMITTED) {
+                    messages.add(new CommitTransaction(cohortEntry.getTransactionID(),
+                            cohortEntry.getClientVersion()));
+                }
             }
         }
+
+        return messages;
     }
 
     /**
@@ -511,7 +574,7 @@ class ShardCommitCoordinator {
      * @return the current CohortEntry or null if the given transaction ID does not match the
      *         current entry.
      */
-    CohortEntry getCohortEntryIfCurrent(String transactionID) {
+    CohortEntry getCohortEntryIfCurrent(Identifier transactionID) {
         if(isCurrentTransaction(transactionID)) {
             return currentCohortEntry;
         }
@@ -523,11 +586,11 @@ class ShardCommitCoordinator {
         return currentCohortEntry;
     }
 
-    CohortEntry getAndRemoveCohortEntry(String transactionID) {
+    CohortEntry getAndRemoveCohortEntry(Identifier transactionID) {
         return cohortCache.remove(transactionID);
     }
 
-    boolean isCurrentTransaction(String transactionID) {
+    boolean isCurrentTransaction(Identifier transactionID) {
         return currentCohortEntry != null &&
                 currentCohortEntry.getTransactionID().equals(transactionID);
     }
@@ -541,7 +604,7 @@ class ShardCommitCoordinator {
      * @param removeCohortEntry if true the CohortEntry for the transaction is also removed from
      *        the cache.
      */
-    void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
+    void currentTransactionComplete(Identifier transactionID, boolean removeCohortEntry) {
         if(removeCohortEntry) {
             cohortCache.remove(transactionID);
         }
@@ -558,9 +621,9 @@ class ShardCommitCoordinator {
     private void maybeProcessNextCohortEntry() {
         // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
         // clean out expired entries.
-        Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
+        final Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
         while(iter.hasNext()) {
-            CohortEntry next = iter.next();
+            final CohortEntry next = iter.next();
             if(next.isReadyToCommit()) {
                 if(currentCohortEntry == null) {
                     if(log.isDebugEnabled()) {
@@ -611,153 +674,7 @@ class ShardCommitCoordinator {
         this.cohortDecorator = cohortDecorator;
     }
 
-    static class CohortEntry {
-        private final String transactionID;
-        private ShardDataTreeCohort cohort;
-        private final ReadWriteShardDataTreeTransaction transaction;
-        private RuntimeException lastBatchedModificationsException;
-        private ActorRef replySender;
-        private Shard shard;
-        private boolean doImmediateCommit;
-        private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
-        private int totalBatchedModificationsReceived;
-        private boolean aborted;
-        private final short clientVersion;
-
-        CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction, short clientVersion) {
-            this.transaction = Preconditions.checkNotNull(transaction);
-            this.transactionID = transactionID;
-            this.clientVersion = clientVersion;
-        }
-
-        CohortEntry(String transactionID, ShardDataTreeCohort cohort, short clientVersion) {
-            this.transactionID = transactionID;
-            this.cohort = cohort;
-            this.transaction = null;
-            this.clientVersion = clientVersion;
-        }
-
-        void updateLastAccessTime() {
-            lastAccessTimer.reset();
-            lastAccessTimer.start();
-        }
-
-        String getTransactionID() {
-            return transactionID;
-        }
-
-        short getClientVersion() {
-            return clientVersion;
-        }
-
-        DataTreeCandidate getCandidate() {
-            return cohort.getCandidate();
-        }
-
-        ReadWriteShardDataTreeTransaction getTransaction() {
-            return transaction;
-        }
-
-        int getTotalBatchedModificationsReceived() {
-            return totalBatchedModificationsReceived;
-        }
-
-        RuntimeException getLastBatchedModificationsException() {
-            return lastBatchedModificationsException;
-        }
-
-        void applyModifications(Iterable<Modification> modifications) {
-            totalBatchedModificationsReceived++;
-            if(lastBatchedModificationsException == null) {
-                for (Modification modification : modifications) {
-                        try {
-                            modification.apply(transaction.getSnapshot());
-                        } catch (RuntimeException e) {
-                            lastBatchedModificationsException = e;
-                            throw e;
-                        }
-                }
-            }
-        }
-
-        boolean canCommit() throws InterruptedException, ExecutionException {
-            // We block on the future here (and also preCommit(), commit(), abort()) so we don't have to worry
-            // about possibly accessing our state on a different thread outside of our dispatcher.
-            // TODO: the ShardDataTreeCohort returns immediate Futures anyway which begs the question - why
-            // bother even returning Futures from ShardDataTreeCohort if we have to treat them synchronously
-            // anyway?. The Futures are really a remnant from when we were using the InMemoryDataBroker.
-            return cohort.canCommit().get();
-        }
-
-        void preCommit() throws InterruptedException, ExecutionException {
-            cohort.preCommit().get();
-        }
-
-        void commit() throws InterruptedException, ExecutionException {
-            cohort.commit().get();
-        }
-
-        void abort() throws InterruptedException, ExecutionException {
-            aborted = true;
-            cohort.abort().get();
-        }
-
-        void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {
-            Preconditions.checkState(cohort == null, "cohort was already set");
-
-            setDoImmediateCommit(doImmediateCommit);
-
-            cohort = transaction.ready();
-
-            if(cohortDecorator != null) {
-                // Call the hook for unit tests.
-                cohort = cohortDecorator.decorate(transactionID, cohort);
-            }
-        }
-
-        boolean isReadyToCommit() {
-            return replySender != null;
-        }
-
-        boolean isExpired(long expireTimeInMillis) {
-            return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
-        }
-
-        boolean isDoImmediateCommit() {
-            return doImmediateCommit;
-        }
-
-        void setDoImmediateCommit(boolean doImmediateCommit) {
-            this.doImmediateCommit = doImmediateCommit;
-        }
-
-        ActorRef getReplySender() {
-            return replySender;
-        }
-
-        void setReplySender(ActorRef replySender) {
-            this.replySender = replySender;
-        }
-
-        Shard getShard() {
-            return shard;
-        }
-
-        void setShard(Shard shard) {
-            this.shard = shard;
-        }
-
-
-        boolean isAborted() {
-            return aborted;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
-                    .append(doImmediateCommit).append("]");
-            return builder.toString();
-        }
+   void processCohortRegistryCommand(ActorRef sender, CohortRegistryCommand message) {
+        cohortRegistry.process(sender, message);
     }
 }