Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+
+
}
private void setTransactionCommitTimeout() {
try {
commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
- LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+ LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
} else {
ActorSelection leader = getLeader();
if (leader != null) {
- LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+ LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
+ message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
leader.forward(message, getContext());
} else {
noLeaderError(message);
}
@Override
- protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ @VisibleForTesting
+ public RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
return snapshotCohort;
}
@Override
@Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
- return new ShardRecoveryCoordinator(store, persistenceId(), LOG);
+ return new ShardRecoveryCoordinator(store, store.getSchemaContext(), persistenceId(), LOG);
}
@Override
protected void onRecoveryComplete() {
- store.recoveryDone();
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());