Bug 3206: CDS - issues with direct commit 90/20890/2
authorTom Pantelis <tpanteli@brocade.com>
Wed, 13 May 2015 11:44:01 +0000 (07:44 -0400)
committerGerrit Code Review <gerrit@opendaylight.org>
Thu, 21 May 2015 13:59:14 +0000 (13:59 +0000)
Modified RaftActor#persistData to send the ApplyState message instead of
calling applyState directly in case the Shard tries to persist another
entry during the persist callback.

Modified Shard to use the correct sneder to send the
CommitTransactionReply message.

The third fix is to ensure direct commits in a chain aren't committed
before previous coordinated commits. In the ShardCommitCoordinator, I
changed it to enqueue all CohortEntries when they are readied so they
are processed in that order.

I also included the unit test scenario that caused the issues to occur.

Change-Id: I65ffcbbac37d6be28c4e1c2dc17d3b0ca21847dc
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit cf088cc87d5e9c0dfd3fb8e47e0d6d7c5ddc19fd)

17 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/odl-datastore-test.yang

index 5dc1b9dcdf8877132b4978f1cde7f47da32e3bc7..f8bbf638a07f37c29c3aef64a4d51a963fe3b7f0 100644 (file)
@@ -342,7 +342,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     raftContext.setLastApplied(replicatedLogEntry.getIndex());
 
                     // Apply the state immediately
-                    applyState(clientActor, identifier, data);
+                    self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
 
                     // Send a ApplyJournalEntries message so that we write the fact that we applied
                     // the state to durable storage
