Bug 3195: Cleanup on error paths and error handling
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 1b838ae0e6c6e3c32ae604846d32ae280e32bc78..53f27061ae527404730d97224ae15651ea803b97 100644 (file)
@@ -184,6 +184,18 @@ class ShardCommitCoordinator {
         cohortEntry.applyModifications(batched.getModifications());
 
         if(batched.isReady()) {
         cohortEntry.applyModifications(batched.getModifications());
 
         if(batched.isReady()) {
+            if(cohortEntry.getLastBatchedModificationsException() != null) {
+                cohortCache.remove(cohortEntry.getTransactionID());
+                throw cohortEntry.getLastBatchedModificationsException();
+            }
+
+            if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+                cohortCache.remove(cohortEntry.getTransactionID());
+                throw new IllegalStateException(String.format(
+                        "The total number of batched messages received %d does not match the number sent %d",
+                        cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
+            }
+
             if(!queueCohortEntry(cohortEntry, sender, shard)) {
                 return;
             }
             if(!queueCohortEntry(cohortEntry, sender, shard)) {
                 return;
             }
@@ -216,7 +228,8 @@ class ShardCommitCoordinator {
      * @param shard the transaction's shard actor
      */
     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
      * @param shard the transaction's shard actor
      */
     void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
-        final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification());
+        final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(),
+                message.getTransactionID());
         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
         final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
         cohortCache.put(message.getTransactionID(), cohortEntry);
         cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
@@ -488,10 +501,12 @@ class ShardCommitCoordinator {
         private final String transactionID;
         private ShardDataTreeCohort cohort;
         private final ReadWriteShardDataTreeTransaction transaction;
         private final String transactionID;
         private ShardDataTreeCohort cohort;
         private final ReadWriteShardDataTreeTransaction transaction;
+        private RuntimeException lastBatchedModificationsException;
         private ActorRef replySender;
         private Shard shard;
         private boolean doImmediateCommit;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
         private ActorRef replySender;
         private Shard shard;
         private boolean doImmediateCommit;
         private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
+        private int totalBatchedModificationsReceived;
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.transaction = Preconditions.checkNotNull(transaction);
 
         CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
             this.transaction = Preconditions.checkNotNull(transaction);
@@ -524,9 +539,25 @@ class ShardCommitCoordinator {
             return cohort;
         }
 
             return cohort;
         }
 
+        int getTotalBatchedModificationsReceived() {
+            return totalBatchedModificationsReceived;
+        }
+
+        RuntimeException getLastBatchedModificationsException() {
+            return lastBatchedModificationsException;
+        }
+
         void applyModifications(Iterable<Modification> modifications) {
         void applyModifications(Iterable<Modification> modifications) {
-            for (Modification modification : modifications) {
-                modification.apply(transaction.getSnapshot());
+            totalBatchedModificationsReceived++;
+            if(lastBatchedModificationsException == null) {
+                for (Modification modification : modifications) {
+                        try {
+                            modification.apply(transaction.getSnapshot());
+                        } catch (RuntimeException e) {
+                            lastBatchedModificationsException = e;
+                            throw e;
+                        }
+                }
             }
         }
 
             }
         }