@Override
public void handleCommand(final Object message) throws Exception {
- if (message instanceof FindPrimary) {
- findPrimary((FindPrimary)message);
- } else if (message instanceof FindLocalShard) {
- findLocalShard((FindLocalShard) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext(message);
- } else if (message instanceof ActorInitialized) {
- onActorInitialized(message);
- } else if (message instanceof ClusterEvent.MemberUp) {
- memberUp((ClusterEvent.MemberUp) message);
- } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
- memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
- } else if (message instanceof ClusterEvent.MemberExited) {
- memberExited((ClusterEvent.MemberExited) message);
- } else if (message instanceof ClusterEvent.MemberRemoved) {
- memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if (message instanceof ClusterEvent.UnreachableMember) {
- memberUnreachable((ClusterEvent.UnreachableMember) message);
- } else if (message instanceof ClusterEvent.ReachableMember) {
- memberReachable((ClusterEvent.ReachableMember) message);
- } else if (message instanceof DatastoreContextFactory) {
- onDatastoreContextFactory((DatastoreContextFactory) message);
- } else if (message instanceof RoleChangeNotification) {
- onRoleChangeNotification((RoleChangeNotification) message);
- } else if (message instanceof FollowerInitialSyncUpStatus) {
- onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
- } else if (message instanceof ShardNotInitializedTimeout) {
- onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
- } else if (message instanceof ShardLeaderStateChanged) {
- onLeaderStateChanged((ShardLeaderStateChanged) message);
- } else if (message instanceof SwitchShardBehavior) {
- onSwitchShardBehavior((SwitchShardBehavior) message);
- } else if (message instanceof CreateShard) {
- onCreateShard((CreateShard)message);
- } else if (message instanceof AddShardReplica) {
- onAddShardReplica((AddShardReplica) message);
+ if (message instanceof FindPrimary msg) {
+ findPrimary(msg);
+ } else if (message instanceof FindLocalShard msg) {
+ findLocalShard(msg);
+ } else if (message instanceof UpdateSchemaContext msg) {
+ updateSchemaContext(msg);
+ } else if (message instanceof ActorInitialized msg) {
+ onActorInitialized(msg);
+ } else if (message instanceof ClusterEvent.MemberUp msg) {
+ memberUp(msg);
+ } else if (message instanceof ClusterEvent.MemberWeaklyUp msg) {
+ memberWeaklyUp(msg);
+ } else if (message instanceof ClusterEvent.MemberExited msg) {
+ memberExited(msg);
+ } else if (message instanceof ClusterEvent.MemberRemoved msg) {
+ memberRemoved(msg);
+ } else if (message instanceof ClusterEvent.UnreachableMember msg) {
+ memberUnreachable(msg);
+ } else if (message instanceof ClusterEvent.ReachableMember msg) {
+ memberReachable(msg);
+ } else if (message instanceof DatastoreContextFactory msg) {
+ onDatastoreContextFactory(msg);
+ } else if (message instanceof RoleChangeNotification msg) {
+ onRoleChangeNotification(msg);
+ } else if (message instanceof FollowerInitialSyncUpStatus msg) {
+ onFollowerInitialSyncStatus(msg);
+ } else if (message instanceof ShardNotInitializedTimeout msg) {
+ onShardNotInitializedTimeout(msg);
+ } else if (message instanceof ShardLeaderStateChanged msg) {
+ onLeaderStateChanged(msg);
+ } else if (message instanceof SwitchShardBehavior msg) {
+ onSwitchShardBehavior(msg);
+ } else if (message instanceof CreateShard msg) {
+ onCreateShard(msg);
+ } else if (message instanceof AddShardReplica msg) {
+ onAddShardReplica(msg);
} else if (message instanceof ForwardedAddServerReply msg) {
- onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
- msg.removeShardOnFailure);
+ onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath, msg.removeShardOnFailure);
} else if (message instanceof ForwardedAddServerFailure msg) {
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
- } else if (message instanceof RemoveShardReplica) {
- onRemoveShardReplica((RemoveShardReplica) message);
- } else if (message instanceof WrappedShardResponse) {
- onWrappedShardResponse((WrappedShardResponse) message);
- } else if (message instanceof GetSnapshot) {
- onGetSnapshot((GetSnapshot) message);
- } else if (message instanceof ServerRemoved) {
- onShardReplicaRemoved((ServerRemoved) message);
- } else if (message instanceof ChangeShardMembersVotingStatus) {
- onChangeShardServersVotingStatus((ChangeShardMembersVotingStatus) message);
- } else if (message instanceof FlipShardMembersVotingStatus) {
- onFlipShardMembersVotingStatus((FlipShardMembersVotingStatus) message);
- } else if (message instanceof SaveSnapshotSuccess) {
- onSaveSnapshotSuccess((SaveSnapshotSuccess) message);
- } else if (message instanceof SaveSnapshotFailure) {
- LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(),
- ((SaveSnapshotFailure) message).cause());
+ } else if (message instanceof RemoveShardReplica msg) {
+ onRemoveShardReplica(msg);
+ } else if (message instanceof WrappedShardResponse msg) {
+ onWrappedShardResponse(msg);
+ } else if (message instanceof GetSnapshot msg) {
+ onGetSnapshot(msg);
+ } else if (message instanceof ServerRemoved msg) {
+ onShardReplicaRemoved(msg);
+ } else if (message instanceof ChangeShardMembersVotingStatus msg) {
+ onChangeShardServersVotingStatus(msg);
+ } else if (message instanceof FlipShardMembersVotingStatus msg) {
+ onFlipShardMembersVotingStatus(msg);
+ } else if (message instanceof SaveSnapshotSuccess msg) {
+ onSaveSnapshotSuccess(msg);
+ } else if (message instanceof SaveSnapshotFailure msg) {
+ LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), msg.cause());
} else if (message instanceof Shutdown) {
onShutDown();
} else if (message instanceof GetLocalShardIds) {
onGetLocalShardIds();
- } else if (message instanceof GetShardRole) {
- onGetShardRole((GetShardRole) message);
- } else if (message instanceof RunnableMessage) {
- ((RunnableMessage)message).run();
- } else if (message instanceof RegisterForShardAvailabilityChanges) {
- onRegisterForShardAvailabilityChanges((RegisterForShardAvailabilityChanges)message);
- } else if (message instanceof DeleteSnapshotsFailure) {
- LOG.warn("{}: Failed to delete prior snapshots", persistenceId(),
- ((DeleteSnapshotsFailure) message).cause());
+ } else if (message instanceof GetShardRole msg) {
+ onGetShardRole(msg);
+ } else if (message instanceof RunnableMessage msg) {
+ msg.run();
+ } else if (message instanceof RegisterForShardAvailabilityChanges msg) {
+ onRegisterForShardAvailabilityChanges(msg);
+ } else if (message instanceof DeleteSnapshotsFailure msg) {
+ LOG.warn("{}: Failed to delete prior snapshots", persistenceId(), msg.cause());
} else if (message instanceof DeleteSnapshotsSuccess) {
LOG.debug("{}: Successfully deleted prior snapshots", persistenceId());
} else if (message instanceof RegisterRoleChangeListenerReply) {
LOG.trace("{}: Received RegisterRoleChangeListenerReply", persistenceId());
- } else if (message instanceof ClusterEvent.MemberEvent) {
- LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), message);
+ } else if (message instanceof ClusterEvent.MemberEvent msg) {
+ LOG.trace("{}: Received other ClusterEvent.MemberEvent: {}", persistenceId(), msg);
} else {
unknownMessage(message);
}
return true;
}
- private void onActorInitialized(final Object message) {
+ private void onActorInitialized(final ActorInitialized message) {
final ActorRef sender = getSender();
if (sender == null) {
*
* @param message the message to send
*/
- private void updateSchemaContext(final Object message) {
- schemaContext = ((UpdateSchemaContext) message).getEffectiveModelContext();
+ private void updateSchemaContext(final UpdateSchemaContext message) {
+ schemaContext = message.getEffectiveModelContext();
LOG.debug("Got updated SchemaContext: # of modules {}", schemaContext.getModules().size());
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
handler.onFailure(failure);
+ } else if (response instanceof RemotePrimaryShardFound msg) {
+ handler.onRemotePrimaryShardFound(msg);
+ } else if (response instanceof LocalPrimaryShardFound msg) {
+ handler.onLocalPrimaryFound(msg);
} else {
- if (response instanceof RemotePrimaryShardFound) {
- handler.onRemotePrimaryShardFound((RemotePrimaryShardFound) response);
- } else if (response instanceof LocalPrimaryShardFound) {
- handler.onLocalPrimaryFound((LocalPrimaryShardFound) response);
- } else {
- handler.onUnknownResponse(response);
- }
+ handler.onUnknownResponse(response);
}
}
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
LOG.debug("{}: Received failure from FindLocalShard for shard {}", persistenceId, shardName,
- failure);
+ failure);
sender.tell(new Status.Failure(new RuntimeException(
- String.format("Failed to find local shard %s", shardName), failure)), self());
+ String.format("Failed to find local shard %s", shardName), failure)), self());
+ } if (response instanceof LocalShardFound msg) {
+ getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept(msg), sender);
+ } else if (response instanceof LocalShardNotFound) {
+ LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName);
+ sender.tell(new Status.Failure(new IllegalArgumentException(
+ String.format("Local shard %s does not exist", shardName))), self());
} else {
- if (response instanceof LocalShardFound) {
- getSelf().tell((RunnableMessage) () -> onLocalShardFound.accept((LocalShardFound) response),
- sender);
- } else if (response instanceof LocalShardNotFound) {
- LOG.debug("{}: Local shard {} does not exist", persistenceId, shardName);
- sender.tell(new Status.Failure(new IllegalArgumentException(
- String.format("Local shard %s does not exist", shardName))), self());
- } else {
- LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName,
- response);
- sender.tell(new Status.Failure(response instanceof Throwable ? (Throwable) response
- : new RuntimeException(
- String.format("Failed to find local shard %s: received response: %s", shardName,
- response))), self());
- }
+ LOG.debug("{}: Failed to find local shard {}: received response: {}", persistenceId, shardName,
+ response);
+ sender.tell(new Status.Failure(response instanceof Throwable throwable ? throwable
+ : new RuntimeException(String.format("Failed to find local shard %s: received response: %s",
+ shardName, response))), self());
}
}
}, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));