builder);
}
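+ /**
+  * Identifier prefixed to every log message. Currently an alias for {@link #persistenceId()},
+  * kept as a single point of change for the log prefix.
+  */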
+ private String logName() {
+ return persistenceId();
+ }
+
@Override
@Deprecated(since = "11.0.0", forRemoval = true)
public final ActorRef getSender() {
@Override
public void preStart() {
- LOG.info("Starting ShardManager {}", persistenceId());
+ LOG.info("Starting ShardManager {}", logName());
}
@Override
public void postStop() {
- LOG.info("Stopping ShardManager {}", persistenceId());
+ LOG.info("Stopping ShardManager {}", logName());
shardManagerMBean.unregisterMBean();
}
} else if (message instanceof SaveSnapshotSuccess msg) {
onSaveSnapshotSuccess(msg);
} else if (message instanceof SaveSnapshotFailure msg) {
- LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), msg.cause());
+ LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", logName(), msg.cause());
} else if (message instanceof Shutdown) {
onShutDown();
} else if (message instanceof GetLocalShardIds) {
} else if (message instanceof RegisterForShardAvailabilityChanges msg) {
onRegisterForShardAvailabilityChanges(msg);
} else if (message instanceof DeleteSnapshotsFailure msg) {
- LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), msg.cause());
+ LOG.warn("{}: Failed to delete prior snapshots", logName(), msg.cause());
} else if (message instanceof DeleteSnapshotsSuccess) {
- LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
+ LOG.debug("{}: Successfully deleted prior snapshots", logName());
} else if (message instanceof RegisterRoleChangeListenerReply) {
- LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
+ LOG.trace("{}: Received RegisterRoleChangeListenerReply", logName());
} else if (message instanceof ClusterEvent.MemberEvent msg) {
- LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), msg);
+ LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", logName(), msg);
} else {
unknownMessage(message);
}
}
private void onRegisterForShardAvailabilityChanges(final RegisterForShardAvailabilityChanges message) {
- LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", persistenceId(), message);
+ LOG.debug("{}: onRegisterForShardAvailabilityChanges: {}", logName(), message);
final Consumer<String> callback = message.getCallback();
shardAvailabilityCallbacks.add(callback);
}
private void onGetShardRole(final GetShardRole message) {
- LOG.debug("{}: onGetShardRole for shard: {}", persistenceId(), message.getName());
+ LOG.debug("{}: onGetShardRole for shard: {}", logName(), message.getName());
final String name = message.getName();
final ShardInformation shardInformation = localShards.get(name);
if (shardInformation == null) {
- LOG.info("{}: no shard information for {} found", persistenceId(), name);
+ LOG.info("{}: no shard information for {} found", logName(), name);
getSender().tell(new Status.Failure(
new IllegalArgumentException("Shard with name " + name + " not present.")), ActorRef.noSender());
return;
final var stopFutures = new ArrayList<Future<Boolean>>(localShards.size());
for (var info : localShards.values()) {
if (info.getActor() != null) {
- LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
+ LOG.debug("{}: Issuing gracefulStop to shard {}", logName(), info.getShardId());
stopFutures.add(Patterns.gracefulStop(info.getActor(), info.getDatastoreContext().getShardRaftConfig()
.getElectionTimeOutInterval().$times(2), Shutdown.INSTANCE));
}
}
- LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
+ LOG.info("Shutting down ShardManager {} - waiting on {} shards", logName(), stopFutures.size());
final var dispatcher = new Dispatchers(context().system().dispatchers())
.getDispatcher(Dispatchers.DispatcherType.Client);
Futures.sequence(stopFutures, dispatcher).onComplete(new OnComplete<Iterable<Boolean>>() {
@Override
public void onComplete(final Throwable failure, final Iterable<Boolean> results) {
- LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
+ LOG.debug("{}: All shards shutdown - sending PoisonPill to self", logName());
self().tell(PoisonPill.getInstance(), self());
if (failure != null) {
- LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
+ LOG.warn("{}: An error occurred attempting to shut down the shards", logName(), failure);
return;
}
}
if (nfailed > 0) {
- LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
+ LOG.warn("{}: {} shards did not shut down gracefully", logName(), nfailed);
}
}
}, dispatcher);
final RemoveServerReply replyMsg, final String leaderPath) {
shardReplicaOperationsInProgress.remove(shardId.getShardName());
- LOG.debug("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
+ LOG.debug("{}: Received {} for shard {}", logName(), replyMsg, shardId.getShardName());
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
+ LOG.debug("{}: Leader shard successfully removed the replica shard {}", logName(),
shardId.getShardName());
originalSender.tell(new Status.Success(null), self());
} else {
LOG.warn("{}: Leader failed to remove shard replica {} with status {}",
- persistenceId(), shardId, replyMsg.getStatus());
+ logName(), shardId, replyMsg.getStatus());
Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(), leaderPath, shardId);
originalSender.tell(new Status.Failure(failure), self());
final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
// Inform the ShardLeader to remove this shard as a replica by sending a RemoveServer message
- LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
+ LOG.debug("{}: Sending RemoveServer message to peer {} for shard {}", logName(),
primaryPath, shardId);
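+ // Bound the RemoveServer round-trip by the shard leader election timeout.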
Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration());
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
shardReplicaOperationsInProgress.remove(shardName);
- LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", persistenceId(), primaryPath,
+ LOG.debug("{}: RemoveServer request to leader {} for shard {} failed", logName(), primaryPath,
shardName, failure);
// FAILURE
final String shardName = shardId.getShardName();
final ShardInformation shardInformation = localShards.remove(shardName);
if (shardInformation == null) {
- LOG.debug("{} : Shard replica {} is not present in list", persistenceId(), shardId.toString());
+ LOG.debug("{} : Shard replica {} is not present in list", logName(), shardId.toString());
return;
}
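+ // Wait up to three times the election timeout, but at least 10 seconds, for the shard actor to stop.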
long timeoutInMS = Math.max(shardInformation.getDatastoreContext().getShardRaftConfig()
.getElectionTimeOutInterval().$times(3).toMillis(), 10000);
- LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", persistenceId(), shardActor,
+ LOG.debug("{} : Sending Shutdown to Shard actor {} with {} ms timeout", logName(), shardActor,
timeoutInMS);
final var stopFuture = Patterns.gracefulStop(shardActor,
@Override
public void onComplete(final Throwable failure, final Boolean result) {
if (failure == null) {
- LOG.debug("{} : Successfully shut down Shard actor {}", persistenceId(), shardActor);
+ LOG.debug("{} : Successfully shut down Shard actor {}", logName(), shardActor);
} else {
- LOG.warn("{}: Failed to shut down Shard actor {}", persistenceId(), shardActor, failure);
+ LOG.warn("{}: Failed to shut down Shard actor {}", logName(), shardActor, failure);
}
self().tell((RunnableMessage) () -> {
.getDispatcher(Dispatchers.DispatcherType.Client));
}
- LOG.debug("{} : Local Shard replica for shard {} has been removed", persistenceId(), shardName);
+ LOG.debug("{} : Local Shard replica for shard {} has been removed", logName(), shardName);
persistShardList();
}
private void onGetSnapshot(final GetSnapshot getSnapshot) {
- LOG.debug("{}: onGetSnapshot", persistenceId());
+ LOG.debug("{}: onGetSnapshot", logName());
List<String> notInitialized = null;
for (var shardInfo : localShards.values()) {
}
ActorRef replyActor = getContext().actorOf(ShardManagerGetSnapshotReplyActor.props(
- new ArrayList<>(localShards.keySet()), type, currentSnapshot , getSender(), persistenceId(),
+ new ArrayList<>(localShards.keySet()), type, currentSnapshot, getSender(), logName(),
datastoreContextFactory.getBaseDatastoreContext().getShardInitializationTimeout().duration()));
for (var shardInfo : localShards.values()) {
@SuppressWarnings("checkstyle:IllegalCatch")
private void onCreateShard(final CreateShard createShard) {
- LOG.debug("{}: onCreateShard: {}", persistenceId(), createShard);
+ LOG.debug("{}: onCreateShard: {}", logName(), createShard);
Object reply;
try {
String shardName = createShard.getModuleShardConfig().getShardName();
if (localShards.containsKey(shardName)) {
- LOG.debug("{}: Shard {} already exists", persistenceId(), shardName);
+ LOG.debug("{}: Shard {} already exists", logName(), shardName);
reply = new Status.Success(String.format("Shard with name %s already exists", shardName));
} else {
doCreateShard(createShard);
reply = new Status.Success(null);
}
} catch (Exception e) {
- LOG.error("{}: onCreateShard failed", persistenceId(), e);
+ LOG.error("{}: onCreateShard failed", logName(), e);
reply = new Status.Failure(e);
}
return false;
}
- LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", persistenceId(),
+ LOG.debug("{} : Stop is in progress for shard {} - adding OnComplete callback to defer {}", logName(),
shardName, messageToDefer);
final ActorRef sender = getSender();
stopOnComplete.addOnComplete(new OnComplete<Boolean>() {
@Override
public void onComplete(final Throwable failure, final Boolean result) {
- LOG.debug("{} : Stop complete for shard {} - re-queing {}", persistenceId(), shardName, messageToDefer);
+ LOG.debug("{} : Stop complete for shard {} - re-queing {}", logName(), shardName, messageToDefer);
self().tell(messageToDefer, sender);
}
});
}
LOG.debug("{} doCreateShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
- persistenceId(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses,
- isActiveMember);
+ logName(), shardId, moduleShardConfig.getShardMemberNames(), peerAddresses, isActiveMember);
ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
shardDatastoreContext, createShard.getShardBuilder(), peerAddressResolver);
private void checkReady() {
if (isReadyWithLeaderId()) {
- LOG.info("{}: All Shards are ready - data store {} is ready", persistenceId(), type);
+ LOG.info("{}: All Shards are ready - data store {} is ready", logName(), type);
readinessFuture.set(Empty.value());
}
}
private void onLeaderStateChanged(final ShardLeaderStateChanged leaderStateChanged) {
- LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);
+ LOG.info("{}: Received LeaderStateChanged message: {}", logName(), leaderStateChanged);
ShardInformation shardInformation = findShardInformation(leaderStateChanged.getMemberId());
if (shardInformation != null) {
private void onShardNotInitializedTimeout(final ShardNotInitializedTimeout message) {
ShardInformation shardInfo = message.getShardInfo();
- LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
+ LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", logName(),
shardInfo.getShardName());
shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
if (!shardInfo.isShardInitialized()) {
- LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+ LOG.debug("{}: Returning NotInitializedException for shard {}", logName(), shardInfo.getShardName());
message.getSender().tell(createNotInitializedException(shardInfo.getShardId()), self());
} else {
- LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", logName(), shardInfo.getShardName());
message.getSender().tell(new NoShardLeaderException(shardInfo.getShardId()), self());
}
}
private void onFollowerInitialSyncStatus(final FollowerInitialSyncUpStatus status) {
- LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
+ LOG.info("{} Received follower initial sync status for {} status sync done {}", logName(),
status.getName(), status.isInitialSyncDone());
ShardInformation shardInformation = findShardInformation(status.getName());
}
private void onRoleChangeNotification(final RoleChangeNotification roleChanged) {
- LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), roleChanged.getMemberId(),
+ LOG.info("{}: Received role changed for {} from {} to {}", logName(), roleChanged.getMemberId(),
roleChanged.getOldRole(), roleChanged.getNewRole());
ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
try {
shardId = ShardIdentifier.fromShardIdString(actorName);
} catch (IllegalArgumentException e) {
- LOG.debug("{}: ignoring actor {}", persistenceId(), actorName, e);
+ LOG.debug("{}: ignoring actor {}", logName(), actorName, e);
return;
}
}
private void markShardAsInitialized(final String shardName) {
- LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
+ LOG.debug("{}: Initializing shard [{}]", logName(), shardName);
ShardInformation shardInformation = localShards.get(shardName);
if (shardInformation != null) {
@SuppressWarnings("checkstyle:IllegalCatch")
private void onRecoveryCompleted() {
- LOG.info("Recovery complete : {}", persistenceId());
+ LOG.info("Recovery complete : {}", logName());
if (currentSnapshot == null && restoreFromSnapshot != null
&& restoreFromSnapshot.getShardManagerSnapshot() != null) {
ShardManagerSnapshot snapshot = restoreFromSnapshot.getShardManagerSnapshot();
- LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", persistenceId(), snapshot);
+ LOG.debug("{}: Restoring from ShardManagerSnapshot: {}", logName(), snapshot);
applyShardManagerSnapshot(snapshot);
}
.getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
}
- LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", persistenceId(), timeout.toMillis(),
+ LOG.debug("{}: Scheduling {} ms timer to wait for shard {}", logName(), timeout.toMillis(),
shardInformation);
Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
onShardInitialized.setTimeoutSchedule(timeoutSchedule);
} else if (!shardInformation.isShardInitialized()) {
- LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+ LOG.debug("{}: Returning NotInitializedException for shard {}", logName(),
shardInformation.getShardName());
getSender().tell(createNotInitializedException(shardInformation.getShardId()), self());
} else {
- LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+ LOG.debug("{}: Returning NoShardLeaderException for shard {}", logName(),
shardInformation.getShardName());
getSender().tell(new NoShardLeaderException(shardInformation.getShardId()), self());
}
private void memberRemoved(final ClusterEvent.MemberRemoved message) {
MemberName memberName = memberToName(message.member());
- LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", logName(), memberName,
message.member().address());
peerAddressResolver.removePeerAddress(memberName);
private void memberExited(final ClusterEvent.MemberExited message) {
MemberName memberName = memberToName(message.member());
- LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberExited: memberName: {}, address: {}", logName(), memberName,
message.member().address());
peerAddressResolver.removePeerAddress(memberName);
private void memberUp(final ClusterEvent.MemberUp message) {
MemberName memberName = memberToName(message.member());
- LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberUp: memberName: {}, address: {}", logName(), memberName,
message.member().address());
memberUp(memberName, message.member().address());
private void memberWeaklyUp(final MemberWeaklyUp message) {
MemberName memberName = memberToName(message.member());
- LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
+ LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", logName(), memberName,
message.member().address());
memberUp(memberName, message.member().address());
String peerAddress = peerAddressResolver.getShardActorAddress(shardName, memberName);
info.updatePeerAddress(peerId, peerAddress, self());
LOG.debug("{}: updated peer {} on member {} with address {} on shard {} whose actor address is {}",
- persistenceId(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
+ logName(), peerId, memberName, peerAddress, info.getShardId(), info.getActor());
}
} else {
info.getActor().tell(message, self());
}
private void findPrimary(final FindPrimary message) {
- LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+ LOG.debug("{}: In findPrimary: {}", logName(), message);
final String shardName = message.getShardName();
final boolean canReturnLocalShardState = !(message instanceof RemoteFindPrimary);
? new LocalPrimaryShardFound(primaryPath, info.getLocalShardDataTree().orElseThrow()) :
new RemotePrimaryShardFound(primaryPath, info.getLeaderVersion());
- LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
+ LOG.debug("{}: Found primary for {}: {}", logName(), shardName, found);
return found;
});
}
LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}, visitedAddresses: {}",
- persistenceId(), shardName, address, visitedAddresses);
+ logName(), shardName, address, visitedAddresses);
getContext().actorSelection(address).forward(new RemoteFindPrimary(shardName,
message.isWaitUntilReady(), visitedAddresses), getContext());
return;
}
- LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+ LOG.debug("{}: No shard found for {}", logName(), shardName);
getSender().tell(new PrimaryNotFoundException(
String.format("No primary shard found for %s.", shardName)), self());
for (String shardName : memberShardNames) {
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
- LOG.debug("{}: Creating local shard: {}", persistenceId(), shardId);
+ LOG.debug("{}: Creating local shard: {}", logName(), shardId);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShards.put(shardName, createShardInfoFor(shardName, shardId, peerAddresses,
private boolean isShardReplicaOperationInProgress(final String shardName, final ActorRef sender) {
if (shardReplicaOperationsInProgress.contains(shardName)) {
- LOG.debug("{}: A shard replica operation for {} is already in progress", persistenceId(), shardName);
+ LOG.debug("{}: A shard replica operation for {} is already in progress", logName(), shardName);
sender.tell(new Status.Failure(new IllegalStateException(
String.format("A shard replica operation for %s is already in progress", shardName))), self());
return true;
private void onAddShardReplica(final AddShardReplica shardReplicaMsg) {
final String shardName = shardReplicaMsg.getShardName();
- LOG.debug("{}: onAddShardReplica: {}", persistenceId(), shardReplicaMsg);
+ LOG.debug("{}: onAddShardReplica: {}", logName(), shardReplicaMsg);
// verify the shard with the specified name is present in the cluster configuration
if (!configuration.isShardConfigured(shardName)) {
- LOG.debug("{}: No module configuration exists for shard {}", persistenceId(), shardName);
+ LOG.debug("{}: No module configuration exists for shard {}", logName(), shardName);
getSender().tell(new Status.Failure(new IllegalArgumentException(
"No module configuration exists for shard " + shardName)), self());
return;
// Create the localShard
if (modelContext == null) {
LOG.debug("{}: No SchemaContext is available in order to create a local shard instance for {}",
- persistenceId(), shardName);
+ logName(), shardName);
getSender().tell(new Status.Failure(new IllegalStateException(
"No SchemaContext is available in order to create a local shard instance for " + shardName)), self());
return;
}
- findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, persistenceId(),
+ findPrimary(shardName, new AutoFindPrimaryFailureResponseHandler(getSender(), shardName, logName(),
self()) {
@Override
public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
- final RunnableMessage runnable = (RunnableMessage) () ->
- addShard(getShardName(), response, getSender());
+ final RunnableMessage runnable = () -> addShard(getShardName(), response, getSender());
if (!isPreviousShardActorStopInProgress(getShardName(), runnable)) {
self().tell(runnable, getTargetActor());
}
}
private void sendLocalReplicaAlreadyExistsReply(final String shardName, final ActorRef sender) {
- LOG.debug("{}: Local shard {} already exists", persistenceId(), shardName);
+ LOG.debug("{}: Local shard {} already exists", logName(), shardName);
sender.tell(new Status.Failure(new AlreadyExistsException(
String.format("Local shard %s already exists", shardName))), self());
}
peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
//inform ShardLeader to add this shard as a replica by sending an AddServer message
- LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+ LOG.debug("{}: Sending AddServer message to peer {} for shard {}", logName(),
response.getPrimaryPath(), shardInfo.getShardId());
final Timeout addServerTimeout = new Timeout(shardInfo.getDatastoreContext()
@Override
public void onComplete(final Throwable failure, final Object addServerResponse) {
if (failure != null) {
- LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
+ LOG.debug("{}: AddServer request to {} for {} failed", logName(),
response.getPrimaryPath(), shardName, failure);
final String msg = String.format("AddServer request to leader %s for shard %s failed",
String shardName = shardInfo.getShardName();
shardReplicaOperationsInProgress.remove(shardName);
- LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+ LOG.debug("{}: Received {} for shard {} from leader {}", logName(), replyMsg, shardName, leaderPath);
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+ LOG.debug("{}: Leader shard successfully added the replica shard {}", logName(), shardName);
// Make the local shard voting capable
shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), self());
sendLocalReplicaAlreadyExistsReply(shardName, sender);
} else {
LOG.warn("{}: Leader failed to add shard replica {} with status {}",
- persistenceId(), shardName, replyMsg.getStatus());
+ logName(), shardName, replyMsg.getStatus());
Exception failure = getServerChangeException(AddServer.class, replyMsg.getStatus(), leaderPath,
shardInfo.getShardId());
}
private void onRemoveShardReplica(final RemoveShardReplica shardReplicaMsg) {
- LOG.debug("{}: onRemoveShardReplica: {}", persistenceId(), shardReplicaMsg);
+ LOG.debug("{}: onRemoveShardReplica: {}", logName(), shardReplicaMsg);
findPrimary(shardReplicaMsg.getShardName(), new AutoFindPrimaryFailureResponseHandler(getSender(),
- shardReplicaMsg.getShardName(), persistenceId(), self()) {
+ shardReplicaMsg.getShardName(), logName(), self()) {
@Override
public void onRemotePrimaryShardFound(final RemotePrimaryShardFound response) {
doRemoveShardReplicaAsync(response.getPrimaryPath());
shardList.remove(shardInfo.getShardName());
}
}
- LOG.debug("{}: persisting the shard list {}", persistenceId(), shardList);
+ LOG.debug("{}: persisting the shard list {}", logName(), shardList);
saveSnapshot(updateShardManagerSnapshot(shardList));
}
private void applyShardManagerSnapshot(final ShardManagerSnapshot snapshot) {
currentSnapshot = snapshot;
- LOG.debug("{}: onSnapshotOffer: {}", persistenceId(), currentSnapshot);
+ LOG.debug("{}: onSnapshotOffer: {}", logName(), currentSnapshot);
final MemberName currentMember = cluster.getCurrentMemberName();
Set<String> configuredShardList =
for (String shard : currentSnapshot.getShardList()) {
if (!configuredShardList.contains(shard)) {
// add the current member as a replica for the shard
- LOG.debug("{}: adding shard {}", persistenceId(), shard);
+ LOG.debug("{}: adding shard {}", logName(), shard);
configuration.addMemberReplicaForShard(shard, currentMember);
} else {
configuredShardList.remove(shard);
}
for (String shard : configuredShardList) {
// remove the member as a replica for the shard
- LOG.debug("{}: removing shard {}", persistenceId(), shard);
+ LOG.debug("{}: removing shard {}", logName(), shard);
configuration.removeMemberReplicaForShard(shard, currentMember);
}
}
private void onSaveSnapshotSuccess(final SaveSnapshotSuccess successMessage) {
- LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
- persistenceId());
+ LOG.debug("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available", logName());
deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), successMessage.metadata().timestamp() - 1,
0, 0));
}
private void onChangeShardServersVotingStatus(final ChangeShardMembersVotingStatus changeMembersVotingStatus) {
- LOG.debug("{}: onChangeShardServersVotingStatus: {}", persistenceId(), changeMembersVotingStatus);
+ LOG.debug("{}: onChangeShardServersVotingStatus: {}", logName(), changeMembersVotingStatus);
String shardName = changeMembersVotingStatus.getShardName();
Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
}
private void onFlipShardMembersVotingStatus(final FlipShardMembersVotingStatus flipMembersVotingStatus) {
- LOG.debug("{}: onFlipShardMembersVotingStatus: {}", persistenceId(), flipMembersVotingStatus);
+ LOG.debug("{}: onFlipShardMembersVotingStatus: {}", logName(), flipMembersVotingStatus);
ActorRef sender = getSender();
final String shardName = flipMembersVotingStatus.getShardName();
}
private void findLocalShard(final FindLocalShard message) {
- LOG.debug("{}: findLocalShard : {}", persistenceId(), message.getShardName());
+ LOG.debug("{}: findLocalShard : {}", logName(), message.getShardName());
final ShardInformation shardInformation = localShards.get(message.getShardName());
if (shardInformation == null) {
LOG.debug("{}: Local shard {} not found - shards present: {}",
- persistenceId(), message.getShardName(), localShards.keySet());
+ logName(), message.getShardName(), localShards.keySet());
getSender().tell(new LocalShardNotFound(message.getShardName()), self());
return;
@Override
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
- LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId(), shardName,
+ LOG.debug("{}: Received failure from FindLocalShard for shard {}", logName(), shardName,
failure);
sender.tell(new Status.Failure(new RuntimeException(
String.format("Failed to find local shard %s", shardName), failure)), self());
} else if (response instanceof LocalShardFound msg) {
self().tell((RunnableMessage) () -> onLocalShardFound.accept(msg), sender);
} else if (response instanceof LocalShardNotFound) {
- LOG.debug("{}: Local shard {} does not exist", persistenceId(), shardName);
+ LOG.debug("{}: Local shard {} does not exist", logName(), shardName);
sender.tell(new Status.Failure(new IllegalArgumentException(
String.format("Local shard %s does not exist", shardName))), self());
} else {
- LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId(), shardName,
+ LOG.debug("{}: Failed to find local shard {}: received response: {}", logName(), shardName,
response);
sender.tell(new Status.Failure(response instanceof Throwable throwable ? throwable
: new RuntimeException(String.format("Failed to find local shard %s: received response: %s",
shardReplicaOperationsInProgress.add(shardName);
DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
- final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+ final var shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
- LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", persistenceId(),
- changeServersVotingStatus, shardActorRef.path());
+ LOG.debug("{}: Sending ChangeServersVotingStatus message {} to local shard {}", logName(),
+ changeServersVotingStatus, shardActorRef.path());
- Timeout timeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2));
- Patterns.ask(shardActorRef, changeServersVotingStatus, timeout).onComplete(new OnComplete<>() {
- @Override
- public void onComplete(final Throwable failure, final Object response) {
- shardReplicaOperationsInProgress.remove(shardName);
- if (failure != null) {
- LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", persistenceId(),
- shardActorRef.path(), failure);
- sender.tell(new Status.Failure(new RuntimeException(
- String.format("ChangeServersVotingStatus request to local shard %s failed",
- shardActorRef.path()), failure)), self());
- return;
- }
+ Patterns.ask(shardActorRef, changeServersVotingStatus,
+ new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(2)))
+ .onComplete(new OnComplete<>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object response) {
+ shardReplicaOperationsInProgress.remove(shardName);
+ if (failure != null) {
+ LOG.debug("{}: ChangeServersVotingStatus request to local shard {} failed", logName(),
+ shardActorRef.path(), failure);
+ sender.tell(new Status.Failure(new RuntimeException(
+ String.format("ChangeServersVotingStatus request to local shard %s failed",
+ shardActorRef.path()), failure)), self());
+ return;
+ }
- LOG.debug("{}: Received {} from local shard {}", persistenceId(), response, shardActorRef.path());
+ LOG.debug("{}: Received {} from local shard {}", logName(), response, shardActorRef.path());
- final var replyMsg = (ServerChangeReply) response;
- if (replyMsg.getStatus() == ServerChangeStatus.OK) {
- LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", persistenceId(), shardName);
- sender.tell(new Status.Success(null), self());
- } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
- sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
- "The requested voting state change for shard %s is invalid. At least one member "
- + "must be voting", shardId.getShardName()))), self());
- } else {
- LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
- persistenceId(), shardName, replyMsg.getStatus());
+ final var replyMsg = (ServerChangeReply) response;
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug("{}: ChangeServersVotingStatus succeeded for shard {}", logName(), shardName);
+ sender.tell(new Status.Success(null), self());
+ } else if (replyMsg.getStatus() == ServerChangeStatus.INVALID_REQUEST) {
+ sender.tell(new Status.Failure(new IllegalArgumentException(String.format(
+ "The requested voting state change for shard %s is invalid. At least one member "
+ + "must be voting", shardId.getShardName()))), self());
+ } else {
+ LOG.warn("{}: ChangeServersVotingStatus failed for shard {} with status {}",
+ logName(), shardName, replyMsg.getStatus());
- Exception error = getServerChangeException(ChangeServersVotingStatus.class,
- replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
- sender.tell(new Status.Failure(error), self());
+ Exception error = getServerChangeException(ChangeServersVotingStatus.class,
+ replyMsg.getStatus(), shardActorRef.path().toString(), shardId);
+ sender.tell(new Status.Failure(error), self());
+ }
}
- }
- }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
}
private static final class ForwardedAddServerReply {