+ private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
+ final Supplier<Object> messageSupplier) {
+ if (!shardInformation.isShardInitialized()) {
+ if(waitUntilInitialized) {
+ final ActorRef sender = getSender();
+ final ActorRef self = self();
+ shardInformation.addRunnableOnInitialized(new Runnable() {
+ @Override
+ public void run() {
+ sender.tell(messageSupplier.get(), self);
+ }
+ });
+ } else {
+ getSender().tell(new ActorNotInitialized(), getSelf());
+ }
+
+ return;
+ }
+
+ getSender().tell(messageSupplier.get(), getSelf());
+ }
+
+ private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ memberNameToAddress.remove(message.member().roles().head());
+ }
+
+ private void memberUp(ClusterEvent.MemberUp message) {
+ String memberName = message.member().roles().head();
+
+ memberNameToAddress.put(memberName, message.member().address());
+
+ for(ShardInformation info : localShards.values()){
+ String shardName = info.getShardName();
+ info.updatePeerAddress(getShardIdentifier(memberName, shardName),
+ getShardActorPath(shardName, memberName));
+ }
+ }
+
+ /**
+ * Notifies all the local shards of a change in the schema context
+ *
+ * @param message
+ */
+ private void updateSchemaContext(final Object message) {
+ final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+ Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
+ Set<String> newModules = new HashSet<>(128);
+
+ for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
+ String s = moduleIdentifier.getNamespace().toString();
+ newModules.add(s);
+ }
+
+ if(newModules.containsAll(knownModules)) {
+
+ LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
+
+ knownModules = ImmutableSet.copyOf(newModules);
+
+ dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+
+ @Override
+ public void apply(SchemaContextModules param) throws Exception {
+ LOG.info("Sending new SchemaContext to Shards");
+ for (ShardInformation info : localShards.values()) {
+ if (info.getActor() == null) {
+ info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
+ info.getPeerAddresses(), datastoreContext, schemaContext),
+ info.getShardId().toString()));
+ } else {
+ info.getActor().tell(message, getSelf());
+ }
+ }
+ }
+
+ });
+ } else {
+ LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
+ }
+
+ }
+
+ private void findPrimary(FindPrimary message) {
+ String shardName = message.getShardName();
+
+ // First see if the there is a local replica for the shard
+ final ShardInformation info = localShards.get(shardName);
+ if (info != null) {
+ sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
+ @Override
+ public Object get() {
+ return new PrimaryFound(info.getActorPath().toString()).toSerializable();
+ }
+ });
+
+ return;
+ }
+
+ List<String> members = configuration.getMembersFromShardName(shardName);
+
+ if(cluster.getCurrentMemberName() != null) {
+ members.remove(cluster.getCurrentMemberName());
+ }
+
+ /**
+ * FIXME: Instead of sending remote shard actor path back to sender,
+ * forward FindPrimary message to remote shard manager
+ */
+ // There is no way for us to figure out the primary (for now) so assume
+ // that one of the remote nodes is a primary
+ for(String memberName : members) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null){
+ String path =
+ getShardActorPath(shardName, memberName);
+ getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ return;
+ }
+ }
+ getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+ }
+
+ private String getShardActorPath(String shardName, String memberName) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(address.toString())
+ .append("/user/")
+ .append(ShardManagerIdentifier.builder().type(type).build().toString())
+ .append("/")
+ .append(getShardIdentifier(memberName, shardName));
+ return builder.toString();
+ }
+ return null;
+ }
+
+ /**
+ * Construct the name of the shard actor given the name of the member on
+ * which the shard resides and the name of the shard
+ *
+ * @param memberName
+ * @param shardName
+ * @return
+ */
+ private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+ return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
+ }
+
+ /**
+ * Create shards that are local to the member on which the ShardManager
+ * runs
+ *
+ */
+ private void createLocalShards() {
+ String memberName = this.cluster.getCurrentMemberName();
+ List<String> memberShardNames =
+ this.configuration.getMemberShardNames(memberName);
+
+ List<String> localShardActorNames = new ArrayList<>();
+ for(String shardName : memberShardNames){
+ ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+ Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
+ localShardActorNames.add(shardId.toString());
+ localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
+ }
+
+ mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+ datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+ }
+
+ /**
+ * Given the name of the shard find the addresses of all it's peers
+ *
+ * @param shardName
+ * @return
+ */
+ private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
+
+ Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
+
+ List<String> members =
+ this.configuration.getMembersFromShardName(shardName);
+
+ String currentMemberName = this.cluster.getCurrentMemberName();
+
+ for(String memberName : members){
+ if(!currentMemberName.equals(memberName)){
+ ShardIdentifier shardId = getShardIdentifier(memberName,
+ shardName);
+ String path =
+ getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardId, path);
+ }
+ }
+ return peerAddresses;
+ }
+