X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=53f27061ae527404730d97224ae15651ea803b97;hp=1b838ae0e6c6e3c32ae604846d32ae280e32bc78;hb=340a2d4c979ac6f8d5adff8bd9e1c9f724e7a164;hpb=c04117c66b63366a2f402a06d20f9980bb7283cb diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 1b838ae0e6..53f27061ae 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -184,6 +184,18 @@ class ShardCommitCoordinator { cohortEntry.applyModifications(batched.getModifications()); if(batched.isReady()) { + if(cohortEntry.getLastBatchedModificationsException() != null) { + cohortCache.remove(cohortEntry.getTransactionID()); + throw cohortEntry.getLastBatchedModificationsException(); + } + + if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) { + cohortCache.remove(cohortEntry.getTransactionID()); + throw new IllegalStateException(String.format( + "The total number of batched messages received %d does not match the number sent %d", + cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent())); + } + if(!queueCohortEntry(cohortEntry, sender, shard)) { return; } @@ -216,7 +228,8 @@ class ShardCommitCoordinator { * @param shard the transaction's shard actor */ void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) { - final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification()); + final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification(), + message.getTransactionID()); final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort); cohortCache.put(message.getTransactionID(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); @@ -488,10 +501,12 @@ class ShardCommitCoordinator { private final String transactionID; private ShardDataTreeCohort cohort; private final ReadWriteShardDataTreeTransaction transaction; + private RuntimeException lastBatchedModificationsException; private ActorRef replySender; private Shard shard; private boolean doImmediateCommit; private final Stopwatch lastAccessTimer = Stopwatch.createStarted(); + private int totalBatchedModificationsReceived; CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { this.transaction = Preconditions.checkNotNull(transaction); @@ -524,9 +539,25 @@ class ShardCommitCoordinator { return cohort; } + int getTotalBatchedModificationsReceived() { + return totalBatchedModificationsReceived; + } + + RuntimeException getLastBatchedModificationsException() { + return lastBatchedModificationsException; + } + void applyModifications(Iterable modifications) { - for (Modification modification : modifications) { - modification.apply(transaction.getSnapshot()); + totalBatchedModificationsReceived++; + if(lastBatchedModificationsException == null) { + for (Modification modification : modifications) { + try { + modification.apply(transaction.getSnapshot()); + } catch (RuntimeException e) { + lastBatchedModificationsException = e; + throw e; + } + } } }