+ private void handleBatchedModifications(BatchedModifications batched) {
+ // This message is sent to prepare the modificationsa transaction directly on the Shard as an
+ // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
+ // BatchedModifications message, the caller sets the ready flag in the message indicating
+ // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
+ // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
+ // ReadyTransaction message.
+
+ // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
+ // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
+ // the primary/leader shard. However with timing and caching on the front-end, there's a small
+ // window where it could have a stale leader during leadership transitions.
+ //
+ if(isLeader()) {
+ try {
+ BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
+ sender().tell(reply, self());
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if(leader != null) {
+ // TODO: what if this is not the first batch and leadership changed in between batched messages?
+ // We could check if the commitCoordinator already has a cached entry and forward all the previous
+ // batched modifications.
+ LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
+ leader.forward(batched, getContext());
+ } else {
+ // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+ // it more resilient in case we're in the process of electing a new leader.
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find the leader for shard %s. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+ }
+ }
+