+ @SuppressWarnings("checkstyle:IllegalCatch")
+ private void batchedModifications(final BatchedModifications batched) {
+ if (checkClosed()) {
+ if (batched.isReady()) {
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+ return;
+ }
+
+ try {
+ for (Modification modification: batched.getModifications()) {
+ modification.apply(transaction.getSnapshot());
+ }
+
+ totalBatchedModificationsReceived++;
+ if (batched.isReady()) {
+ if (lastBatchedModificationsException != null) {
+ throw lastBatchedModificationsException;
+ }
+
+ if (totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
+ throw new IllegalStateException(String.format(
+ "The total number of batched messages received %d does not match the number sent %d",
+ totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
+ }
+
+ readyTransaction(batched);
+ } else {
+ getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
+ }
+ } catch (Exception e) {
+ lastBatchedModificationsException = e;
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+ if (batched.isReady()) {
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+ }
+
+ protected final void dataExists(final DataExists message) {
+ super.dataExists(transaction, message);
+ }
+
+ protected final void readData(final ReadData message) {
+ super.readData(transaction, message);
+ }
+
+ private boolean checkClosed() {
+ final boolean ret = transaction.isClosed();
+ if (ret) {
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(
+ "Transaction is closed, no modifications allowed")), getSelf());
+ }
+ return ret;
+ }
+
+ private void readyTransaction(final BatchedModifications batched) {
+ TransactionIdentifier transactionID = getTransactionId();
+
+ LOG.debug("readyTransaction : {}", transactionID);
+
+ getShardActor().forward(new ForwardedReadyTransaction(transactionID, batched.getVersion(),
+ transaction, batched.isDoCommitOnReady(), batched.getParticipatingShardNames()), getContext());
+
+ // The shard will handle the commit from here so we're no longer needed - self-destruct.
+ getSelf().tell(PoisonPill.getInstance(), getSelf());