Bug 3206: CDS - issues with direct commit
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index c2cc1ac05b5dd0bb51a363b66d27d5398042f401..6da9fe947a0651e5d96981b87a274bde814f902a 100644 (file)
@@ -140,7 +140,7 @@ public class Shard extends RaftActor {
         }
 
         commitCoordinator = new ShardCommitCoordinator(store,
-                TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+                datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
         setTransactionCommitTimeout();
@@ -160,7 +160,7 @@ public class Shard extends RaftActor {
 
     private void setTransactionCommitTimeout() {
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
-                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
     }
 
     public static Props props(final ShardIdentifier name,
@@ -302,14 +302,15 @@ public class Shard extends RaftActor {
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
-            long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
-            if(elapsed > transactionCommitTimeout) {
+            if(cohortEntry.isExpired(transactionCommitTimeout)) {
                 LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
                         persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
 
                 doAbortTransaction(cohortEntry.getTransactionID(), null);
             }
         }
+
+        commitCoordinator.cleanupExpiredCohortEntries();
     }
 
     private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
@@ -323,9 +324,9 @@ public class Shard extends RaftActor {
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
-            applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
+            applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
-            Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+            Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
                 DataTreeCandidatePayload.create(candidate));
         }
     }