- });
- } else {
- LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
- }
-
- }
-
- private void findPrimary(FindPrimary message) {
- String shardName = message.getShardName();
-
- // First see if the there is a local replica for the shard
- final ShardInformation info = localShards.get(shardName);
- if (info != null) {
- sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
- @Override
- public Object get() {
- return new PrimaryFound(info.getActorPath().toString()).toSerializable();
- }
- });
-
- return;
- }
-
- List<String> members = configuration.getMembersFromShardName(shardName);
-
- if(cluster.getCurrentMemberName() != null) {
- members.remove(cluster.getCurrentMemberName());
- }
-
- /**
- * FIXME: Instead of sending remote shard actor path back to sender,
- * forward FindPrimary message to remote shard manager
- */
- // There is no way for us to figure out the primary (for now) so assume
- // that one of the remote nodes is a primary
- for(String memberName : members) {
- Address address = memberNameToAddress.get(memberName);
- if(address != null){
- String path =
- getShardActorPath(shardName, memberName);
- getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
- return;
- }
- }
- getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
- }
-
- private String getShardActorPath(String shardName, String memberName) {
- Address address = memberNameToAddress.get(memberName);
- if(address != null) {
- StringBuilder builder = new StringBuilder();
- builder.append(address.toString())
- .append("/user/")
- .append(ShardManagerIdentifier.builder().type(type).build().toString())
- .append("/")
- .append(getShardIdentifier(memberName, shardName));
- return builder.toString();
- }
- return null;
- }
-
- /**
- * Construct the name of the shard actor given the name of the member on
- * which the shard resides and the name of the shard
- *
- * @param memberName
- * @param shardName
- * @return
- */
- private ShardIdentifier getShardIdentifier(String memberName, String shardName){
- return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
- }
-
- /**
- * Create shards that are local to the member on which the ShardManager
- * runs
- *
- */
- private void createLocalShards() {
- String memberName = this.cluster.getCurrentMemberName();
- List<String> memberShardNames =
- this.configuration.getMemberShardNames(memberName);
-
- List<String> localShardActorNames = new ArrayList<>();
- for(String shardName : memberShardNames){
- ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
- Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
- localShardActorNames.add(shardId.toString());
- localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
- }
-
- mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
- datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
- }
-
- /**
- * Given the name of the shard find the addresses of all it's peers
- *
- * @param shardName
- * @return
- */
- private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
-
- Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
-
- List<String> members =
- this.configuration.getMembersFromShardName(shardName);
-
- String currentMemberName = this.cluster.getCurrentMemberName();
-
- for(String memberName : members){
- if(!currentMemberName.equals(memberName)){
- ShardIdentifier shardId = getShardIdentifier(memberName,
- shardName);
- String path =
- getShardActorPath(shardName, currentMemberName);
- peerAddresses.put(shardId, path);
- }
- }
- return peerAddresses;
- }
-
- @Override
- public SupervisorStrategy supervisorStrategy() {
-
- return new OneForOneStrategy(10, Duration.create("1 minute"),
- new Function<Throwable, SupervisorStrategy.Directive>() {
- @Override
- public SupervisorStrategy.Directive apply(Throwable t) {
- StringBuilder sb = new StringBuilder();
- for(StackTraceElement element : t.getStackTrace()) {
- sb.append("\n\tat ")
- .append(element.toString());
- }
- LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
- return SupervisorStrategy.resume();
- }
- }
- );
-
- }
-
- @Override
- public String persistenceId() {
- return "shard-manager-" + type;
- }
-
- @VisibleForTesting
- Collection<String> getKnownModules() {
- return knownModules;
- }
-
- @VisibleForTesting
- DataPersistenceProvider getDataPersistenceProvider() {
- return dataPersistenceProvider;
- }
-
- private class ShardInformation {
- private final ShardIdentifier shardId;
- private final String shardName;
- private ActorRef actor;
- private ActorPath actorPath;
- private final Map<ShardIdentifier, String> peerAddresses;
-
- // flag that determines if the actor is ready for business
- private boolean actorInitialized = false;
-
- private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
-
- private ShardInformation(String shardName, ShardIdentifier shardId,
- Map<ShardIdentifier, String> peerAddresses) {
- this.shardName = shardName;
- this.shardId = shardId;
- this.peerAddresses = peerAddresses;
- }
-
- String getShardName() {
- return shardName;
- }
-
- ActorRef getActor(){
- return actor;
- }
-
- ActorPath getActorPath() {
- return actorPath;
- }
-
- void setActor(ActorRef actor) {
- this.actor = actor;
- this.actorPath = actor.path();
- }
-
- ShardIdentifier getShardId() {
- return shardId;
- }
-
- Map<ShardIdentifier, String> getPeerAddresses() {
- return peerAddresses;
- }
-
- void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
- LOG.info("updatePeerAddress for peer {} with address {}", peerId,
- peerAddress);
- if(peerAddresses.containsKey(peerId)){
- peerAddresses.put(peerId, peerAddress);
-
- if(actor != null) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
- peerId, peerAddress, actor.path());
- }
-
- actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
- }
- }
- }
-
- boolean isShardInitialized() {
- return getActor() != null && actorInitialized;
- }
-
- void setActorInitialized() {
- this.actorInitialized = true;
-
- for(Runnable runnable: runnablesOnInitialized) {
- runnable.run();
- }
-
- runnablesOnInitialized.clear();
- }
-
- void addRunnableOnInitialized(Runnable runnable) {
- runnablesOnInitialized.add(runnable);
- }
- }
-
- private static class ShardManagerCreator implements Creator<ShardManager> {
- private static final long serialVersionUID = 1L;
-
- final String type;
- final ClusterWrapper cluster;
- final Configuration configuration;
- final DatastoreContext datastoreContext;
-
- ShardManagerCreator(String type, ClusterWrapper cluster,
- Configuration configuration, DatastoreContext datastoreContext) {
- this.type = type;
- this.cluster = cluster;
- this.configuration = configuration;
- this.datastoreContext = datastoreContext;
- }
-
- @Override
- public ShardManager create() throws Exception {
- return new ShardManager(type, cluster, configuration, datastoreContext);
- }
- }
-
- static class SchemaContextModules implements Serializable {