cohortEntry.applyModifications(batched.getModifications());
if(batched.isReady()) {
+ if(cohortEntry.getLastBatchedModificationsException() != null) {
+ cohortCache.remove(cohortEntry.getTransactionID());
+ throw cohortEntry.getLastBatchedModificationsException();
+ }
+
+ if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+ cohortCache.remove(cohortEntry.getTransactionID());
+ throw new IllegalStateException(String.format(
+ "The total number of batched messages received %d does not match the number sent %d",
+ cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
+ }
+
if(!queueCohortEntry(cohortEntry, sender, shard)) {
return;
}
private final String transactionID;
private ShardDataTreeCohort cohort;
private final ReadWriteShardDataTreeTransaction transaction;
+ private RuntimeException lastBatchedModificationsException;
private ActorRef replySender;
private Shard shard;
private boolean doImmediateCommit;
private final Stopwatch lastAccessTimer = Stopwatch.createStarted();
+ private int totalBatchedModificationsReceived;
CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) {
this.transaction = Preconditions.checkNotNull(transaction);
return cohort;
}
+ int getTotalBatchedModificationsReceived() {
+ return totalBatchedModificationsReceived;
+ }
+
+ RuntimeException getLastBatchedModificationsException() {
+ return lastBatchedModificationsException;
+ }
+
void applyModifications(Iterable<Modification> modifications) {
- for (Modification modification : modifications) {
- modification.apply(transaction.getSnapshot());
+ totalBatchedModificationsReceived++;
+ if(lastBatchedModificationsException == null) {
+ for (Modification modification : modifications) {
+ try {
+ modification.apply(transaction.getSnapshot());
+ } catch (RuntimeException e) {
+ lastBatchedModificationsException = e;
+ throw e;
+ }
+ }
}
}