Modify the FindPrimary implementation so that it works correctly with a configuration
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 8fd13bd45a2b11cdd25dd2e6d40736bd549b0c1d..3e0c97f6afb8e33d1c9b580f4a96111d2a3b6884 100644 (file)
@@ -67,22 +67,28 @@ public class ShardManager extends AbstractUntypedActor {
     private final Map<String, ActorPath> localShards = new HashMap<>();
 
 
+    private final String type;
+
     private final ClusterWrapper cluster;
 
+    private final Configuration configuration;
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
      */
     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+        this.type = type;
         this.cluster = cluster;
+        this.configuration = configuration;
         String memberName = cluster.getCurrentMemberName();
         List<String> memberShardNames =
             configuration.getMemberShardNames(memberName);
 
         for(String shardName : memberShardNames){
+            String shardActorName = getShardActorName(memberName, shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type),
-                    memberName + "-shard-" + shardName + "-" + type);
+                .actorOf(Shard.props(shardActorName), shardActorName);
             ActorPath path = actor.path();
             localShards.put(shardName, path);
         }
@@ -106,16 +112,37 @@ public class ShardManager extends AbstractUntypedActor {
             FindPrimary msg = ((FindPrimary) message);
             String shardName = msg.getShardName();
 
-            if (Shard.DEFAULT_NAME.equals(shardName)) {
-                ActorPath defaultShardPath = localShards.get(shardName);
-                if(defaultShardPath == null){
-                    throw new IllegalStateException("local default shard not found");
+            List<String> members =
+                configuration.getMembersFromShardName(shardName);
+
+            for(String memberName : members) {
+                if (memberName.equals(cluster.getCurrentMemberName())) {
+                    // This is a local shard
+                    ActorPath shardPath = localShards.get(shardName);
+                    // FIXME: This check may be redundant
+                    if (shardPath == null) {
+                        getSender()
+                            .tell(new PrimaryNotFound(shardName), getSelf());
+                        return;
+                    }
+                    getSender().tell(new PrimaryFound(shardPath.toString()),
+                        getSelf());
+                    return;
+                } else {
+                    Address address = memberNameToAddress.get(shardName);
+                    if(address != null){
+                        String path =
+                            address.toString() + "/user/" + getShardActorName(
+                                memberName, shardName);
+                        getSender().tell(new PrimaryFound(path), getSelf());
+                    }
+
+
                 }
-                getSender().tell(new PrimaryFound(defaultShardPath.toString()),
-                    getSelf());
-            } else {
-                getSender().tell(new PrimaryNotFound(shardName), getSelf());
             }
+
+            getSender().tell(new PrimaryNotFound(shardName), getSelf());
+
         } else if (message instanceof UpdateSchemaContext) {
             for(ActorPath path : localShards.values()){
                 getContext().system().actorSelection(path)
@@ -125,5 +152,9 @@ public class ShardManager extends AbstractUntypedActor {
         }
     }
 
+    private String getShardActorName(String memberName, String shardName){
+        return memberName + "-shard-" + shardName + "-" + this.type;
+    }
+
 
 }