Check total batched messages sent in ShardCommitCoordinator on tx ready
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 6821fa721fe8b30629106f1d14ba666a775d9da6..7d806890448ff783be319bcb66c3086f3d8f2e00 100644 (file)
@@ -184,6 +184,12 @@ class ShardCommitCoordinator {
         cohortEntry.applyModifications(batched.getModifications());
 
         if(batched.isReady()) {
+            if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+                throw new IllegalStateException(String.format(
+                        "The total number of batched messages received %d does not match the number sent %d",
+                        cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
+            }
+
             if(!queueCohortEntry(cohortEntry, sender, shard)) {
                 return;
             }
@@ -493,6 +499,7 @@ class ShardCommitCoordinator {
         private Shard shard;
         private boolean doImmediateCommit;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
+        private int totalBatchedModificationsReceived;
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.transaction = Preconditions.checkNotNull(transaction);
@@ -525,10 +532,16 @@ class ShardCommitCoordinator {
             return cohort;
         }
 
+        int getTotalBatchedModificationsReceived() {
+            return totalBatchedModificationsReceived;
+        }
+
         void applyModifications(Iterable<Modification> modifications) {
             for (Modification modification : modifications) {
                 modification.apply(transaction.getSnapshot());
             }
+
+            totalBatchedModificationsReceived++;
         }
 
         void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {