- // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
- // it more resilient in case we're in the process of electing a new leader.
- getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
- "Could not find the leader for shard %s. This typically happens" +
- " when the system is coming up or recovering and a leader is being elected. Try again" +
- " later.", persistenceId()))), getSelf());
+ noLeaderError(batched);
+ }
+ }
+ }
+
+ private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
+ if (isLeader()) {
+ try {
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+ } catch (Exception e) {
+ LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+ message.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+ leader.forward(message, getContext());
+ } else {
+ noLeaderError(message);