index c32839c490eb0b53d69192b2c58f4a141fe422f0..8c32eab61df5ae32b77d5e690c3fb84e59352dd7 100644 (file)
@@ -81,6 +81,8 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
             new Procedure<ReplicatedLogEntry>() {
                 @Override
                 public void apply(ReplicatedLogEntry evt) throws Exception {
+                    context.getLogger().debug("{}: persist complete {}", context.getId(), replicatedLogEntry);
+
                     int logEntrySize = replicatedLogEntry.size();
 
                     long dataSizeForCheck = dataSize();
index 8ae79ceb2dae0c819d8e964f5ab6af8133a25e2f..2f323aafd35694b0d9c496d9eed58742a10b1aaa 100644 (file)
@@ -36,7 +36,7 @@ public class DatastoreContext {
     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
-    public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000;
+    public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
     public static final boolean DEFAULT_PERSISTENT = true;
@@ -45,7 +45,8 @@ public class DatastoreContext {
     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
-    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100;
+    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 100;
+    public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
 
     private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
 
@@ -64,6 +65,7 @@ public class DatastoreContext {
     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
     private boolean writeOnlyTransactionOptimizationsEnabled = true;
+    private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
 
     public static Set<String> getGlobalDatastoreTypes() {
         return globalDatastoreTypes;
@@ -93,6 +95,7 @@ public class DatastoreContext {
         this.dataStoreType = other.dataStoreType;
         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
+        this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -201,6 +204,10 @@ public class DatastoreContext {
         return writeOnlyTransactionOptimizationsEnabled;
     }
 
+    public long getShardCommitQueueExpiryTimeoutInMillis() {
+        return shardCommitQueueExpiryTimeoutInMillis;
+    }
+
     public static class Builder {
         private final DatastoreContext datastoreContext;
         private int maxShardDataChangeExecutorPoolSize =
@@ -346,6 +353,17 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
+            datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
+            return this;
+        }
+
+        public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
+            datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
+                    value, TimeUnit.SECONDS);
+            return this;
+        }
+
         public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
             return this;
index 66562975cc8472af0ebda42d7b7e252b68861bb9..71d9dabf37afd6c5390d6231a4151aba21722dea 100644 (file)
@@ -198,7 +198,10 @@ final class RemoteTransactionContextSupport {
                 isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
         }
 
-        TransactionContextCleanup.track(this, ret);
+        if(parent.getType() == TransactionType.READ_ONLY) {
+            TransactionContextCleanup.track(this, ret);
+        }
+
         return ret;
     }
 }
index c2cc1ac05b5dd0bb51a363b66d27d5398042f401..6da9fe947a0651e5d96981b87a274bde814f902a 100644 (file)
@@ -140,7 +140,7 @@ public class Shard extends RaftActor {
         }
 
         commitCoordinator = new ShardCommitCoordinator(store,
-                TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+                datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
         setTransactionCommitTimeout();
@@ -160,7 +160,7 @@ public class Shard extends RaftActor {
 
     private void setTransactionCommitTimeout() {
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
-                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
     }
 
     public static Props props(final ShardIdentifier name,
@@ -302,14 +302,15 @@ public class Shard extends RaftActor {
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
-            long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
-            if(elapsed > transactionCommitTimeout) {
+            if(cohortEntry.isExpired(transactionCommitTimeout)) {
                 LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
                         persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
 
                 doAbortTransaction(cohortEntry.getTransactionID(), null);
             }
         }
+
+        commitCoordinator.cleanupExpiredCohortEntries();
     }
 
     private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
@@ -323,9 +324,9 @@ public class Shard extends RaftActor {
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
-            applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
+            applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
-            Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+            Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
                 DataTreeCandidatePayload.create(candidate));
         }
     }
index 97816a55ccb5920e289eed12d665de8ca6a2ce35..f3e1e33e347f7760820bce49897bf2aa3c20a5b6 100644 (file)
@@ -12,12 +12,11 @@ import akka.actor.Status;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.base.Stopwatch;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -45,13 +44,15 @@ public class ShardCommitCoordinator {
         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
     }
 
-    private final Cache<String, CohortEntry> cohortCache;
+    private final Map<String, CohortEntry> cohortCache = new HashMap<>();
 
     private CohortEntry currentCohortEntry;
 
     private final ShardDataTree dataTree;
 
-    private final Queue<CohortEntry> queuedCohortEntries;
+    // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
+    // since this should only be accessed on the shard's dispatcher.
+    private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
 
     private int queueCapacity;
 
@@ -59,15 +60,7 @@ public class ShardCommitCoordinator {
 
     private final String name;
 
-    private final RemovalListener<String, CohortEntry> cacheRemovalListener =
-            new RemovalListener<String, CohortEntry>() {
-                @Override
-                public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
-                    if(notification.getCause() == RemovalCause.EXPIRED) {
-                        log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
-                    }
-                }
-            };
+    private final long cacheExpiryTimeoutInMillis;
 
     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     private CohortDecorator cohortDecorator;
@@ -75,19 +68,13 @@ public class ShardCommitCoordinator {
     private ReadyTransactionReply readyTransactionReply;
 
     public ShardCommitCoordinator(ShardDataTree dataTree,
-            long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
+            long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
 
         this.queueCapacity = queueCapacity;
         this.log = log;
         this.name = name;
         this.dataTree = Preconditions.checkNotNull(dataTree);
-
-        cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
-                removalListener(cacheRemovalListener).build();
-
-        // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
-        // since this should only be accessed on the shard's dispatcher.
-        queuedCohortEntries = new LinkedList<>();
+        this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
     }
 
     public void setQueueCapacity(int queueCapacity) {
@@ -102,6 +89,23 @@ public class ShardCommitCoordinator {
         return readyTransactionReply;
     }
 
+    private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
+        if(queuedCohortEntries.size() < queueCapacity) {
+            queuedCohortEntries.offer(cohortEntry);
+            return true;
+        } else {
+            cohortCache.remove(cohortEntry.getTransactionID());
+
+            RuntimeException ex = new RuntimeException(
+                    String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
+                                  " capacity %d has been reached.",
+                                  name, cohortEntry.getTransactionID(), queueCapacity));
+            log.error(ex.getMessage());
+            sender.tell(new Status.Failure(ex), shard.self());
+            return false;
+        }
+    }
+
     /**
      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
@@ -114,6 +118,10 @@ public class ShardCommitCoordinator {
                 (MutableCompositeModification) ready.getModification());
         cohortCache.put(ready.getTransactionID(), cohortEntry);
 
+        if(!queueCohortEntry(cohortEntry, sender, shard)) {
+            return;
+        }
+
         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
             // Return our actor path as we'll handle the three phase commit except if the Tx client
             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
@@ -156,9 +164,9 @@ public class ShardCommitCoordinator {
      *
      * @throws ExecutionException if an error occurs loading the cache
      */
-    boolean handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
+    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
             throws ExecutionException {
-        CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
+        CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
@@ -174,6 +182,10 @@ public class ShardCommitCoordinator {
         cohortEntry.applyModifications(batched.getModifications());
 
         if(batched.isReady()) {
+            if(!queueCohortEntry(cohortEntry, sender, shard)) {
+                return;
+            }
+
             if(log.isDebugEnabled()) {
                 log.debug("{}: Readying Tx {}, client version {}", name,
                         batched.getTransactionID(), batched.getVersion());
@@ -191,8 +203,6 @@ public class ShardCommitCoordinator {
         } else {
             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
         }
-
-        return batched.isReady();
     }
 
     /**
@@ -208,6 +218,11 @@ public class ShardCommitCoordinator {
         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
+
+        if(!queueCohortEntry(cohortEntry, sender, shard)) {
+            return;
+        }
+
         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
 
         if (message.isDoCommitOnReady()) {
@@ -222,36 +237,31 @@ public class ShardCommitCoordinator {
     private void handleCanCommit(CohortEntry cohortEntry) {
         String transactionID = cohortEntry.getTransactionID();
 
-        if(log.isDebugEnabled()) {
-            log.debug("{}: Processing canCommit for transaction {} for shard {}",
-                    name, transactionID, cohortEntry.getShard().self().path());
-        }
+        cohortEntry.updateLastAccessTime();
 
         if(currentCohortEntry != null) {
-            // There's already a Tx commit in progress - attempt to queue this entry to be
-            // committed after the current Tx completes.
-            log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
-                    name, currentCohortEntry.getTransactionID(), transactionID);
+            // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
+            // queue and will get processed after all prior entries complete.
 
-            if(queuedCohortEntries.size() < queueCapacity) {
-                queuedCohortEntries.offer(cohortEntry);
-            } else {
-                removeCohortEntry(transactionID);
-
-                RuntimeException ex = new RuntimeException(
-                        String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
-                                      " capacity %d has been reached.",
-                                      name, transactionID, queueCapacity));
-                log.error(ex.getMessage());
-                cohortEntry.getReplySender().tell(new Status.Failure(ex), cohortEntry.getShard().self());
+            if(log.isDebugEnabled()) {
+                log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
+                        name, currentCohortEntry.getTransactionID(), transactionID);
             }
-        } else {
-            // No Tx commit currently in progress - make this the current entry and proceed with
-            // canCommit.
-            cohortEntry.updateLastAccessTime();
-            currentCohortEntry = cohortEntry;
 
-            doCanCommit(cohortEntry);
+            return;
+        }
+
+        // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
+        // it the current entry and proceed with canCommit.
+        // Purposely checking reference equality here.
+        if(queuedCohortEntries.peek() == cohortEntry) {
+            currentCohortEntry = queuedCohortEntries.poll();
+            doCanCommit(currentCohortEntry);
+        } else {
+            if(log.isDebugEnabled()) {
+                log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
+                        name, queuedCohortEntries.peek().getTransactionID(), transactionID);
+            }
         }
     }
 
@@ -265,7 +275,7 @@ public class ShardCommitCoordinator {
     public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
         // Lookup the cohort entry that was cached previously (or should have been) by
         // transactionReady (via the ForwardedReadyTransaction message).
-        final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
+        final CohortEntry cohortEntry = cohortCache.get(transactionID);
         if(cohortEntry == null) {
             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
             // between canCommit and ready and the entry was expired from the cache.
@@ -283,7 +293,6 @@ public class ShardCommitCoordinator {
     }
 
     private void doCanCommit(final CohortEntry cohortEntry) {
-
         boolean canCommit = false;
         try {
             // We block on the future here so we don't have to worry about possibly accessing our
@@ -291,6 +300,8 @@ public class ShardCommitCoordinator {
             // currently uses a same thread executor anyway.
             canCommit = cohortEntry.getCohort().canCommit().get();
 
+            log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
+
             if(cohortEntry.isDoImmediateCommit()) {
                 if(canCommit) {
                     doCommit(cohortEntry);
@@ -304,7 +315,7 @@ public class ShardCommitCoordinator {
                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
             }
         } catch (Exception e) {
-            log.debug("{}: An exception occurred during canCommit: {}", name, e);
+            log.debug("{}: An exception occurred during canCommit", name, e);
 
             Throwable failure = e;
             if(e instanceof ExecutionException) {
@@ -367,6 +378,7 @@ public class ShardCommitCoordinator {
             return false;
         }
 
+        cohortEntry.setReplySender(sender);
         return doCommit(cohortEntry);
     }
 
@@ -391,13 +403,7 @@ public class ShardCommitCoordinator {
     }
 
     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
-        CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
-        cohortCache.invalidate(transactionID);
-        return cohortEntry;
-    }
-
-    public void removeCohortEntry(String transactionID) {
-        cohortCache.invalidate(transactionID);
+        return cohortCache.remove(transactionID);
     }
 
     public boolean isCurrentTransaction(String transactionID) {
@@ -416,33 +422,66 @@ public class ShardCommitCoordinator {
      */
     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
         if(removeCohortEntry) {
-            removeCohortEntry(transactionID);
+            cohortCache.remove(transactionID);
         }
 
         if(isCurrentTransaction(transactionID)) {
-            // Dequeue the next cohort entry waiting in the queue.
-            currentCohortEntry = queuedCohortEntries.poll();
-            if(currentCohortEntry != null) {
-                currentCohortEntry.updateLastAccessTime();
-                doCanCommit(currentCohortEntry);
+            currentCohortEntry = null;
+
+            log.debug("{}: currentTransactionComplete: {}", name, transactionID);
+
+            maybeProcessNextCohortEntry();
+        }
+    }
+
+    private void maybeProcessNextCohortEntry() {
+        // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
+        // clean out expired entries.
+        Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
+        while(iter.hasNext()) {
+            CohortEntry next = iter.next();
+            if(next.isReadyToCommit()) {
+                if(currentCohortEntry == null) {
+                    if(log.isDebugEnabled()) {
+                        log.debug("{}: Next entry to canCommit {}", name, next);
+                    }
+
+                    iter.remove();
+                    currentCohortEntry = next;
+                    currentCohortEntry.updateLastAccessTime();
+                    doCanCommit(currentCohortEntry);
+                }
+
+                break;
+            } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
+                log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
+                        name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
+
+                iter.remove();
+                cohortCache.remove(next.getTransactionID());
+            } else {
+                break;
             }
         }
     }
 
+    void cleanupExpiredCohortEntries() {
+        maybeProcessNextCohortEntry();
+    }
+
     @VisibleForTesting
     void setCohortDecorator(CohortDecorator cohortDecorator) {
         this.cohortDecorator = cohortDecorator;
     }
 
-
     static class CohortEntry {
         private final String transactionID;
         private ShardDataTreeCohort cohort;
         private final ReadWriteShardDataTreeTransaction transaction;
         private ActorRef replySender;
         private Shard shard;
-        private long lastAccessTime;
         private boolean doImmediateCommit;
+        private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.transaction = Preconditions.checkNotNull(transaction);
@@ -463,11 +502,8 @@ public class ShardCommitCoordinator {
         }
 
         void updateLastAccessTime() {
-            lastAccessTime = System.currentTimeMillis();
-        }
-
-        long getLastAccessTime() {
-            return lastAccessTime;
+            lastAccessTimer.reset();
+            lastAccessTimer.start();
         }
 
         String getTransactionID() {
@@ -497,6 +533,14 @@ public class ShardCommitCoordinator {
             }
         }
 
+        boolean isReadyToCommit() {
+            return replySender != null;
+        }
+
+        boolean isExpired(long expireTimeInMillis) {
+            return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
+        }
+
         boolean isDoImmediateCommit() {
             return doImmediateCommit;
         }
@@ -520,5 +564,13 @@ public class ShardCommitCoordinator {
         void setShard(Shard shard) {
             this.shard = shard;
         }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
+                    .append(doImmediateCommit).append("]");
+            return builder.toString();
+        }
     }
 }
index 6b81792a57ca86de622df194c6c090bdd98f189b..981be366d09cb15ca18840021a4a9d8d2948b028 100644 (file)
@@ -34,6 +34,8 @@ public interface DatastoreConfigurationMXBean {
 
     int getShardTransactionCommitQueueCapacity();
 
+    long getShardCommitQueueExpiryTimeoutInSeconds();
+
     long getShardInitializationTimeoutInSeconds();
 
     long getShardLeaderElectionTimeoutInSeconds();
index 79ff2a4e54b2f0c75118dc4c9536779bb69a6b31..261b93de56e057874a44600495e977d73b643bd4 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
 
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 
@@ -73,6 +74,11 @@ public class DatastoreConfigurationMXBeanImpl extends AbstractMXBean implements
         return context.getShardTransactionCommitTimeoutInSeconds();
     }
 
+    @Override
+    public long getShardCommitQueueExpiryTimeoutInSeconds() {
+        return TimeUnit.SECONDS.convert(context.getShardCommitQueueExpiryTimeoutInMillis(), TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public int getShardTransactionCommitQueueCapacity() {
         return context.getShardTransactionCommitQueueCapacity();
index 252788203f5972746d1259a03901b30e1278feed..539797e65966e9d8dcdfbef11969223f8308f267 100644 (file)
@@ -64,6 +64,8 @@ public class DistributedConfigDataStoreProviderModule extends
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
                 .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
                 .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
+                .shardCommitQueueExpiryTimeoutInSeconds(
+                        props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
index 08845654a5fb34ff77227eb05f8eb08ef4e51caf..45ed184de3875feb96486bb71b2bd952df139fd8 100644 (file)
@@ -65,6 +65,8 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
                 .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
                 .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
+                .shardCommitQueueExpiryTimeoutInSeconds(
+                        props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
index dc83af9a756374694e6203c61eebc49d72e214fc..3c753d02d5a9520d7ca82db1a4a7d90217c9d222 100644 (file)
@@ -136,11 +136,21 @@ module distributed-datastore-provider {
          }
 
          leaf shard-transaction-commit-queue-capacity {
-            default 20000;
+            default 50000;
             type non-zero-uint32-type;
             description "The maximum allowed capacity for each shard's transaction commit queue.";
          }
 
+         leaf shard-commit-queue-expiry-timeout-in-seconds {
+             default 120; // 2 minutes
+             type non-zero-uint32-type;
+             description "The maximum amount of time a transaction can remain in a shard's commit queue waiting 
+                 to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be
+                 quick but latencies can occur in between transaction ready and CanCommit or a remote broker
+                 could lose connection and CanCommit might never occur. Expiring transactions from the queue
+                 allows subsequent pending transaction to be processed.";
+         }
+         
          leaf shard-initialization-timeout-in-seconds {
             default 300; // 5 minutes
             type non-zero-uint32-type;
index f3d93b896de082534f1ed4e8879117226b3952a6..76ae3c71566bdce5663918e05a7f1d6cf54e352b 100644 (file)
@@ -9,6 +9,7 @@ import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
@@ -788,12 +789,15 @@ public class DistributedDataStoreIntegrationTest {
 
             writeTx = txChain.newWriteOnlyTransaction();
 
-            //writeTx.delete(personPath);
+            writeTx.delete(carPath);
 
             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
 
-            doCommit(cohort1);
-            doCommit(cohort2);
+            ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
+            ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
+
+            doCommit(canCommit1, cohort1);
+            doCommit(canCommit2, cohort2);
             doCommit(cohort3);
 
             txChain.close();
@@ -801,12 +805,11 @@ public class DistributedDataStoreIntegrationTest {
             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertEquals("isPresent", true, optional.isPresent());
-            assertEquals("Data node", car, optional.get());
+            assertEquals("isPresent", false, optional.isPresent());
 
             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-            //assertEquals("isPresent", false, optional.isPresent());
             assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", person, optional.get());
 
             cleanup(dataStore);
         }};
index 7cef2fd743ae9c664d9ca37aa5abc3ffaa816702..be8ede9d6f4a893dc3c2851bc83557dfb4867f44 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -253,8 +254,8 @@ public class DistributedDataStoreRemotingIntegrationTest {
     }
 
     @Test
-    public void testTransactionChain() throws Exception {
-        initDatastores("testTransactionChain");
+    public void testTransactionChainWithSingleShard() throws Exception {
+        initDatastores("testTransactionChainWithSingleShard");
 
         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
 
@@ -299,6 +300,61 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
     }
 
+    @Test
+    public void testTransactionChainWithMultipleShards() throws Exception{
+        initDatastores("testTransactionChainWithMultipleShards");
+
+        DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
+
+        followerTestKit.doCommit(writeTx.ready());
+
+        DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
+
+        MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+        readWriteTx.write(carPath, car);
+
+        MapEntryNode person = PeopleModel.newPersonEntry("jack");
+        YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
+        readWriteTx.merge(personPath, person);
+
+        Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, optional.isPresent());
+        assertEquals("Data node", car, optional.get());
+
+        optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, optional.isPresent());
+        assertEquals("Data node", person, optional.get());
+
+        DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
+
+        writeTx = txChain.newWriteOnlyTransaction();
+
+        writeTx.delete(personPath);
+
+        DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
+
+        followerTestKit.doCommit(cohort2);
+        followerTestKit.doCommit(cohort3);
+
+        txChain.close();
+
+        DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+        verifyCars(readTx, car);
+
+        optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", false, optional.isPresent());
+    }
+
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastores("testReadyLocalTransactionForwardedToLeader");
index 109e77c52361e34581d4c6244d76fd62063fabc8..94f20856ff43754dae02986b554259c9e637c212 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -122,6 +123,13 @@ class IntegrationTestKit extends ShardTestKit {
         cohort.commit().get(5, TimeUnit.SECONDS);
     }
 
+    void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+        Boolean canCommit = canCommitFuture.get(7, TimeUnit.SECONDS);
+        assertEquals("canCommit", true, canCommit);
+        cohort.preCommit().get(5, TimeUnit.SECONDS);
+        cohort.commit().get(5, TimeUnit.SECONDS);
+    }
+
     void cleanup(DistributedDataStore dataStore) {
         if(dataStore != null) {
             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
index ee543164de51e41e711b424da8a29a271c1d5382..49cfcbc9c257a31a5e7ad51585ebda185d8f9fd5 100644 (file)
@@ -1826,7 +1826,7 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
-        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
+        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
@@ -1866,9 +1866,11 @@ public class ShardTest extends AbstractShardTest {
                     cohort2, modification2, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
+            // The 3rd Tx should exceed queue capacity and fail.
+
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
                     cohort3, modification3, true, false), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.class);
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // canCommit 1st Tx.
 
@@ -1888,6 +1890,125 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
+        dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitWithPriorExpiredCohortEntries");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
+            // should expire from the queue and the last one should be processed.
+
+            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
+        dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitWithSubsequentExpiredCohortEntry");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // CanCommit the first one so it's the current in-progress CohortEntry.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Ready the second Tx.
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // Ready the third Tx.
+
+            String transactionID3 = "tx3";
+            DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
+            new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
+                    .apply(modification3);
+            ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
+
+            shard.tell(readyMessage, getRef());
+
+            // Commit the first Tx. After completing, the second should expire from the queue and the third
+            // Tx committed.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Expect commit reply from the third Tx.
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
+            assertNotNull(TestModel.TEST2_PATH + " not found", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCanCommitBeforeReadyFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
index 60420dcf236ac8d19a51bb9111bf7c9d3b6880f7..07e9b299420240fdccd12d231f256da7f4c588b2 100644 (file)
@@ -22,6 +22,9 @@ public class TestModel {
     public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
             "test");
 
+    public static final QName TEST2_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
+            "test2");
+
     public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
             "junk");
 
@@ -36,6 +39,7 @@ public class TestModel {
     private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
     public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+    public static final YangInstanceIdentifier TEST2_PATH = YangInstanceIdentifier.of(TEST2_QNAME);
     public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
     public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
             node(OUTER_LIST_QNAME).build();
index f6d0202fd5f7dbc447f6d5bfbeebf29ce60a3cc6..e36f38ea964a4d56850dd9d9f5a70a38256d0fba 100644 (file)
@@ -39,4 +39,7 @@ module odl-datastore-test {
             }
         }
     }
+    
+    container test2 {
+    }
 }
\ No newline at end of file