X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=5fbce4cd98900be65053939ca7a51d4f7a929a98;hp=0363b3ceb370772ca617bb50abcddef95ed7da5e;hb=475d28f717bae92b2cc10b0589131771fcc62242;hpb=ed693440aa741fee9b94447f8404d89b4020f616;ds=sidebyside diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 0363b3ceb3..5fbce4cd98 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -19,6 +19,7 @@ import akka.japi.Creator; import akka.japi.Function; import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; @@ -56,7 +57,7 @@ public class ShardManager extends AbstractUntypedActor { // Stores a mapping between a member name and the address of the member private final Map memberNameToAddress = new HashMap<>(); - private final Map localShards = new HashMap<>(); + private final Map localShards = new HashMap<>(); private final String type; @@ -125,14 +126,20 @@ public class ShardManager extends AbstractUntypedActor { } private void memberUp(ClusterEvent.MemberUp message) { - memberNameToAddress.put(message.member().roles().head(), message.member().address()); + String memberName = message.member().roles().head(); + + memberNameToAddress.put(memberName , message.member().address()); + + for(ShardInformation info : localShards.values()){ + String shardName = info.getShardName(); + info.updatePeerAddress(getShardActorName(memberName, shardName), + getShardActorPath(shardName, memberName)); + } } private void updateSchemaContext(Object message) { - for(ActorPath path : localShards.values()){ - getContext().system().actorSelection(path) - .forward(message, - getContext()); + for(ShardInformation info : localShards.values()){ + info.getActor().tell(message,getSelf()); } } @@ -142,35 +149,50 @@ public class ShardManager extends AbstractUntypedActor { List members = configuration.getMembersFromShardName(shardName); - for(String memberName : members) { - if (memberName.equals(cluster.getCurrentMemberName())) { - // This is a local shard - ActorPath shardPath = localShards.get(shardName); - if (shardPath == null) { - getSender() - .tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); - return; - } - getSender().tell(new PrimaryFound(shardPath.toString()).toSerializable(), - getSelf()); + // First see if the there is a local replica for the shard + ShardInformation info = localShards.get(shardName); + if(info != null) { + ActorPath shardPath = info.getActorPath(); + if (shardPath != null) { + getSender() + .tell( + new PrimaryFound(shardPath.toString()).toSerializable(), + getSelf()); return; - } else { - Address address = memberNameToAddress.get(memberName); - if(address != null){ - String path = - address.toString() + "/user/shardmanager-" + this.type + "/" + getShardActorName( - memberName, shardName); - getSender().tell(new PrimaryFound(path).toSerializable(), getSelf()); - return; - } + } + } + if(cluster.getCurrentMemberName() != null) { + members.remove(cluster.getCurrentMemberName()); + } + // There is no way for us to figure out the primary (for now) so assume + // that one of the remote nodes is a primary + for(String memberName : members) { + Address address = memberNameToAddress.get(memberName); + if(address != null){ + String path = + getShardActorPath(shardName, memberName); + getSender().tell(new PrimaryFound(path).toSerializable(), getSelf()); + return; } } - getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); } + private String + + + getShardActorPath(String shardName, String memberName) { + Address address = memberNameToAddress.get(memberName); + if(address != null) { + return address.toString() + "/user/shardmanager-" + this.type + "/" + + getShardActorName( + memberName, shardName); + } + return null; + } + private String getShardActorName(String memberName, String shardName){ return memberName + "-shard-" + shardName + "-" + this.type; } @@ -183,14 +205,35 @@ public class ShardManager extends AbstractUntypedActor { for(String shardName : memberShardNames){ String shardActorName = getShardActorName(memberName, shardName); + Map peerAddresses = getPeerAddresses(shardName); ActorRef actor = getContext() - .actorOf(Shard.props(shardActorName), shardActorName); - ActorPath path = actor.path(); - localShards.put(shardName, path); + .actorOf(Shard.props(shardActorName, peerAddresses), + shardActorName); + localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); } } + private Map getPeerAddresses(String shardName){ + + Map peerAddresses = new HashMap<>(); + + List members = + this.configuration.getMembersFromShardName(shardName); + + String currentMemberName = this.cluster.getCurrentMemberName(); + + for(String memberName : members){ + if(!currentMemberName.equals(memberName)){ + String shardActorName = getShardActorName(memberName, shardName); + String path = + getShardActorPath(shardName, currentMemberName); + peerAddresses.put(shardActorName, path); + } + } + return peerAddresses; + } + @Override public SupervisorStrategy supervisorStrategy() { @@ -204,4 +247,49 @@ public class ShardManager extends AbstractUntypedActor { ); } + + private class ShardInformation { + private final String shardName; + private final ActorRef actor; + private final ActorPath actorPath; + private final Map peerAddresses; + + private ShardInformation(String shardName, ActorRef actor, + Map peerAddresses) { + this.shardName = shardName; + this.actor = actor; + this.actorPath = actor.path(); + this.peerAddresses = peerAddresses; + } + + public String getShardName() { + return shardName; + } + + public ActorRef getActor(){ + return actor; + } + + public ActorPath getActorPath() { + return actorPath; + } + + public Map getPeerAddresses() { + return peerAddresses; + } + + public void updatePeerAddress(String peerId, String peerAddress){ + LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); + if(peerAddresses.containsKey(peerId)){ + peerAddresses.put(peerId, peerAddress); + + LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path()); + + actor + .tell(new PeerAddressResolved(peerId, peerAddress), + getSelf()); + + } + } + } }