try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
- LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+ LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
} else {
ActorSelection leader = getLeader();
if (leader != null) {
- LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+ LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
+ message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
} else {
noLeaderError(message);
}
@Override
- protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ @VisibleForTesting
+ public RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
return snapshotCohort;
}