Merge changes I442a0ee9,I11825b90
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 63266d6308287d2e816724f3f73b192b0d120bce..81236521d649a48902de103c52888488559112ca 100644 (file)
@@ -8,12 +8,17 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
 import akka.actor.Address;
-import akka.actor.UntypedActor;
+import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 
 import java.util.HashMap;
 import java.util.List;
@@ -21,41 +26,105 @@ import java.util.Map;
 
 /**
  * The ShardManager has the following jobs,
- *
- *  - Create all the local shard replicas that belong on this cluster member
- *  - Find the primary replica for any given shard
- *  - Engage in shard replica elections which decide which replica should be the primary
- *
- * Creation of Shard replicas
- * ==========================
- *  When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- *  belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- *
- * Replica Elections
- * =================
- *  The Shard Manager uses multiple cues to initiate election.
- *      - When a member of the cluster dies
- *      - When a local shard replica dies
- *      - When a local shard replica comes alive
+ * <p>
+ * <li> Create all the local shard replicas that belong on this cluster member
+ * <li> Find the primary replica for any given shard
+ * <li> Engage in shard replica elections which decide which replica should be the primary
+ * </p>
+ * <p/>
+ * <h3>>Creation of Shard replicas</h3
+ * <p>
+ * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
+ * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
+ * </p>
+ * <p/>
+ * <h3> Replica Elections </h3>
+ * <p/>
+ * <p>
+ * The Shard Manager uses multiple cues to initiate election.
+ * <li> When a member of the cluster dies
+ * <li> When a local shard replica dies
+ * <li> When a local shard replica comes alive
+ * </p>
  */
-public class ShardManager extends UntypedActor {
+public class ShardManager extends AbstractUntypedActor {
 
     // Stores a mapping between a shard name and the address of the current primary
-    private final Map<String, Address> shardNameToPrimaryAddress = new HashMap<>();
+    private final Map<String, Address> shardNameToPrimaryAddress =
+        new HashMap<>();
 
     // Stores a mapping between a member name and the address of the member
     private final Map<String, Address> memberNameToAddress = new HashMap<>();
 
     // Stores a mapping between the shard name and all the members on which a replica of that shard are available
-    private final Map<String, List<String>> shardNameToMembers = new HashMap<>();
+    private final Map<String, List<String>> shardNameToMembers =
+        new HashMap<>();
+
+    private final LoggingAdapter log =
+        Logging.getLogger(getContext().system(), this);
+
+
+    private final Map<String, ActorPath> localShards = new HashMap<>();
+
+
+    private final ClusterWrapper cluster;
+
+    /**
+     * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
+     *             configuration or operational
+     */
+    private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+        this.cluster = cluster;
+        String memberName = cluster.getCurrentMemberName();
+        List<String> memberShardNames =
+            configuration.getMemberShardNames(memberName);
 
-    LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+        for(String shardName : memberShardNames){
+            ActorRef actor = getContext()
+                .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type),
+                    memberName + "-shard-" + shardName + "-" + type);
+            ActorPath path = actor.path();
+            localShards.put(shardName, path);
+        }
+    }
+
+    public static Props props(final String type,
+        final ClusterWrapper cluster,
+        final Configuration configuration) {
+        return Props.create(new Creator<ShardManager>() {
+
+            @Override
+            public ShardManager create() throws Exception {
+                return new ShardManager(type, cluster, configuration);
+            }
+        });
+    }
 
     @Override
-    public void onReceive(Object message) throws Exception {
-        if(message instanceof FindPrimary ){
+    public void handleReceive(Object message) throws Exception {
+        if (message instanceof FindPrimary) {
             FindPrimary msg = ((FindPrimary) message);
-            getSender().tell(new PrimaryNotFound(msg.getShardName()), getSelf());
+            String shardName = msg.getShardName();
+
+
+            if (Shard.DEFAULT_NAME.equals(shardName)) {
+                ActorPath defaultShardPath = localShards.get(shardName);
+                if(defaultShardPath == null){
+                    throw new IllegalStateException("local default shard not found");
+                }
+                getSender().tell(new PrimaryFound(defaultShardPath.toString()),
+                    getSelf());
+            } else {
+                getSender().tell(new PrimaryNotFound(shardName), getSelf());
+            }
+        } else if (message instanceof UpdateSchemaContext) {
+            for(ActorPath path : localShards.values()){
+                getContext().system().actorSelection(path)
+                    .forward(message,
+                        getContext());
+            }
         }
     }
+
+
 }