} else if (BatchedModifications.class.isInstance(message)) {
handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
- commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
- getSender(), this);
+ handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
} else if (message instanceof ReadyLocalTransaction) {
handleReadyLocalTransaction((ReadyLocalTransaction)message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
- private void noLeaderError(String errMessage, Object message) {
+ private void noLeaderError(String errMessage) {
// TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
// it more resilient in case we're in the process of electing a new leader.
getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
leader.forward(batched, getContext());
} else {
- noLeaderError("Could not commit transaction " + batched.getTransactionID(), batched);
+ noLeaderError("Could not commit transaction " + batched.getTransactionID());
}
}
}
message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
} else {
- noLeaderError("Could not commit transaction " + message.getTransactionID(), message);
+ noLeaderError("Could not commit transaction " + message.getTransactionID());
+ }
+ }
+ }
+
+ private void handleForwardedReadyTransaction(ForwardedReadyTransaction forwardedReady) {
+ if (isLeader()) {
+ failIfIsolatedLeader(getSender());
+
+ commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
+ } else {
+ ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
+
+ ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionID(),
+ forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit());
+ readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
+ leader.forward(readyLocal, getContext());
+ } else {
+ noLeaderError("Could not commit transaction " + forwardedReady.getTransactionID());
}
}
}