cohortEntry.applyModifications(batched.getModifications());
if(batched.isReady()) {
+ if(cohortEntry.getLastBatchedModificationsException() != null) {
+ cohortCache.remove(cohortEntry.getTransactionID());
+ throw cohortEntry.getLastBatchedModificationsException();
+ }
+
if(cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
+ cohortCache.remove(cohortEntry.getTransactionID());
throw new IllegalStateException(String.format(
"The total number of batched messages received %d does not match the number sent %d",
cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
private final String transactionID;
private ShardDataTreeCohort cohort;
private final ReadWriteShardDataTreeTransaction transaction;
+ private RuntimeException lastBatchedModificationsException;
private ActorRef replySender;
private Shard shard;
private boolean doImmediateCommit;
return totalBatchedModificationsReceived;
}
- void applyModifications(Iterable<Modification> modifications) {
- for (Modification modification : modifications) {
- modification.apply(transaction.getSnapshot());
- }
+ RuntimeException getLastBatchedModificationsException() {
+ return lastBatchedModificationsException;
+ }
+ void applyModifications(Iterable<Modification> modifications) {
totalBatchedModificationsReceived++;
+ if(lastBatchedModificationsException == null) {
+ for (Modification modification : modifications) {
+ try {
+ modification.apply(transaction.getSnapshot());
+ } catch (RuntimeException e) {
+ lastBatchedModificationsException = e;
+ throw e;
+ }
+ }
+ }
}
void ready(CohortDecorator cohortDecorator, boolean doImmediateCommit) {