Bug 3195: Cleanup on error paths and error handling
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 6821fa721fe8b30629106f1d14ba666a775d9da6..53f27061ae527404730d97224ae15651ea803b97 100644 (file)
@@ -184,6 +184,18 @@ class ShardCommitCoordinator {
         cohortEntry.applyModifications(batched.getModifications());
 
         if(batched.isReady()) {
+            if(cohortEntry.getLastBatchedModificationsException() != null) {
+                cohortCache.remove(cohortEntry.getTransactionID());
+                throw cohortEntry.getLastBatchedModificationsException();
+            }
+
+            if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+                cohortCache.remove(cohortEntry.getTransactionID());
+                throw new IllegalStateException(String.format(
+                        "The total number of batched messages received %d does not match the number sent %d",
+                        cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
+            }
+
             if(!queueCohortEntry(cohortEntry, sender, shard)) {
                 return;
             }
@@ -489,10 +501,12 @@ class ShardCommitCoordinator {
         private final String transactionID;
         private ShardDataTreeCohort cohort;
         private final ReadWriteShardDataTreeTransaction transaction;
+        private RuntimeException lastBatchedModificationsException;
         private ActorRef replySender;
         private Shard shard;
         private boolean doImmediateCommit;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
+        private int totalBatchedModificationsReceived;
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.transaction = Preconditions.checkNotNull(transaction);
@@ -525,9 +539,25 @@ class ShardCommitCoordinator {
             return cohort;
         }
 
+        int getTotalBatchedModificationsReceived() {
+            return totalBatchedModificationsReceived;
+        }
+
+        RuntimeException getLastBatchedModificationsException() {
+            return lastBatchedModificationsException;
+        }
+
         void applyModifications(Iterable<Modification> modifications) {
-            for (Modification modification : modifications) {
-                modification.apply(transaction.getSnapshot());
+            totalBatchedModificationsReceived++;
+            if(lastBatchedModificationsException == null) {
+                for (Modification modification : modifications) {
+                        try {
+                            modification.apply(transaction.getSnapshot());
+                        } catch (RuntimeException e) {
+                            lastBatchedModificationsException = e;
+                            throw e;
+                        }
+                }
             }
         }