Initial support for multiple-shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index f55774f09104f5fcf3d0623583dab81fb74ab9d8..81236521d649a48902de103c52888488559112ca 100644 (file)
@@ -49,54 +49,82 @@ import java.util.Map;
  */
 public class ShardManager extends AbstractUntypedActor {
 
-  // Stores a mapping between a shard name and the address of the current primary
-  private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
-
-  // Stores a mapping between a member name and the address of the member
-  private final Map<String, Address> memberNameToAddress = new HashMap<>();
-
-  // Stores a mapping between the shard name and all the members on which a replica of that shard are available
-  private final Map<String, List<String>> shardNameToMembers = new HashMap<>();
-
-  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
-
-  private final ActorPath defaultShardPath;
-
-  /**
-   *
-   * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
-   *             configuration or operational
-   */
-  private ShardManager(String type){
-    ActorRef actor = getContext().actorOf(Shard.props("shard-" + Shard.DEFAULT_NAME + "-" + type), "shard-" + Shard.DEFAULT_NAME + "-" + type);
-    defaultShardPath = actor.path();
-  }
-
-  public static Props props(final String type){
-    return Props.create(new Creator<ShardManager>(){
-
-      @Override
-      public ShardManager create() throws Exception {
-        return new ShardManager(type);
-      }
-    });
-  }
-
-  @Override
-  public void handleReceive(Object message) throws Exception {
-    if (message instanceof FindPrimary) {
-      FindPrimary msg = ((FindPrimary) message);
-      String shardName = msg.getShardName();
-      if(Shard.DEFAULT_NAME.equals(shardName)){
-        getSender().tell(new PrimaryFound(defaultShardPath.toString()), getSelf());
-      } else {
-        getSender().tell(new PrimaryNotFound(shardName), getSelf());
-      }
-    } else if(message instanceof UpdateSchemaContext){
-        // FIXME : Notify all local shards of a schemaContext change
-        getContext().system().actorSelection(defaultShardPath).forward(message, getContext());
+    // Stores a mapping between a shard name and the address of the current primary
+    private final Map<String, Address> shardNameToPrimaryAddress =
+        new HashMap<>();
+
+    // Stores a mapping between a member name and the address of the member
+    private final Map<String, Address> memberNameToAddress = new HashMap<>();
+
+    // Stores a mapping between the shard name and all the members on which a replica of that shard are available
+    private final Map<String, List<String>> shardNameToMembers =
+        new HashMap<>();
+
+    private final LoggingAdapter log =
+        Logging.getLogger(getContext().system(), this);
+
+
+    private final Map<String, ActorPath> localShards = new HashMap<>();
+
+
+    private final ClusterWrapper cluster;
+
+    /**
+     * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
+     *             configuration or operational
+     */
+    private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+        this.cluster = cluster;
+        String memberName = cluster.getCurrentMemberName();
+        List<String> memberShardNames =
+            configuration.getMemberShardNames(memberName);
+
+        for(String shardName : memberShardNames){
+            ActorRef actor = getContext()
+                .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type),
+                    memberName + "-shard-" + shardName + "-" + type);
+            ActorPath path = actor.path();
+            localShards.put(shardName, path);
+        }
+    }
+
+    public static Props props(final String type,
+        final ClusterWrapper cluster,
+        final Configuration configuration) {
+        return Props.create(new Creator<ShardManager>() {
+
+            @Override
+            public ShardManager create() throws Exception {
+                return new ShardManager(type, cluster, configuration);
+            }
+        });
+    }
+
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if (message instanceof FindPrimary) {
+            FindPrimary msg = ((FindPrimary) message);
+            String shardName = msg.getShardName();
+
+
+            if (Shard.DEFAULT_NAME.equals(shardName)) {
+                ActorPath defaultShardPath = localShards.get(shardName);
+                if(defaultShardPath == null){
+                    throw new IllegalStateException("local default shard not found");
+                }
+                getSender().tell(new PrimaryFound(defaultShardPath.toString()),
+                    getSelf());
+            } else {
+                getSender().tell(new PrimaryNotFound(shardName), getSelf());
+            }
+        } else if (message instanceof UpdateSchemaContext) {
+            for(ActorPath path : localShards.values()){
+                getContext().system().actorSelection(path)
+                    .forward(message,
+                        getContext());
+            }
+        }
     }
-  }
 
 
 }