// - RAFT instance: shard name + datastore type
// - cluster member name
// We should call this 'memberId' or something similar and propagate that naming
- protected final @NonNull String getId() {
+ public final @NonNull String getId() {
return context.getId();
}
@Override
+ @Deprecated(since = "11.0.0", forRemoval = true)
public final @NonNull String persistenceId() {
return getId();
}
@Override
public void preStart() throws Exception {
- LOG.info("Starting recovery for {} with journal batch size {}", persistenceId(),
- context.getConfigParams().getJournalRecoveryLogBatchSize());
+ LOG.info("Starting recovery for {} with journal batch size {}", getId(),
+ context.getConfigParams().getJournalRecoveryLogBatchSize());
super.preStart();
possiblyHandleBehaviorMessage(message);
} else if (message instanceof ApplyJournalEntries applyEntries) {
- LOG.debug("{}: Persisting ApplyJournalEntries with index={}", persistenceId(), applyEntries.getToIndex());
-
+ LOG.debug("{}: Persisting ApplyJournalEntries with index={}", getId(), applyEntries.getToIndex());
persistence().persistAsync(applyEntries, NoopProcedure.instance());
} else if (message instanceof FindLeader) {
getSender().tell(new FindLeaderReply(getLeaderAddress()), self());
} else if (!possiblyHandleBehaviorMessage(message)) {
if (message instanceof JournalProtocol.Response response
&& delegatingPersistenceProvider.handleJournalResponse(response)) {
- LOG.debug("{}: handled a journal response", persistenceId());
+ LOG.debug("{}: handled a journal response", getId());
} else if (message instanceof SnapshotProtocol.Response response
&& delegatingPersistenceProvider.handleSnapshotResponse(response)) {
- LOG.debug("{}: handled a snapshot response", persistenceId());
+ LOG.debug("{}: handled a snapshot response", getId());
} else {
handleNonRaftCommand(message);
}
}
private void onRequestLeadership(final RequestLeadership message) {
- LOG.debug("{}: onRequestLeadership {}", persistenceId(), message);
+ LOG.debug("{}: onRequestLeadership {}", getId(), message);
+ final var requestedFollowerId = message.getRequestedFollowerId();
+
if (!isLeader()) {
// non-leader cannot satisfy leadership request
LOG.warn("{}: onRequestLeadership {} was sent to non-leader."
+ " Current behavior: {}. Sending failure response",
- persistenceId(), message, getCurrentBehavior().state());
+ getId(), message, getCurrentBehavior().state());
message.getReplyTo().tell(new LeadershipTransferFailedException("Cannot transfer leader to "
- + message.getRequestedFollowerId()
- + ". RequestLeadership message was sent to non-leader " + persistenceId()), self());
+ + requestedFollowerId
+ + ". RequestLeadership message was sent to non-leader " + getId()), self());
return;
}
- final String requestedFollowerId = message.getRequestedFollowerId();
final ActorRef replyTo = message.getReplyTo();
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
onFailure(raftActorRef);
}
- LOG.debug("{}: Leadership transferred successfully to {}", persistenceId(), requestedFollowerId);
+ LOG.debug("{}: Leadership transferred successfully to {}", getId(), requestedFollowerId);
replyTo.tell(new Status.Success(null), self());
}
@Override
public void onFailure(final ActorRef raftActorRef) {
- LOG.debug("{}: LeadershipTransfer request from {} failed", persistenceId(), requestedFollowerId);
+ LOG.debug("{}: LeadershipTransfer request from {} failed", getId(), requestedFollowerId);
replyTo.tell(new Status.Failure(
new LeadershipTransferFailedException(
"Failed to transfer leadership to " + requestedFollowerId
+ ". Follower is not ready to become leader")),
self());
}
- }, message.getRequestedFollowerId(), RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
+ }, requestedFollowerId, RaftActorLeadershipTransferCohort.USE_DEFAULT_LEADER_TIMEOUT);
}
private boolean possiblyHandleBehaviorMessage(final Object message) {
private void initiateLeadershipTransfer(final RaftActorLeadershipTransferCohort.OnComplete onComplete,
final @Nullable String followerId, final long newLeaderTimeoutInMillis) {
- LOG.debug("{}: Initiating leader transfer", persistenceId());
+ LOG.debug("{}: Initiating leader transfer", getId());
RaftActorLeadershipTransferCohort leadershipTransferInProgress = context.getRaftActorLeadershipTransferCohort();
if (leadershipTransferInProgress == null) {
leadershipTransferInProgress.init();
} else {
- LOG.debug("{}: prior leader transfer in progress - adding callback", persistenceId());
+ LOG.debug("{}: prior leader transfer in progress - adding callback", getId());
leadershipTransferInProgress.addOnComplete(onComplete);
}
}
private void onShutDown() {
- LOG.debug("{}: onShutDown", persistenceId());
+ LOG.debug("{}: onShutDown", getId());
if (shuttingDown) {
return;
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
public void onSuccess(final ActorRef raftActorRef) {
- LOG.debug("{}: leader transfer succeeded - sending PoisonPill", persistenceId());
+ LOG.debug("{}: leader transfer succeeded - sending PoisonPill", getId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
@Override
public void onFailure(final ActorRef raftActorRef) {
- LOG.debug("{}: leader transfer failed - sending PoisonPill", persistenceId());
+ LOG.debug("{}: leader transfer failed - sending PoisonPill", getId());
raftActorRef.tell(PoisonPill.getInstance(), raftActorRef);
}
}, null, TimeUnit.MILLISECONDS.convert(2, TimeUnit.SECONDS));
}
private void onLeaderTransitioning(final LeaderTransitioning leaderTransitioning) {
- LOG.debug("{}: onLeaderTransitioning: {}", persistenceId(), leaderTransitioning);
+ LOG.debug("{}: onLeaderTransitioning: {}", getId(), leaderTransitioning);
final var roleChangeNotifier = roleChangeNotifier();
if (roleChangeNotifier != null && getRaftState() == RaftState.Follower
&& leaderTransitioning.getLeaderId().equals(getCurrentBehavior().getLeaderId())) {
switchBehavior(behaviorStateTracker.capture(getCurrentBehavior()),
RaftActorBehavior.createBehavior(context, newState));
}
- default -> LOG.warn("Switching to behavior : {} - not supported", newState);
+ default -> LOG.warn("{}: Switching to behavior : {} - not supported", getId(), newState);
}
}
}
}
private void handleApplyState(final ApplyState applyState) {
- long startTime = System.nanoTime();
+ final long startTime = System.nanoTime();
final var entry = applyState.getReplicatedLogEntry();
final var payload = entry.getData();
if (LOG.isDebugEnabled()) {
- LOG.debug("{}: Applying state for log index {} data {}",
- persistenceId(), entry.index(), payload);
+ LOG.debug("{}: Applying state for log index {} data {}", getId(), entry.index(), payload);
}
if (!(payload instanceof NoopPayload) && !(payload instanceof ServerConfigurationPayload)) {
applyState(applyState.getClientActor(), applyState.getIdentifier(), payload);
}
- long elapsedTime = System.nanoTime() - startTime;
+ final long elapsedTime = System.nanoTime() - startTime;
if (elapsedTime >= APPLY_STATE_DELAY_THRESHOLD_IN_NANOS) {
- LOG.debug("ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}",
- TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
+ LOG.debug("{}: ApplyState took more time than expected. Elapsed Time = {} ms ApplyState = {}", getId(),
+ TimeUnit.NANOSECONDS.toMillis(elapsedTime), applyState);
}
// Send the ApplyState message back to self to handle further processing asynchronously.
@Override
public long snapshotSequenceNr() {
// When we do a snapshot capture, we also capture and save the sequence-number of the persistent journal,
- // so that we can delete the persistent journal based on the saved sequence-number
- // However , when akka replays the journal during recovery, it replays it from the sequence number when the
- // snapshot was saved and not the number we saved. We would want to override it , by asking akka to use the
+ // so that we can delete the persistent journal based on the saved sequence-number.
+ // However, when Akka replays the journal during recovery, it replays it from the sequence number when the
+ // snapshot was saved and not the number we saved. We would want to override it, by asking Akka to use the
// last-sequence number known to us.
return context.getSnapshotManager().getLastSequenceNumber();
}
context.getReplicatedLog().lastIndex() + 1, context.currentTerm(), data);
replicatedLogEntry.setPersistencePending(true);
- LOG.debug("{}: Persist data {}", persistenceId(), replicatedLogEntry);
+ LOG.debug("{}: Persist data {}", getId(), replicatedLogEntry);
final RaftActorContext raftContext = getRaftActorContext();
String oldRaftPolicy = context.getConfigParams().getCustomRaftPolicyImplementationClass();
String newRaftPolicy = configParams.getCustomRaftPolicyImplementationClass();
- LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", persistenceId(),
+ LOG.debug("{}: RaftPolicy used with prev.config {}, RaftPolicy used with newConfig {}", getId(),
oldRaftPolicy, newRaftPolicy);
context.setConfigParams(configParams);
if (!Objects.equals(oldRaftPolicy, newRaftPolicy)) {
String previousLeaderId = behavior.getLeaderId();
short previousLeaderPayloadVersion = behavior.getLeaderPayloadVersion();
- LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", persistenceId(),
- previousLeaderId);
+ LOG.debug("{}: Re-initializing to Follower with previous leaderId {}", getId(), previousLeaderId);
changeCurrentBehavior(new Follower(context, previousLeaderId, previousLeaderPayloadVersion));
} else {
setPersistence(new PersistentDataProvider(this));
if (getCurrentBehavior() != null) {
- LOG.info("{}: Persistence has been enabled - capturing snapshot", persistenceId());
+ LOG.info("{}: Persistence has been enabled - capturing snapshot", getId());
captureSnapshot();
}
} else if (!persistent && (currentPersistence == null || currentPersistence.isRecoveryApplicable())) {
return null;
}
String peerAddress = context.getPeerAddress(leaderId);
- LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", persistenceId(), leaderId, peerAddress);
+ LOG.debug("{}: getLeaderAddress leaderId = {} peerAddress = {}", getId(), leaderId, peerAddress);
return peerAddress;
}
initiateLeadershipTransfer(new RaftActorLeadershipTransferCohort.OnComplete() {
@Override
public void onSuccess(final ActorRef raftActorRef) {
- LOG.debug("{}: leader transfer succeeded after change to non-voting", persistenceId());
+ LOG.debug("{}: leader transfer succeeded after change to non-voting", getId());
ensureFollowerState();
}
@Override
public void onFailure(final ActorRef raftActorRef) {
- LOG.debug("{}: leader transfer failed after change to non-voting", persistenceId());
+ LOG.debug("{}: leader transfer failed after change to non-voting", getId());
ensureFollowerState();
}
raftActor.pauseLeader(new TimedRunnable(context.getConfigParams().getElectionTimeOutInterval(), raftActor) {
@Override
protected void doRun() {
- LOG.debug("{}: pauseLeader successfully completed - doing transfer", raftActor.persistenceId());
+ LOG.debug("{}: pauseLeader successfully completed - doing transfer", raftActor.getId());
doTransfer();
}
@Override
protected void doCancel() {
- LOG.debug("{}: pauseLeader timed out - continuing with transfer", raftActor.persistenceId());
+ LOG.debug("{}: pauseLeader timed out - continuing with transfer", raftActor.getId());
doTransfer();
}
});
isTransferring = true;
leader.transferLeadership(this);
} else {
- LOG.debug("{}: No longer the leader - skipping transfer", raftActor.persistenceId());
+ LOG.debug("{}: No longer the leader - skipping transfer", raftActor.getId());
finish(true);
}
}
* This method is invoked to abort leadership transfer on failure.
*/
public void abortTransfer() {
- LOG.debug("{}: leader transfer aborted", raftActor.persistenceId());
+ LOG.debug("{}: leader transfer aborted", raftActor.getId());
finish(false);
}
* This method is invoked when leadership transfer was carried out and complete.
*/
public void transferComplete() {
- LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.persistenceId());
+ LOG.debug("{}: leader transfer complete - waiting for new leader", raftActor.getId());
// We'll give it a little time for the new leader to be elected to give the derived class a
// chance to possibly complete work that was suspended while we were transferring. The
FiniteDuration timeout = FiniteDuration.create(newLeaderTimeoutInMillis, TimeUnit.MILLISECONDS);
newLeaderTimer = raftActor.getContext().system().scheduler().scheduleOnce(timeout, raftActor.self(),
(Runnable) () -> {
- LOG.debug("{}: leader not elected in time", raftActor.persistenceId());
+ LOG.debug("{}: leader not elected in time", raftActor.getId());
finish(true);
}, raftActor.getContext().system().dispatcher(), raftActor.self());
}
void onNewLeader(final String newLeader) {
if (newLeader != null && newLeaderTimer != null) {
- LOG.debug("{}: leader changed to {}", raftActor.persistenceId(), newLeader);
+ LOG.debug("{}: leader changed to {}", raftActor.getId(), newLeader);
newLeaderTimer.cancel();
finish(true);
}
if (transferTimer.isRunning()) {
transferTimer.stop();
if (success) {
- LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.persistenceId(),
+ LOG.info("{}: Successfully transferred leadership to {} in {}", raftActor.getId(),
raftActor.getLeaderId(), transferTimer);
} else {
- LOG.warn("{}: Failed to transfer leadership in {}", raftActor.persistenceId(), transferTimer);
+ LOG.warn("{}: Failed to transfer leadership in {}", raftActor.getId(), transferTimer);
raftActor.unpauseLeader();
}
}
@Override
protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
actorDelegate.applyState(clientActor, identifier, data);
- LOG.info("{}: applyState called: {}", persistenceId(), data);
+ LOG.info("{}: applyState called: {}", getId(), data);
state.add(data);
}
@Override
public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
- LOG.info("{}: createSnapshot called", persistenceId());
+ LOG.info("{}: createSnapshot called", getId());
snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
}
@Override
public void applySnapshot(final Snapshot.State newState) {
- LOG.info("{}: applySnapshot called", persistenceId());
+ LOG.info("{}: applySnapshot called", getId());
applySnapshotState(newState);
snapshotCohortDelegate.applySnapshot(newState);
}
final DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(listenerActor, self());
- LOG.debug("{}: Registering listenerActor {} for path {}", persistenceId(), listenerActor, message.getPath());
+ LOG.debug("{}: Registering listenerActor {} for path {}", shardName(), listenerActor, message.getPath());
final ShardDataTree shardDataTree = getShard().getDataStore();
shardDataTree.registerTreeChangeListener(message.getPath(),
@Override
void onLeadershipChange(final boolean isLeader, final boolean hasLeader) {
- LOG.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
+ LOG.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", shardName(), isLeader, hasLeader);
- final EnableNotification msg = new EnableNotification(isLeader, persistenceId());
+ final EnableNotification msg = new EnableNotification(isLeader, shardName());
for (ActorSelection dataChangeListener : leaderOnlyListenerActors) {
dataChangeListener.tell(msg, self());
}
@Override
void onMessage(final RegisterDataTreeChangeListener message, final boolean isLeader, final boolean hasLeader) {
- LOG.debug("{}: onMessage {}, isLeader: {}, hasLeader: {}", persistenceId(), message, isLeader, hasLeader);
+ LOG.debug("{}: onMessage {}, isLeader: {}, hasLeader: {}", shardName(), message, isLeader, hasLeader);
final ActorRef registrationActor = createActor(DataTreeNotificationListenerRegistrationActor.props());
if (hasLeader && message.isRegisterOnAllInstances() || isLeader) {
doRegistration(message, registrationActor);
} else {
- LOG.debug("{}: Shard does not have a leader - delaying registration", persistenceId());
+ LOG.debug("{}: Shard does not have a leader - delaying registration", shardName());
final var delayedReg = new DelayedDataTreeChangeListenerRegistration(message, registrationActor);
final Collection<DelayedDataTreeChangeListenerRegistration> delayedRegList;
}
LOG.debug("{}: sending RegisterDataTreeNotificationListenerReply, listenerRegistrationPath = {} ",
- persistenceId(), registrationActor.path());
+ shardName(), registrationActor.path());
tellSender(new RegisterDataTreeNotificationListenerReply(registrationActor));
}
final ActorSelection listenerActor = selectActor(message.getListenerActorPath());
// We have a leader so enable the listener.
- listenerActor.tell(new EnableNotification(true, persistenceId()), self());
+ listenerActor.tell(new EnableNotification(true, shardName()), self());
if (!message.isRegisterOnAllInstances()) {
// This is a leader-only registration so store a reference to the listener actor so it can be notified
final var singleHistoryMeta = currentHistories.get(new LocalHistoryIdentifier(clientId, 0));
if (singleHistoryMeta == null) {
final var tree = shard.getDataStore();
- singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), clientId, tree);
+ singleHistory = StandaloneFrontendHistory.create(shard.getId(), clientId, tree);
} else {
singleHistory = singleHistoryMeta.toLeaderState(shard);
}
- return new LeaderFrontendState.Enabled(shard.persistenceId(), clientId, shard.getDataStore(),
+ return new LeaderFrontendState.Enabled(shard.getId(), clientId, shard.getDataStore(),
purgedHistories.mutableCopy(), singleHistory, histories);
}
*/
@NonNull AbstractFrontendHistory toLeaderState(final @NonNull Shard shard) {
if (identifier.getHistoryId() == 0) {
- return StandaloneFrontendHistory.recreate(shard.persistenceId(), identifier.getClientId(),
+ return StandaloneFrontendHistory.recreate(shard.getId(), identifier.getClientId(),
shard.getDataStore(), closedTransactions, purgedTransactions);
}
- return LocalFrontendHistory.recreate(shard.persistenceId(), shard.getDataStore(),
+ return LocalFrontendHistory.recreate(shard.getId(), shard.getDataStore(),
shard.getDataStore().recreateTransactionChain(identifier, closed), closedTransactions, purgedTransactions);
}
}
return shard;
}
- protected final String persistenceId() {
- return shard.persistenceId();
+ protected final String shardName() {
+ return shard.getId();
}
protected final void tellSender(final Object message) {
@Override
public final void postStop() throws Exception {
- LOG.info("Stopping Shard {}", persistenceId());
+ LOG.info("Stopping Shard {}", getId());
super.postStop();
@Override
protected final void handleRecover(final Object message) {
- LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(),
- getSender());
+ LOG.debug("{}: onReceiveRecover: Received message {} from {}", getId(), message.getClass(), getSender());
super.handleRecover(message);
try (var context = appendEntriesReplyTracker.received(message)) {
final var maybeError = context.error();
if (maybeError.isPresent()) {
- LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+ LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", getId(),
maybeError.orElseThrow());
}
}
}
} catch (RequestException e) {
- LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
+ LOG.debug("{}: request {} failed", getId(), envelope, e);
envelope.sendFailure(e, ticker().read() - now);
} catch (Exception e) {
- LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
- envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
- ticker().read() - now);
+ LOG.debug("{}: request {} caused failure", getId(), envelope, e);
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), ticker().read() - now);
}
}
}
private void onMakeLeaderLocal() {
- LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
+ LOG.debug("{}: onMakeLeaderLocal received", getId());
if (isLeader()) {
getSender().tell(new Status.Success(null), self());
return;
// request. We can also let the caller retry by sending a flag
// in the response indicating the request is "reTryable".
getSender().tell(new Failure(
- new LeadershipTransferFailedException("We cannot initiate leadership transfer to local node. "
- + "Currently there is no leader for " + persistenceId())),
- self());
+ new LeadershipTransferFailedException(
+ "We cannot initiate leadership transfer to local node. "
+ + "Currently there is no leader for " + getId())),
+ self());
return;
}
return existing;
}
if (cmp > 0) {
- LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId);
+ LOG.debug("{}: rejecting request from outdated client {}", getId(), clientId);
throw new RetiredGenerationException(clientId.getGeneration(),
existing.getIdentifier().getGeneration());
}
- LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId);
+ LOG.info("{}: retiring state {}, outdated by request from client {}", getId(), existing, clientId);
existing.retire();
knownFrontends.remove(clientId.getFrontendId());
} else {
- LOG.debug("{}: client {} is not yet known", persistenceId(), clientId);
+ LOG.debug("{}: client {} is not yet known", getId(), clientId);
}
return null;
if (!isLeader() || !isLeaderActive()) {
LOG.info("{}: not currently leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
- + "isLeadershipTransferInProgress: {}.",
- persistenceId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
+ + "isLeadershipTransferInProgress: {}.",
+ getId(), message, isLeader(), isLeaderActive(), isLeadershipTransferInProgress());
throw new NotLeaderException(self());
}
final ABIVersion selectedVersion = selectVersion(message);
final LeaderFrontendState frontend;
if (existing == null) {
- frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store);
+ frontend = new LeaderFrontendState.Enabled(getId(), clientId, store);
knownFrontends.put(clientId.getFrontendId(), frontend);
- LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
+ LOG.debug("{}: created state {} for client {}", getId(), frontend, clientId);
} else {
frontend = existing;
}
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || paused || !isLeaderActive()) {
LOG.debug("{}: not currently active leader, rejecting request {}. isLeader: {}, isLeaderActive: {},"
- + "isLeadershipTransferInProgress: {}, paused: {}",
- persistenceId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
+ + "isLeadershipTransferInProgress: {}, paused: {}",
+ getId(), envelope, isLeader(), isLeaderActive(), isLeadershipTransferInProgress(), paused);
throw new NotLeaderException(self());
}
case LocalHistoryRequest<?> req -> getFrontend(req.getTarget().getClientId())
.handleLocalHistoryRequest(req, envelope, now);
default -> {
- LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
+ LOG.warn("{}: rejecting unsupported request {}", getId(), request);
throw new UnsupportedRequestException(request);
}
};
@Override
protected final RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
if (restoreFromSnapshot == null) {
- return ShardRecoveryCoordinator.create(store, persistenceId(), LOG);
+ return ShardRecoveryCoordinator.create(store, getId(), LOG);
}
- return ShardRecoveryCoordinator.forSnapshot(store, persistenceId(), LOG, restoreFromSnapshot.getSnapshot());
+ return ShardRecoveryCoordinator.forSnapshot(store, getId(), LOG, restoreFromSnapshot.getSnapshot());
}
@Override
protected final void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
if (data instanceof Payload payload) {
if (payload instanceof DisableTrackingPayload disableTracking) {
- LOG.debug("{}: ignoring legacy {}", persistenceId(), disableTracking);
+ LOG.debug("{}: ignoring legacy {}", getId(), disableTracking);
return;
}
try {
store.applyReplicatedPayload(identifier, payload);
} catch (DataValidationFailedException | IOException e) {
- LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
+ LOG.error("{}: Error applying replica {}", getId(), identifier, e);
}
} else {
- LOG.error("{}: Unknown state for {} received {}", persistenceId(), identifier, data);
+ LOG.error("{}: Unknown state for {} received {}", getId(), identifier, data);
}
}
if (!isLeader) {
if (LOG.isDebugEnabled()) {
LOG.debug(
- "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
- persistenceId(), getId());
+ "{}: onStateChanged: Closing all transaction chains because shard is no longer the leader",
+ getId());
}
paused = false;
if (!isLeader()) {
if (!knownFrontends.isEmpty()) {
- LOG.debug("{}: removing frontend state for {}", persistenceId(), knownFrontends.keySet());
+ LOG.debug("{}: removing frontend state for {}", getId(), knownFrontends.keySet());
knownFrontends = ImmutableMap.of();
}
} else {
// We have become the leader, we need to reconstruct frontend state
knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
- LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
+ LOG.debug("{}: became leader with frontend state for {}", getId(), knownFrontends.keySet());
}
}
@Override
protected final void pauseLeader(final Runnable operation) {
- LOG.debug("{}: In pauseLeader, operation: {}", persistenceId(), operation);
+ LOG.debug("{}: In pauseLeader, operation: {}", getId(), operation);
paused = true;
// Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
@Override
protected final void unpauseLeader() {
- LOG.debug("{}: In unpauseLeader", persistenceId());
+ LOG.debug("{}: In unpauseLeader", getId());
paused = false;
store.setRunOnPendingTransactionsComplete(null);