- protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext) {
-
- this.type = Preconditions.checkNotNull(type, "type should not be null");
- this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
- this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
- this.datastoreContext = datastoreContext;
-
- // Subscribe this actor to cluster member events
- cluster.subscribeToMemberEvents(getSelf());
-
- createLocalShards();
- }
-
- public static Props props(final String type,
- final ClusterWrapper cluster,
- final Configuration configuration,
- final DatastoreContext datastoreContext) {
-
- Preconditions.checkNotNull(type, "type should not be null");
- Preconditions.checkNotNull(cluster, "cluster should not be null");
- Preconditions.checkNotNull(configuration, "configuration should not be null");
-
- return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
- }
-
- @Override
- public void handleCommand(Object message) throws Exception {
- if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
- findPrimary(FindPrimary.fromSerializable(message));
- } else if(message instanceof FindLocalShard){
- findLocalShard((FindLocalShard) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext(message);
- } else if(message instanceof ActorInitialized) {
- onActorInitialized(message);
- } else if (message instanceof ClusterEvent.MemberUp){
- memberUp((ClusterEvent.MemberUp) message);
- } else if(message instanceof ClusterEvent.MemberRemoved) {
- memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if(message instanceof ClusterEvent.UnreachableMember) {
- ignoreMessage(message);
- } else{
- unknownMessage(message);
- }
-
- }
-
- private void onActorInitialized(Object message) {
- final ActorRef sender = getSender();
-
- if (sender == null) {
- return; //why is a non-actor sending this message? Just ignore.
- }
-
- String actorName = sender.path().name();
- //find shard name from actor name; actor name is stringified shardId
- ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
-
- if (shardId.getShardName() == null) {
- return;
- }
- markShardAsInitialized(shardId.getShardName());
- }
-
- private void markShardAsInitialized(String shardName) {
- LOG.debug("Initializing shard [{}]", shardName);
- ShardInformation shardInformation = localShards.get(shardName);
- if (shardInformation != null) {
- shardInformation.setActorInitialized();
- }
- }
-
- @Override
- protected void handleRecover(Object message) throws Exception {
- if(message instanceof SchemaContextModules){
- SchemaContextModules msg = (SchemaContextModules) message;
- knownModules.clear();
- knownModules.addAll(msg.getModules());
- } else if(message instanceof RecoveryFailure){
- RecoveryFailure failure = (RecoveryFailure) message;
- LOG.error(failure.cause(), "Recovery failed");
- } else if(message instanceof RecoveryCompleted){
- LOG.info("Recovery complete : {}", persistenceId());
-
- // Delete all the messages from the akka journal except the last one
- deleteMessages(lastSequenceNr() - 1);
- }
- }
-
- private void findLocalShard(FindLocalShard message) {
- final ShardInformation shardInformation = localShards.get(message.getShardName());
-
- if(shardInformation == null){
- getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
- return;
- }
-
- sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
- @Override
- public Object get() {
- return new LocalShardFound(shardInformation.getActor());
- }
- });
- }
-
- private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
- final Supplier<Object> messageSupplier) {
- if (!shardInformation.isShardInitialized()) {
- if(waitUntilInitialized) {
- final ActorRef sender = getSender();
- final ActorRef self = self();
- shardInformation.addRunnableOnInitialized(new Runnable() {
- @Override
- public void run() {
- sender.tell(messageSupplier.get(), self);
- }
- });
- } else {
- getSender().tell(new ActorNotInitialized(), getSelf());
- }
-
- return;
- }
-
- getSender().tell(messageSupplier.get(), getSelf());
- }
-
- private void memberRemoved(ClusterEvent.MemberRemoved message) {
- memberNameToAddress.remove(message.member().roles().head());
- }
-
- private void memberUp(ClusterEvent.MemberUp message) {
- String memberName = message.member().roles().head();
-
- memberNameToAddress.put(memberName, message.member().address());
-
- for(ShardInformation info : localShards.values()){
- String shardName = info.getShardName();
- info.updatePeerAddress(getShardIdentifier(memberName, shardName),
- getShardActorPath(shardName, memberName));
- }
- }
-
- /**
- * Notifies all the local shards of a change in the schema context
- *
- * @param message
- */
- private void updateSchemaContext(final Object message) {
- final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
-
- Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
- Set<String> newModules = new HashSet<>(128);
-
- for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
- String s = moduleIdentifier.getNamespace().toString();
- newModules.add(s);
- }
-
- if(newModules.containsAll(knownModules)) {
-
- LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
-
- knownModules.clear();
- knownModules.addAll(newModules);
-
- persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
-
- @Override
- public void apply(SchemaContextModules param) throws Exception {
- LOG.info("Sending new SchemaContext to Shards");
- for (ShardInformation info : localShards.values()) {
- if(info.getActor() == null) {
- info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
- info.getPeerAddresses(), datastoreContext, schemaContext),
- info.getShardId().toString()));
- } else {
- info.getActor().tell(message, getSelf());
- }
- }
- }
-
- });
- } else {
- LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
- }
-
- }
-
- private void findPrimary(FindPrimary message) {
- String shardName = message.getShardName();
-
- // First see if the there is a local replica for the shard
- final ShardInformation info = localShards.get(shardName);
- if (info != null) {
- sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
- @Override
- public Object get() {
- return new PrimaryFound(info.getActorPath().toString()).toSerializable();
- }
- });
-
- return;
- }
-
- List<String> members = configuration.getMembersFromShardName(shardName);
-
- if(cluster.getCurrentMemberName() != null) {
- members.remove(cluster.getCurrentMemberName());
- }