Add replication capability to Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 0363b3ceb370772ca617bb50abcddef95ed7da5e..5fbce4cd98900be65053939ca7a51d4f7a929a98 100644 (file)
@@ -19,6 +19,7 @@ import akka.japi.Creator;
 import akka.japi.Function;
 import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -56,7 +57,7 @@ public class ShardManager extends AbstractUntypedActor {
     // Stores a mapping between a member name and the address of the member
     private final Map<String, Address> memberNameToAddress = new HashMap<>();
 
-    private final Map<String, ActorPath> localShards = new HashMap<>();
+    private final Map<String, ShardInformation> localShards = new HashMap<>();
 
 
     private final String type;
@@ -125,14 +126,20 @@ public class ShardManager extends AbstractUntypedActor {
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
-        memberNameToAddress.put(message.member().roles().head(), message.member().address());
+        String memberName = message.member().roles().head();
+
+        memberNameToAddress.put(memberName , message.member().address());
+
+        for(ShardInformation info : localShards.values()){
+            String shardName = info.getShardName();
+            info.updatePeerAddress(getShardActorName(memberName, shardName),
+                getShardActorPath(shardName, memberName));
+        }
     }
 
     private void updateSchemaContext(Object message) {
-        for(ActorPath path : localShards.values()){
-            getContext().system().actorSelection(path)
-                .forward(message,
-                    getContext());
+        for(ShardInformation info : localShards.values()){
+            info.getActor().tell(message,getSelf());
         }
     }
 
@@ -142,35 +149,50 @@ public class ShardManager extends AbstractUntypedActor {
         List<String> members =
             configuration.getMembersFromShardName(shardName);
 
-        for(String memberName : members) {
-            if (memberName.equals(cluster.getCurrentMemberName())) {
-                // This is a local shard
-                ActorPath shardPath = localShards.get(shardName);
-                if (shardPath == null) {
-                    getSender()
-                        .tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
-                    return;
-                }
-                getSender().tell(new PrimaryFound(shardPath.toString()).toSerializable(),
-                    getSelf());
+        // First see if the there is a local replica for the shard
+        ShardInformation info = localShards.get(shardName);
+        if(info != null) {
+            ActorPath shardPath = info.getActorPath();
+            if (shardPath != null) {
+                getSender()
+                    .tell(
+                        new PrimaryFound(shardPath.toString()).toSerializable(),
+                        getSelf());
                 return;
-            } else {
-                Address address = memberNameToAddress.get(memberName);
-                if(address != null){
-                    String path =
-                        address.toString() + "/user/shardmanager-" + this.type + "/" + getShardActorName(
-                            memberName, shardName);
-                    getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
-                    return;
-                }
+            }
+        }
 
+        if(cluster.getCurrentMemberName() != null) {
+            members.remove(cluster.getCurrentMemberName());
+        }
 
+        // There is no way for us to figure out the primary (for now) so assume
+        // that one of the remote nodes is a primary
+        for(String memberName : members) {
+            Address address = memberNameToAddress.get(memberName);
+            if(address != null){
+                String path =
+                    getShardActorPath(shardName, memberName);
+                getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+                return;
             }
         }
-
         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
     }
 
+    private String
+
+
+    getShardActorPath(String shardName, String memberName) {
+        Address address = memberNameToAddress.get(memberName);
+        if(address != null) {
+            return address.toString() + "/user/shardmanager-" + this.type + "/"
+                + getShardActorName(
+                memberName, shardName);
+        }
+        return null;
+    }
+
     private String getShardActorName(String memberName, String shardName){
         return memberName + "-shard-" + shardName + "-" + this.type;
     }
@@ -183,14 +205,35 @@ public class ShardManager extends AbstractUntypedActor {
 
         for(String shardName : memberShardNames){
             String shardActorName = getShardActorName(memberName, shardName);
+            Map<String, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardActorName), shardActorName);
-            ActorPath path = actor.path();
-            localShards.put(shardName, path);
+                .actorOf(Shard.props(shardActorName, peerAddresses),
+                    shardActorName);
+            localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
 
     }
 
+    private Map<String, String> getPeerAddresses(String shardName){
+
+        Map<String, String> peerAddresses = new HashMap<>();
+
+        List<String> members =
+            this.configuration.getMembersFromShardName(shardName);
+
+        String currentMemberName = this.cluster.getCurrentMemberName();
+
+        for(String memberName : members){
+            if(!currentMemberName.equals(memberName)){
+                String shardActorName = getShardActorName(memberName, shardName);
+                String path =
+                    getShardActorPath(shardName, currentMemberName);
+                peerAddresses.put(shardActorName, path);
+            }
+        }
+        return peerAddresses;
+    }
+
 
     @Override
     public SupervisorStrategy supervisorStrategy() {
@@ -204,4 +247,49 @@ public class ShardManager extends AbstractUntypedActor {
         );
 
     }
+
+    private class ShardInformation {
+        private final String shardName;
+        private final ActorRef actor;
+        private final ActorPath actorPath;
+        private final Map<String, String> peerAddresses;
+
+        private ShardInformation(String shardName, ActorRef actor,
+            Map<String, String> peerAddresses) {
+            this.shardName = shardName;
+            this.actor = actor;
+            this.actorPath = actor.path();
+            this.peerAddresses = peerAddresses;
+        }
+
+        public String getShardName() {
+            return shardName;
+        }
+
+        public ActorRef getActor(){
+            return actor;
+        }
+
+        public ActorPath getActorPath() {
+            return actorPath;
+        }
+
+        public Map<String, String> getPeerAddresses() {
+            return peerAddresses;
+        }
+
+        public void updatePeerAddress(String peerId, String peerAddress){
+            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+            if(peerAddresses.containsKey(peerId)){
+                peerAddresses.put(peerId, peerAddress);
+
+                LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
+
+                actor
+                    .tell(new PeerAddressResolved(peerId, peerAddress),
+                        getSelf());
+
+            }
+        }
+    }
 }