Create all the local shard replicas that belong on this cluster member
- *
Find the address of the local shard
- *
Find the primary replica for any given shard
- *
Monitor the cluster members and store their addresses
- *
+ * @deprecated This is a deprecated placeholder to keep its inner class present. It serves no other purpose.
*/
-public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
-
- protected final LoggingAdapter LOG =
- Logging.getLogger(getContext().system(), this);
-
- // Stores a mapping between a member name and the address of the member
- // Member names look like "member-1", "member-2" etc and are as specified
- // in configuration
- private final Map memberNameToAddress = new HashMap<>();
-
- // Stores a mapping between a shard name and it's corresponding information
- // Shard names look like inventory, topology etc and are as specified in
- // configuration
- private final Map localShards = new HashMap<>();
-
- // The type of a ShardManager reflects the type of the datastore itself
- // A data store could be of type config/operational
- private final String type;
-
- private final ClusterWrapper cluster;
-
- private final Configuration configuration;
-
- private ShardManagerInfoMBean mBean;
-
- private final DatastoreContext datastoreContext;
-
- private Collection knownModules = Collections.emptySet();
-
- private final DataPersistenceProvider dataPersistenceProvider;
-
- /**
- * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
- * configuration or operational
- */
- protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext) {
-
- this.type = Preconditions.checkNotNull(type, "type should not be null");
- this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
- this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
- this.datastoreContext = datastoreContext;
- this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
-
- // Subscribe this actor to cluster member events
- cluster.subscribeToMemberEvents(getSelf());
-
- createLocalShards();
- }
-
- protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
- return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
- }
-
- public static Props props(final String type,
- final ClusterWrapper cluster,
- final Configuration configuration,
- final DatastoreContext datastoreContext) {
-
- Preconditions.checkNotNull(type, "type should not be null");
- Preconditions.checkNotNull(cluster, "cluster should not be null");
- Preconditions.checkNotNull(configuration, "configuration should not be null");
-
- return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
- }
-
- @Override
- public void handleCommand(Object message) throws Exception {
- if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
- findPrimary(FindPrimary.fromSerializable(message));
- } else if(message instanceof FindLocalShard){
- findLocalShard((FindLocalShard) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext(message);
- } else if(message instanceof ActorInitialized) {
- onActorInitialized(message);
- } else if (message instanceof ClusterEvent.MemberUp){
- memberUp((ClusterEvent.MemberUp) message);
- } else if(message instanceof ClusterEvent.MemberRemoved) {
- memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if(message instanceof ClusterEvent.UnreachableMember) {
- ignoreMessage(message);
- } else{
- unknownMessage(message);
- }
-
- }
-
- private void onActorInitialized(Object message) {
- final ActorRef sender = getSender();
-
- if (sender == null) {
- return; //why is a non-actor sending this message? Just ignore.
- }
-
- String actorName = sender.path().name();
- //find shard name from actor name; actor name is stringified shardId
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
-
- if (shardId.getShardName() == null) {
- return;
- }
- markShardAsInitialized(shardId.getShardName());
- }
-
- private void markShardAsInitialized(String shardName) {
- LOG.debug("Initializing shard [{}]", shardName);
- ShardInformation shardInformation = localShards.get(shardName);
- if (shardInformation != null) {
- shardInformation.setActorInitialized();
- }
- }
-
- @Override
- protected void handleRecover(Object message) throws Exception {
- if(dataPersistenceProvider.isRecoveryApplicable()) {
- if (message instanceof SchemaContextModules) {
- SchemaContextModules msg = (SchemaContextModules) message;
- knownModules = ImmutableSet.copyOf(msg.getModules());
- } else if (message instanceof RecoveryFailure) {
- RecoveryFailure failure = (RecoveryFailure) message;
- LOG.error(failure.cause(), "Recovery failed");
- } else if (message instanceof RecoveryCompleted) {
- LOG.info("Recovery complete : {}", persistenceId());
-
- // Delete all the messages from the akka journal except the last one
- deleteMessages(lastSequenceNr() - 1);
- }
- } else {
- if (message instanceof RecoveryCompleted) {
- LOG.info("Recovery complete : {}", persistenceId());
-
- // Delete all the messages from the akka journal
- deleteMessages(lastSequenceNr());
- }
- }
- }
-
- private void findLocalShard(FindLocalShard message) {
- final ShardInformation shardInformation = localShards.get(message.getShardName());
-
- if(shardInformation == null){
- getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
- return;
- }
-
- sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier