X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=7d806890448ff783be319bcb66c3086f3d8f2e00;hb=refs%2Fchanges%2F00%2F21500%2F3;hp=6821fa721fe8b30629106f1d14ba666a775d9da6;hpb=aa6342c2982aa30ffbcca6d1212de164041d9477;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 6821fa721f..7d80689044 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -184,6 +184,12 @@ class ShardCommitCoordinator { cohortEntry.applyModifications(batched.getModifications()); if(batched.isReady()) { + if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) { + throw new IllegalStateException(String.format( + "The total number of batched messages received %d does not match the number sent %d", + cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent())); + } + if(!queueCohortEntry(cohortEntry, sender, shard)) { return; } @@ -493,6 +499,7 @@ class ShardCommitCoordinator { private Shard shard; private boolean doImmediateCommit; private final Stopwatch lastAccessTimer = Stopwatch.createStarted(); + private int totalBatchedModificationsReceived; CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { this.transaction = Preconditions.checkNotNull(transaction); @@ -525,10 +532,16 @@ class ShardCommitCoordinator { return cohort; } + int getTotalBatchedModificationsReceived() { + return totalBatchedModificationsReceived; + } + void applyModifications(Iterable modifications) { for (Modification modification : modifications) { modification.apply(transaction.getSnapshot()); } + + totalBatchedModificationsReceived++; } void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {