Bug 3206: CDS - issues with direct commit 90/20890/2
author Tom Pantelis <tpanteli@brocade.com>
Wed, 13 May 2015 11:44:01 +0000 (07:44 -0400)
committer Gerrit Code Review <gerrit@opendaylight.org>
Thu, 21 May 2015 13:59:14 +0000 (13:59 +0000)
Modified RaftActor#persistData to send the ApplyState message instead of
calling applyState directly in case the Shard tries to persist another
entry during the persist callback.

Modified Shard to use the correct sender to send the
CommitTransactionReply message.

The third fix is to ensure direct commits in a chain aren't committed
before previous coordinated commits. In the ShardCommitCoordinator, I
changed it to enqueue all CohortEntries when they are readied so they
are processed in that order.

I also included the unit test scenario that caused the issues to occur.

Change-Id: I65ffcbbac37d6be28c4e1c2dc17d3b0ca21847dc
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
(cherry picked from commit cf088cc87d5e9c0dfd3fb8e47e0d6d7c5ddc19fd)

17 files changed:
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/RaftActor.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContextSupport.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/jmx/mbeans/DatastoreConfigurationMXBeanImpl.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/odl-datastore-test.yang

index 5dc1b9d..f8bbf63 100644 (file)
@@ -342,7 +342,7 @@ public abstract class RaftActor extends AbstractUntypedPersistentActor {
                     raftContext.setLastApplied(replicatedLogEntry.getIndex());
 
                     // Apply the state immediately
-                    applyState(clientActor, identifier, data);
+                    self().tell(new ApplyState(clientActor, identifier, replicatedLogEntry), self());
 
                     // Send a ApplyJournalEntries message so that we write the fact that we applied
                     // the state to durable storage
index c32839c..8c32eab 100644 (file)
@@ -81,6 +81,8 @@ class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
             new Procedure<ReplicatedLogEntry>() {
                 @Override
                 public void apply(ReplicatedLogEntry evt) throws Exception {
+                    context.getLogger().debug("{}: persist complete {}", context.getId(), replicatedLogEntry);
+
                     int logEntrySize = replicatedLogEntry.size();
 
                     long dataSizeForCheck = dataSize();
index 8ae79ce..2f323aa 100644 (file)
@@ -36,7 +36,7 @@ public class DatastoreContext {
     public static final int DEFAULT_SNAPSHOT_BATCH_COUNT = 20000;
     public static final int DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS = 500;
     public static final int DEFAULT_ISOLATED_LEADER_CHECK_INTERVAL_IN_MILLIS = DEFAULT_HEARTBEAT_INTERVAL_IN_MILLIS * 10;
-    public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 20000;
+    public static final int DEFAULT_SHARD_TX_COMMIT_QUEUE_CAPACITY = 50000;
     public static final Timeout DEFAULT_SHARD_INITIALIZATION_TIMEOUT = new Timeout(5, TimeUnit.MINUTES);
     public static final Timeout DEFAULT_SHARD_LEADER_ELECTION_TIMEOUT = new Timeout(30, TimeUnit.SECONDS);
     public static final boolean DEFAULT_PERSISTENT = true;
@@ -45,7 +45,8 @@ public class DatastoreContext {
     public static final int DEFAULT_SHARD_ELECTION_TIMEOUT_FACTOR = 2;
     public static final int DEFAULT_TX_CREATION_INITIAL_RATE_LIMIT = 100;
     public static final String UNKNOWN_DATA_STORE_TYPE = "unknown";
-    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT= 100;
+    public static final int DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT = 100;
+    public static final long DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
 
     private static Set<String> globalDatastoreTypes = Sets.newConcurrentHashSet();
 
@@ -64,6 +65,7 @@ public class DatastoreContext {
     private String dataStoreType = UNKNOWN_DATA_STORE_TYPE;
     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
     private boolean writeOnlyTransactionOptimizationsEnabled = true;
+    private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
 
     public static Set<String> getGlobalDatastoreTypes() {
         return globalDatastoreTypes;
@@ -93,6 +95,7 @@ public class DatastoreContext {
         this.dataStoreType = other.dataStoreType;
         this.shardBatchedModificationCount = other.shardBatchedModificationCount;
         this.writeOnlyTransactionOptimizationsEnabled = other.writeOnlyTransactionOptimizationsEnabled;
+        this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -201,6 +204,10 @@ public class DatastoreContext {
         return writeOnlyTransactionOptimizationsEnabled;
     }
 
+    public long getShardCommitQueueExpiryTimeoutInMillis() {
+        return shardCommitQueueExpiryTimeoutInMillis;
+    }
+
     public static class Builder {
         private final DatastoreContext datastoreContext;
         private int maxShardDataChangeExecutorPoolSize =
@@ -346,6 +353,17 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder shardCommitQueueExpiryTimeoutInMillis(long value) {
+            datastoreContext.shardCommitQueueExpiryTimeoutInMillis = value;
+            return this;
+        }
+
+        public Builder shardCommitQueueExpiryTimeoutInSeconds(long value) {
+            datastoreContext.shardCommitQueueExpiryTimeoutInMillis = TimeUnit.MILLISECONDS.convert(
+                    value, TimeUnit.SECONDS);
+            return this;
+        }
+
         public Builder maxShardDataChangeExecutorPoolSize(int maxShardDataChangeExecutorPoolSize) {
             this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
             return this;
index 6656297..71d9dab 100644 (file)
@@ -198,7 +198,10 @@ final class RemoteTransactionContextSupport {
                 isTxActorLocal, remoteTransactionVersion, parent.getCompleter());
         }
 
-        TransactionContextCleanup.track(this, ret);
+        if(parent.getType() == TransactionType.READ_ONLY) {
+            TransactionContextCleanup.track(this, ret);
+        }
+
         return ret;
     }
 }
index c2cc1ac..6da9fe9 100644 (file)
@@ -140,7 +140,7 @@ public class Shard extends RaftActor {
         }
 
         commitCoordinator = new ShardCommitCoordinator(store,
-                TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+                datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
         setTransactionCommitTimeout();
@@ -160,7 +160,7 @@ public class Shard extends RaftActor {
 
     private void setTransactionCommitTimeout() {
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
-                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
     }
 
     public static Props props(final ShardIdentifier name,
@@ -302,14 +302,15 @@ public class Shard extends RaftActor {
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
-            long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
-            if(elapsed > transactionCommitTimeout) {
+            if(cohortEntry.isExpired(transactionCommitTimeout)) {
                 LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
                         persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
 
                 doAbortTransaction(cohortEntry.getTransactionID(), null);
             }
         }
+
+        commitCoordinator.cleanupExpiredCohortEntries();
     }
 
     private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
@@ -323,9 +324,9 @@ public class Shard extends RaftActor {
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
-            applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
+            applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
-            Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+            Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
                 DataTreeCandidatePayload.create(candidate));
         }
     }
index 97816a5..f3e1e33 100644 (file)
@@ -12,12 +12,11 @@ import akka.actor.Status;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.base.Stopwatch;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -45,13 +44,15 @@ public class ShardCommitCoordinator {
         ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual);
     }
 
-    private final Cache<String, CohortEntry> cohortCache;
+    private final Map<String, CohortEntry> cohortCache = new HashMap<>();
 
     private CohortEntry currentCohortEntry;
 
     private final ShardDataTree dataTree;
 
-    private final Queue<CohortEntry> queuedCohortEntries;
+    // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
+    // since this should only be accessed on the shard's dispatcher.
+    private final Queue<CohortEntry> queuedCohortEntries = new LinkedList<>();
 
     private int queueCapacity;
 
@@ -59,15 +60,7 @@ public class ShardCommitCoordinator {
 
     private final String name;
 
-    private final RemovalListener<String, CohortEntry> cacheRemovalListener =
-            new RemovalListener<String, CohortEntry>() {
-                @Override
-                public void onRemoval(RemovalNotification<String, CohortEntry> notification) {
-                    if(notification.getCause() == RemovalCause.EXPIRED) {
-                        log.warn("{}: Transaction {} was timed out of the cache", name, notification.getKey());
-                    }
-                }
-            };
+    private final long cacheExpiryTimeoutInMillis;
 
     // This is a hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts.
     private CohortDecorator cohortDecorator;
@@ -75,19 +68,13 @@ public class ShardCommitCoordinator {
     private ReadyTransactionReply readyTransactionReply;
 
     public ShardCommitCoordinator(ShardDataTree dataTree,
-            long cacheExpiryTimeoutInSec, int queueCapacity, ActorRef shardActor, Logger log, String name) {
+            long cacheExpiryTimeoutInMillis, int queueCapacity, ActorRef shardActor, Logger log, String name) {
 
         this.queueCapacity = queueCapacity;
         this.log = log;
         this.name = name;
         this.dataTree = Preconditions.checkNotNull(dataTree);
-
-        cohortCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpiryTimeoutInSec, TimeUnit.SECONDS).
-                removalListener(cacheRemovalListener).build();
-
-        // We use a LinkedList here to avoid synchronization overhead with concurrent queue impls
-        // since this should only be accessed on the shard's dispatcher.
-        queuedCohortEntries = new LinkedList<>();
+        this.cacheExpiryTimeoutInMillis = cacheExpiryTimeoutInMillis;
     }
 
     public void setQueueCapacity(int queueCapacity) {
@@ -102,6 +89,23 @@ public class ShardCommitCoordinator {
         return readyTransactionReply;
     }
 
+    private boolean queueCohortEntry(CohortEntry cohortEntry, ActorRef sender, Shard shard) {
+        if(queuedCohortEntries.size() < queueCapacity) {
+            queuedCohortEntries.offer(cohortEntry);
+            return true;
+        } else {
+            cohortCache.remove(cohortEntry.getTransactionID());
+
+            RuntimeException ex = new RuntimeException(
+                    String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
+                                  " capacity %d has been reached.",
+                                  name, cohortEntry.getTransactionID(), queueCapacity));
+            log.error(ex.getMessage());
+            sender.tell(new Status.Failure(ex), shard.self());
+            return false;
+        }
+    }
+
     /**
      * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
      * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
@@ -114,6 +118,10 @@ public class ShardCommitCoordinator {
                 (MutableCompositeModification) ready.getModification());
         cohortCache.put(ready.getTransactionID(), cohortEntry);
 
+        if(!queueCohortEntry(cohortEntry, sender, shard)) {
+            return;
+        }
+
         if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
             // Return our actor path as we'll handle the three phase commit except if the Tx client
             // version < Helium-1 version which means the Tx was initiated by a base Helium version node.
@@ -156,9 +164,9 @@ public class ShardCommitCoordinator {
      *
      * @throws ExecutionException if an error occurs loading the cache
      */
-    boolean handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
+    void handleBatchedModifications(BatchedModifications batched, ActorRef sender, Shard shard)
             throws ExecutionException {
-        CohortEntry cohortEntry = cohortCache.getIfPresent(batched.getTransactionID());
+        CohortEntry cohortEntry = cohortCache.get(batched.getTransactionID());
         if(cohortEntry == null) {
             cohortEntry = new CohortEntry(batched.getTransactionID(),
                     dataTree.newReadWriteTransaction(batched.getTransactionID(),
@@ -174,6 +182,10 @@ public class ShardCommitCoordinator {
         cohortEntry.applyModifications(batched.getModifications());
 
         if(batched.isReady()) {
+            if(!queueCohortEntry(cohortEntry, sender, shard)) {
+                return;
+            }
+
             if(log.isDebugEnabled()) {
                 log.debug("{}: Readying Tx {}, client version {}", name,
                         batched.getTransactionID(), batched.getVersion());
@@ -191,8 +203,6 @@ public class ShardCommitCoordinator {
         } else {
             sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
         }
-
-        return batched.isReady();
     }
 
     /**
@@ -208,6 +218,11 @@ public class ShardCommitCoordinator {
         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
+
+        if(!queueCohortEntry(cohortEntry, sender, shard)) {
+            return;
+        }
+
         log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
 
         if (message.isDoCommitOnReady()) {
@@ -222,36 +237,31 @@ public class ShardCommitCoordinator {
     private void handleCanCommit(CohortEntry cohortEntry) {
         String transactionID = cohortEntry.getTransactionID();
 
-        if(log.isDebugEnabled()) {
-            log.debug("{}: Processing canCommit for transaction {} for shard {}",
-                    name, transactionID, cohortEntry.getShard().self().path());
-        }
+        cohortEntry.updateLastAccessTime();
 
         if(currentCohortEntry != null) {
-            // There's already a Tx commit in progress - attempt to queue this entry to be
-            // committed after the current Tx completes.
-            log.debug("{}: Transaction {} is already in progress - queueing transaction {}",
-                    name, currentCohortEntry.getTransactionID(), transactionID);
+            // There's already a Tx commit in progress so we can't process this entry yet - but it's in the
+            // queue and will get processed after all prior entries complete.
 
-            if(queuedCohortEntries.size() < queueCapacity) {
-                queuedCohortEntries.offer(cohortEntry);
-            } else {
-                removeCohortEntry(transactionID);
-
-                RuntimeException ex = new RuntimeException(
-                        String.format("%s: Could not enqueue transaction %s - the maximum commit queue"+
-                                      " capacity %d has been reached.",
-                                      name, transactionID, queueCapacity));
-                log.error(ex.getMessage());
-                cohortEntry.getReplySender().tell(new Status.Failure(ex), cohortEntry.getShard().self());
+            if(log.isDebugEnabled()) {
+                log.debug("{}: Commit for Tx {} already in progress - skipping canCommit for {} for now",
+                        name, currentCohortEntry.getTransactionID(), transactionID);
             }
-        } else {
-            // No Tx commit currently in progress - make this the current entry and proceed with
-            // canCommit.
-            cohortEntry.updateLastAccessTime();
-            currentCohortEntry = cohortEntry;
 
-            doCanCommit(cohortEntry);
+            return;
+        }
+
+        // No Tx commit currently in progress - check if this entry is the next one in the queue, If so make
+        // it the current entry and proceed with canCommit.
+        // Purposely checking reference equality here.
+        if(queuedCohortEntries.peek() == cohortEntry) {
+            currentCohortEntry = queuedCohortEntries.poll();
+            doCanCommit(currentCohortEntry);
+        } else {
+            if(log.isDebugEnabled()) {
+                log.debug("{}: Tx {} is the next pending canCommit - skipping {} for now",
+                        name, queuedCohortEntries.peek().getTransactionID(), transactionID);
+            }
         }
     }
 
@@ -265,7 +275,7 @@ public class ShardCommitCoordinator {
     public void handleCanCommit(String transactionID, final ActorRef sender, final Shard shard) {
         // Lookup the cohort entry that was cached previously (or should have been) by
         // transactionReady (via the ForwardedReadyTransaction message).
-        final CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
+        final CohortEntry cohortEntry = cohortCache.get(transactionID);
         if(cohortEntry == null) {
             // Either canCommit was invoked before ready(shouldn't happen)  or a long time passed
             // between canCommit and ready and the entry was expired from the cache.
@@ -283,7 +293,6 @@ public class ShardCommitCoordinator {
     }
 
     private void doCanCommit(final CohortEntry cohortEntry) {
-
         boolean canCommit = false;
         try {
             // We block on the future here so we don't have to worry about possibly accessing our
@@ -291,6 +300,8 @@ public class ShardCommitCoordinator {
             // currently uses a same thread executor anyway.
             canCommit = cohortEntry.getCohort().canCommit().get();
 
+            log.debug("{}: canCommit for {}: {}", name, cohortEntry.getTransactionID(), canCommit);
+
             if(cohortEntry.isDoImmediateCommit()) {
                 if(canCommit) {
                     doCommit(cohortEntry);
@@ -304,7 +315,7 @@ public class ShardCommitCoordinator {
                             CanCommitTransactionReply.NO.toSerializable(), cohortEntry.getShard().self());
             }
         } catch (Exception e) {
-            log.debug("{}: An exception occurred during canCommit: {}", name, e);
+            log.debug("{}: An exception occurred during canCommit", name, e);
 
             Throwable failure = e;
             if(e instanceof ExecutionException) {
@@ -367,6 +378,7 @@ public class ShardCommitCoordinator {
             return false;
         }
 
+        cohortEntry.setReplySender(sender);
         return doCommit(cohortEntry);
     }
 
@@ -391,13 +403,7 @@ public class ShardCommitCoordinator {
     }
 
     public CohortEntry getAndRemoveCohortEntry(String transactionID) {
-        CohortEntry cohortEntry = cohortCache.getIfPresent(transactionID);
-        cohortCache.invalidate(transactionID);
-        return cohortEntry;
-    }
-
-    public void removeCohortEntry(String transactionID) {
-        cohortCache.invalidate(transactionID);
+        return cohortCache.remove(transactionID);
     }
 
     public boolean isCurrentTransaction(String transactionID) {
@@ -416,33 +422,66 @@ public class ShardCommitCoordinator {
      */
     public void currentTransactionComplete(String transactionID, boolean removeCohortEntry) {
         if(removeCohortEntry) {
-            removeCohortEntry(transactionID);
+            cohortCache.remove(transactionID);
         }
 
         if(isCurrentTransaction(transactionID)) {
-            // Dequeue the next cohort entry waiting in the queue.
-            currentCohortEntry = queuedCohortEntries.poll();
-            if(currentCohortEntry != null) {
-                currentCohortEntry.updateLastAccessTime();
-                doCanCommit(currentCohortEntry);
+            currentCohortEntry = null;
+
+            log.debug("{}: currentTransactionComplete: {}", name, transactionID);
+
+            maybeProcessNextCohortEntry();
+        }
+    }
+
+    private void maybeProcessNextCohortEntry() {
+        // Check if there's a next cohort entry waiting in the queue and if it is ready to commit. Also
+        // clean out expired entries.
+        Iterator<CohortEntry> iter = queuedCohortEntries.iterator();
+        while(iter.hasNext()) {
+            CohortEntry next = iter.next();
+            if(next.isReadyToCommit()) {
+                if(currentCohortEntry == null) {
+                    if(log.isDebugEnabled()) {
+                        log.debug("{}: Next entry to canCommit {}", name, next);
+                    }
+
+                    iter.remove();
+                    currentCohortEntry = next;
+                    currentCohortEntry.updateLastAccessTime();
+                    doCanCommit(currentCohortEntry);
+                }
+
+                break;
+            } else if(next.isExpired(cacheExpiryTimeoutInMillis)) {
+                log.warn("{}: canCommit for transaction {} was not received within {} ms - entry removed from cache",
+                        name, next.getTransactionID(), cacheExpiryTimeoutInMillis);
+
+                iter.remove();
+                cohortCache.remove(next.getTransactionID());
+            } else {
+                break;
             }
         }
     }
 
+    void cleanupExpiredCohortEntries() {
+        maybeProcessNextCohortEntry();
+    }
+
     @VisibleForTesting
     void setCohortDecorator(CohortDecorator cohortDecorator) {
         this.cohortDecorator = cohortDecorator;
     }
 
-
     static class CohortEntry {
         private final String transactionID;
         private ShardDataTreeCohort cohort;
         private final ReadWriteShardDataTreeTransaction transaction;
         private ActorRef replySender;
         private Shard shard;
-        private long lastAccessTime;
         private boolean doImmediateCommit;
+        private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.transaction = Preconditions.checkNotNull(transaction);
@@ -463,11 +502,8 @@ public class ShardCommitCoordinator {
         }
 
         void updateLastAccessTime() {
-            lastAccessTime = System.currentTimeMillis();
-        }
-
-        long getLastAccessTime() {
-            return lastAccessTime;
+            lastAccessTimer.reset();
+            lastAccessTimer.start();
         }
 
         String getTransactionID() {
@@ -497,6 +533,14 @@ public class ShardCommitCoordinator {
             }
         }
 
+        boolean isReadyToCommit() {
+            return replySender != null;
+        }
+
+        boolean isExpired(long expireTimeInMillis) {
+            return lastAccessTimer.elapsed(TimeUnit.MILLISECONDS) >= expireTimeInMillis;
+        }
+
         boolean isDoImmediateCommit() {
             return doImmediateCommit;
         }
@@ -520,5 +564,13 @@ public class ShardCommitCoordinator {
         void setShard(Shard shard) {
             this.shard = shard;
         }
+
+        @Override
+        public String toString() {
+            StringBuilder builder = new StringBuilder();
+            builder.append("CohortEntry [transactionID=").append(transactionID).append(", doImmediateCommit=")
+                    .append(doImmediateCommit).append("]");
+            return builder.toString();
+        }
     }
 }
index 79ff2a4..261b93d 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.cluster.datastore.jmx.mbeans;
 
+import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.md.sal.common.util.jmx.AbstractMXBean;
 
@@ -73,6 +74,11 @@ public class DatastoreConfigurationMXBeanImpl extends AbstractMXBean implements
         return context.getShardTransactionCommitTimeoutInSeconds();
     }
 
+    @Override
+    public long getShardCommitQueueExpiryTimeoutInSeconds() {
+        return TimeUnit.SECONDS.convert(context.getShardCommitQueueExpiryTimeoutInMillis(), TimeUnit.MILLISECONDS);
+    }
+
     @Override
     public int getShardTransactionCommitQueueCapacity() {
         return context.getShardTransactionCommitQueueCapacity();
index 2527882..539797e 100644 (file)
@@ -64,6 +64,8 @@ public class DistributedConfigDataStoreProviderModule extends
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
                 .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
                 .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
+                .shardCommitQueueExpiryTimeoutInSeconds(
+                        props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getConfigSchemaServiceDependency(),
index 0884565..45ed184 100644 (file)
@@ -65,6 +65,8 @@ public class DistributedOperationalDataStoreProviderModule extends
                 .shardElectionTimeoutFactor(props.getShardElectionTimeoutFactor().getValue())
                 .transactionCreationInitialRateLimit(props.getTransactionCreationInitialRateLimit().getValue())
                 .shardBatchedModificationCount(props.getShardBatchedModificationCount().getValue().intValue())
+                .shardCommitQueueExpiryTimeoutInSeconds(
+                        props.getShardCommitQueueExpiryTimeoutInSeconds().getValue().intValue())
                 .build();
 
         return DistributedDataStoreFactory.createInstance(getOperationalSchemaServiceDependency(),
index dc83af9..3c753d0 100644 (file)
@@ -136,11 +136,21 @@ module distributed-datastore-provider {
          }
 
          leaf shard-transaction-commit-queue-capacity {
-            default 20000;
+            default 50000;
             type non-zero-uint32-type;
             description "The maximum allowed capacity for each shard's transaction commit queue.";
          }
 
+         leaf shard-commit-queue-expiry-timeout-in-seconds {
+             default 120; // 2 minutes
+             type non-zero-uint32-type;
+             description "The maximum amount of time a transaction can remain in a shard's commit queue waiting 
+                 to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be
+                 quick but latencies can occur in between transaction ready and CanCommit or a remote broker
+                 could lose connection and CanCommit might never occur. Expiring transactions from the queue
+                 allows subsequent pending transaction to be processed.";
+         }
+         
          leaf shard-initialization-timeout-in-seconds {
             default 300; // 5 minutes
             type non-zero-uint32-type;
index f3d93b8..76ae3c7 100644 (file)
@@ -9,6 +9,7 @@ import akka.cluster.Cluster;
 import akka.testkit.JavaTestKit;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
@@ -788,12 +789,15 @@ public class DistributedDataStoreIntegrationTest {
 
             writeTx = txChain.newWriteOnlyTransaction();
 
-            //writeTx.delete(personPath);
+            writeTx.delete(carPath);
 
             DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
 
-            doCommit(cohort1);
-            doCommit(cohort2);
+            ListenableFuture<Boolean> canCommit1 = cohort1.canCommit();
+            ListenableFuture<Boolean> canCommit2 = cohort2.canCommit();
+
+            doCommit(canCommit1, cohort1);
+            doCommit(canCommit2, cohort2);
             doCommit(cohort3);
 
             txChain.close();
@@ -801,12 +805,11 @@ public class DistributedDataStoreIntegrationTest {
             DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
 
             optional = readTx.read(carPath).get(5, TimeUnit.SECONDS);
-            assertEquals("isPresent", true, optional.isPresent());
-            assertEquals("Data node", car, optional.get());
+            assertEquals("isPresent", false, optional.isPresent());
 
             optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
-            //assertEquals("isPresent", false, optional.isPresent());
             assertEquals("isPresent", true, optional.isPresent());
+            assertEquals("Data node", person, optional.get());
 
             cleanup(dataStore);
         }};
index 7cef2fd..be8ede9 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
@@ -253,8 +254,8 @@ public class DistributedDataStoreRemotingIntegrationTest {
     }
 
     @Test
-    public void testTransactionChain() throws Exception {
-        initDatastores("testTransactionChain");
+    public void testTransactionChainWithSingleShard() throws Exception {
+        initDatastores("testTransactionChainWithSingleShard");
 
         DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
 
@@ -299,6 +300,61 @@ public class DistributedDataStoreRemotingIntegrationTest {
         verifyCars(followerDistributedDataStore.newReadOnlyTransaction(), car2);
     }
 
+    /**
+     * Exercises a transaction chain created on the follower data store whose transactions touch both the
+     * cars and the people models (presumably backed by separate shards, as the test name suggests - confirm
+     * against the test's shard configuration). A write-only transaction that deletes the person is readied
+     * before the preceding read-write transaction has been committed; both cohorts are then committed in the
+     * order in which they were readied.
+     */
+    @Test
+    public void testTransactionChainWithMultipleShards() throws Exception{
+        initDatastores("testTransactionChainWithMultipleShards");
+
+        DOMStoreTransactionChain txChain = followerDistributedDataStore.createTransactionChain();
+
+        DOMStoreWriteTransaction writeTx = txChain.newWriteOnlyTransaction();
+        assertNotNull("newWriteOnlyTransaction returned null", writeTx);
+
+        // Seed both models with their top-level containers and list nodes in a single transaction.
+        writeTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+        writeTx.write(PeopleModel.BASE_PATH, PeopleModel.emptyContainer());
+
+        writeTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+        writeTx.write(PeopleModel.PERSON_LIST_PATH, PeopleModel.newPersonMapNode());
+
+        followerTestKit.doCommit(writeTx.ready());
+
+        // Second transaction in the chain: write a car and merge a person, then read both back through
+        // the same read-write transaction before it is readied.
+        DOMStoreReadWriteTransaction readWriteTx = txChain.newReadWriteTransaction();
+
+        MapEntryNode car = CarsModel.newCarEntry("optima", BigInteger.valueOf(20000));
+        YangInstanceIdentifier carPath = CarsModel.newCarPath("optima");
+        readWriteTx.write(carPath, car);
+
+        MapEntryNode person = PeopleModel.newPersonEntry("jack");
+        YangInstanceIdentifier personPath = PeopleModel.newPersonPath("jack");
+        readWriteTx.merge(personPath, person);
+
+        Optional<NormalizedNode<?, ?>> optional = readWriteTx.read(carPath).get(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, optional.isPresent());
+        assertEquals("Data node", car, optional.get());
+
+        optional = readWriteTx.read(personPath).get(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", true, optional.isPresent());
+        assertEquals("Data node", person, optional.get());
+
+        DOMStoreThreePhaseCommitCohort cohort2 = readWriteTx.ready();
+
+        // Third transaction: ready a delete of the person while cohort2 is still uncommitted.
+        writeTx = txChain.newWriteOnlyTransaction();
+
+        writeTx.delete(personPath);
+
+        DOMStoreThreePhaseCommitCohort cohort3 = writeTx.ready();
+
+        // Commit in the same order the transactions were readied.
+        followerTestKit.doCommit(cohort2);
+        followerTestKit.doCommit(cohort3);
+
+        txChain.close();
+
+        // Check the final state through a fresh read-only transaction: verifyCars is given the written car,
+        // and the person must no longer be present.
+        DOMStoreReadTransaction readTx = followerDistributedDataStore.newReadOnlyTransaction();
+        verifyCars(readTx, car);
+
+        optional = readTx.read(personPath).get(5, TimeUnit.SECONDS);
+        assertEquals("isPresent", false, optional.isPresent());
+    }
+
     @Test
     public void testReadyLocalTransactionForwardedToLeader() throws Exception {
         initDatastores("testReadyLocalTransactionForwardedToLeader");
index 109e77c..94f2085 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import com.google.common.base.Optional;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -122,6 +123,13 @@ class IntegrationTestKit extends ShardTestKit {
         cohort.commit().get(5, TimeUnit.SECONDS);
     }
 
+    /**
+     * Completes the three-phase commit for a cohort using a canCommit future supplied by the caller.
+     * Waits up to 7 seconds for the future and asserts that its result is true, then invokes preCommit
+     * and commit on the cohort, waiting up to 5 seconds for each.
+     *
+     * @param canCommitFuture the future holding the canCommit result to wait on
+     * @param cohort the cohort on which preCommit and commit are invoked
+     * @throws Exception if any phase fails or does not complete within its timeout
+     */
+    void doCommit(final ListenableFuture<Boolean> canCommitFuture, final DOMStoreThreePhaseCommitCohort cohort) throws Exception {
+        Boolean canCommit = canCommitFuture.get(7, TimeUnit.SECONDS);
+        assertEquals("canCommit", true, canCommit);
+        cohort.preCommit().get(5, TimeUnit.SECONDS);
+        cohort.commit().get(5, TimeUnit.SECONDS);
+    }
+
     void cleanup(DistributedDataStore dataStore) {
         if(dataStore != null) {
             dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), null);
index ee54316..49cfcbc 100644 (file)
@@ -1826,7 +1826,7 @@ public class ShardTest extends AbstractShardTest {
 
     @Test
     public void testTransactionCommitQueueCapacityExceeded() throws Throwable {
-        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(1);
+        dataStoreContextBuilder.shardTransactionCommitQueueCapacity(2);
 
         new ShardTestKit(getSystem()) {{
             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
@@ -1866,9 +1866,11 @@ public class ShardTest extends AbstractShardTest {
                     cohort2, modification2, true, false), getRef());
             expectMsgClass(duration, ReadyTransactionReply.class);
 
+            // The 3rd Tx should exceed queue capacity and fail.
+
             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
                     cohort3, modification3, true, false), getRef());
-            expectMsgClass(duration, ReadyTransactionReply.class);
+            expectMsgClass(duration, akka.actor.Status.Failure.class);
 
             // canCommit 1st Tx.
 
@@ -1888,6 +1890,125 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    /**
+     * Readies three transactions on a shard but sends CanCommit only for the last one, then expects a
+     * CanCommit reply for it within the 5 second test duration. The data store context is configured with
+     * a commit queue expiry timeout of 1300 ms and a transaction commit timeout of 1 second so that the
+     * two earlier entries can expire while the test is waiting.
+     */
+    @Test
+    public void testTransactionCommitWithPriorExpiredCohortEntries() throws Throwable {
+        dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitWithPriorExpiredCohortEntries");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            // Ready the first Tx (writes TEST_PATH); a ready reply is expected for each readied Tx.
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // Ready the second Tx (also writes TEST_PATH).
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // Ready the third Tx (writes TEST2_PATH).
+
+            String transactionID3 = "tx3";
+            MutableCompositeModification modification3 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
+                    TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME), modification3);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
+                    cohort3, modification3, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // All Tx's are readied. We'll send canCommit for the last one but not the others. The others
+            // should expire from the queue and the last one should be processed.
+
+            shard.tell(new CanCommitTransaction(transactionID3).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    /**
+     * Makes the first transaction the in-progress entry by sending CanCommit for it, then readies a second
+     * transaction (for which CanCommit is never sent) and a third one via ReadyLocalTransaction. After the
+     * first transaction is committed the test expects a second commit reply, attributed to the third
+     * transaction, and verifies that TEST2_PATH can be read from the shard. The data store context is
+     * configured with a commit queue expiry timeout of 1300 ms and a transaction commit timeout of 1 second.
+     */
+    @Test
+    public void testTransactionCommitWithSubsequentExpiredCohortEntry() throws Throwable {
+        dataStoreContextBuilder.shardCommitQueueExpiryTimeoutInMillis(1300).shardTransactionCommitTimeoutInSeconds(1);
+
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testTransactionCommitWithSubsequentExpiredCohortEntry");
+
+            waitUntilLeader(shard);
+
+            final FiniteDuration duration = duration("5 seconds");
+
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            // Ready the first Tx.
+
+            String transactionID1 = "tx1";
+            MutableCompositeModification modification1 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
+                    cohort1, modification1, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // CanCommit the first one so it's the current in-progress CohortEntry.
+
+            shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Ready the second Tx.
+
+            String transactionID2 = "tx2";
+            MutableCompositeModification modification2 = new MutableCompositeModification();
+            ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
+                    TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification2);
+
+            shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
+                    cohort2, modification2, true, false), getRef());
+            expectMsgClass(duration, ReadyTransactionReply.class);
+
+            // Ready the third Tx.
+            // NOTE(review): the 'true' argument presumably requests an immediate (direct) commit, which
+            // would explain why no ReadyTransactionReply is awaited here - confirm against
+            // ReadyLocalTransaction.
+
+            String transactionID3 = "tx3";
+            DataTreeModification modification3 = dataStore.getDataTree().takeSnapshot().newModification();
+            new WriteModification(TestModel.TEST2_PATH, ImmutableNodes.containerNode(TestModel.TEST2_QNAME))
+                    .apply(modification3);
+            ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(transactionID3, modification3, true);
+
+            shard.tell(readyMessage, getRef());
+
+            // Commit the first Tx. After completing, the second should expire from the queue and the third
+            // Tx committed.
+
+            shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // Expect commit reply from the third Tx.
+
+            expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            // The third Tx's write to TEST2_PATH must be readable from the shard.
+            NormalizedNode<?, ?> node = readStore(shard, TestModel.TEST2_PATH);
+            assertNotNull(TestModel.TEST2_PATH + " not found", node);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCanCommitBeforeReadyFailure() throws Throwable {
         new ShardTestKit(getSystem()) {{
index 60420dc..07e9b29 100644 (file)
@@ -22,6 +22,9 @@ public class TestModel {
     public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
             "test");
 
+    public static final QName TEST2_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13",
+            "test2");
+
     public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13",
             "junk");
 
@@ -36,6 +39,7 @@ public class TestModel {
     private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang";
 
     public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME);
+    public static final YangInstanceIdentifier TEST2_PATH = YangInstanceIdentifier.of(TEST2_QNAME);
     public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME);
     public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH).
             node(OUTER_LIST_QNAME).build();

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.