BUG 3019 : Fix Operation throttling for modification batching scenarios
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 14a20da247f5138b17e3c30e437bf3c661e5ec25..0b4abe98a10a9ecac5be58a036f0653bcece5e8c 100644 (file)
@@ -84,6 +84,9 @@ public class Shard extends RaftActor {
 
     private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
 
+    @VisibleForTesting
+    static final Object GET_SHARD_MBEAN_MESSAGE = "getShardMBeanMessage";
+
     @VisibleForTesting
     static final String DEFAULT_NAME = "default";
 
@@ -137,7 +140,7 @@ public class Shard extends RaftActor {
         }
 
         commitCoordinator = new ShardCommitCoordinator(store,
-                TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+                datastoreContext.getShardCommitQueueExpiryTimeoutInMillis(),
                 datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
 
         setTransactionCommitTimeout();
@@ -157,7 +160,7 @@ public class Shard extends RaftActor {
 
     private void setTransactionCommitTimeout() {
         transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
-                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+                datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS) / 2;
     }
 
     public static Props props(final ShardIdentifier name,
@@ -256,9 +259,11 @@ public class Shard extends RaftActor {
                 onDatastoreContext((DatastoreContext)message);
             } else if(message instanceof RegisterRoleChangeListener){
                 roleChangeNotifier.get().forward(message, context());
-            } else if (message instanceof FollowerInitialSyncUpStatus){
+            } else if (message instanceof FollowerInitialSyncUpStatus) {
                 shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
                 context().parent().tell(message, self());
+            } else if(GET_SHARD_MBEAN_MESSAGE.equals(message)){
+                sender().tell(getShardMBean(), self());
             } else {
                 super.onReceiveCommand(message);
             }
@@ -273,9 +278,10 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+    protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId, short leaderPayloadVersion) {
         return new ShardLeaderStateChanged(memberId, leaderId,
-                isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent());
+                isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent(),
+                leaderPayloadVersion);
     }
 
     private void onDatastoreContext(DatastoreContext context) {
@@ -297,14 +303,15 @@ public class Shard extends RaftActor {
     private void handleTransactionCommitTimeoutCheck() {
         CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
         if(cohortEntry != null) {
-            long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
-            if(elapsed > transactionCommitTimeout) {
+            if(cohortEntry.isExpired(transactionCommitTimeout)) {
                 LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
                         persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
 
                 doAbortTransaction(cohortEntry.getTransactionID(), null);
             }
         }
+
+        commitCoordinator.cleanupExpiredCohortEntries();
     }
 
     private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
@@ -318,9 +325,9 @@ public class Shard extends RaftActor {
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
-            applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
+            applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
         } else {
-            Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+            Shard.this.persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
                 DataTreeCandidatePayload.create(candidate));
         }
     }
@@ -407,7 +414,7 @@ public class Shard extends RaftActor {
     }
 
     private void handleBatchedModifications(BatchedModifications batched) {
-        // This message is sent to prepare the modificationsa transaction directly on the Shard as an
+        // This message is sent to prepare the modifications transaction directly on the Shard as an
         // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
         // BatchedModifications message, the caller sets the ready flag in the message indicating
         // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
@@ -446,14 +453,15 @@ public class Shard extends RaftActor {
             try {
                 commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
             } catch (Exception e) {
-                LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+                LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
                         message.getTransactionID(), e);
                 getSender().tell(new akka.actor.Status.Failure(e), getSelf());
             }
         } else {
             ActorSelection leader = getLeader();
             if (leader != null) {
-                LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+                LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
+                message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
                 leader.forward(message, getContext());
             } else {
                 noLeaderError(message);
@@ -582,7 +590,8 @@ public class Shard extends RaftActor {
     }
 
     @Override
-    protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+    @VisibleForTesting
+    public RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
         return snapshotCohort;
     }
 
@@ -594,6 +603,7 @@ public class Shard extends RaftActor {
 
     @Override
     protected void onRecoveryComplete() {
+        store.recoveryDone();
         //notify shard manager
         getContext().parent().tell(new ActorInitialized(), getSelf());