cohortEntry.applyModifications(batched.getModifications());
if(batched.isReady()) {
+ if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+ throw new IllegalStateException(String.format(
+ "The total number of batched messages received %d does not match the number sent %d",
+ cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
+ }
+
if(!queueCohortEntry(cohortEntry, sender, shard)) {
return;
}
private Shard shard;
private boolean doImmediateCommit;
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
+ private int totalBatchedModificationsReceived;
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
this.transaction = Preconditions.checkNotNull(transaction);
return cohort;
}
+ int getTotalBatchedModificationsReceived() {
+ return totalBatchedModificationsReceived;
+ }
+
void applyModifications(Iterable<Modification> modifications) {
for (Modification modification : modifications) {
modification.apply(transaction.getSnapshot());
}
+
+ totalBatchedModificationsReceived++;
}
void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {