Merge "bug 1888 - FRM Flow Listener registration fail"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index c9b7c07e9a3a44d1b0c64425122e53a5b3199543..a97c00f1d88227fb9d01c90ce38a80b8ccbb1e50 100644 (file)
@@ -17,8 +17,8 @@ import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
-
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
 
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
@@ -32,7 +32,8 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
 import java.util.ArrayList;
@@ -49,7 +50,7 @@ import java.util.Map;
  * <li> Monitor the cluster members and store their addresses
  * <ul>
  */
-public class ShardManager extends AbstractUntypedActor {
+public class ShardManager extends AbstractUntypedActorWithMetering {
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -71,38 +72,36 @@ public class ShardManager extends AbstractUntypedActor {
 
     private ShardManagerInfoMBean mBean;
 
-    private final ShardContext shardContext;
+    private final DatastoreContext datastoreContext;
 
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
      */
     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
-            ShardContext shardContext) {
+            DatastoreContext datastoreContext) {
 
         this.type = Preconditions.checkNotNull(type, "type should not be null");
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.shardContext = shardContext;
+        this.datastoreContext = datastoreContext;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        // Create all the local Shards and make them a child of the ShardManager
-        // TODO: This may need to be initiated when we first get the schema context
-        createLocalShards();
+        //createLocalShards(null);
     }
 
     public static Props props(final String type,
         final ClusterWrapper cluster,
         final Configuration configuration,
-        final ShardContext shardContext) {
+        final DatastoreContext datastoreContext) {
 
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
 
-        return Props.create(new ShardManagerCreator(type, cluster, configuration, shardContext));
+        return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
     }
 
     @Override
@@ -161,8 +160,14 @@ public class ShardManager extends AbstractUntypedActor {
      * @param message
      */
     private void updateSchemaContext(Object message) {
-        for(ShardInformation info : localShards.values()){
-            info.getActor().tell(message,getSelf());
+        SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+        if(localShards.size() == 0){
+            createLocalShards(schemaContext);
+        } else {
+            for (ShardInformation info : localShards.values()) {
+                info.getActor().tell(message, getSelf());
+            }
         }
     }
 
@@ -234,7 +239,7 @@ public class ShardManager extends AbstractUntypedActor {
      * runs
      *
      */
-    private void createLocalShards() {
+    private void createLocalShards(SchemaContext schemaContext) {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
             this.configuration.getMemberShardNames(memberName);
@@ -244,15 +249,14 @@ public class ShardManager extends AbstractUntypedActor {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, shardContext),
-                    shardId.toString());
+                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext).
+                    withMailbox(ActorContext.MAILBOX), shardId.toString());
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
 
-        mBean = ShardManagerInfo
-            .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
-
+        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+                    datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
     }
 
     /**
@@ -333,11 +337,11 @@ public class ShardManager extends AbstractUntypedActor {
                 peerAddress);
             if(peerAddresses.containsKey(peerId)){
                 peerAddresses.put(peerId, peerAddress);
-
-                LOG.debug(
-                    "Sending PeerAddressResolved for peer {} with address {} to {}",
-                    peerId, peerAddress, actor.path());
-
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug(
+                        "Sending PeerAddressResolved for peer {} with address {} to {}",
+                        peerId, peerAddress, actor.path());
+                }
                 actor
                     .tell(new PeerAddressResolved(peerId, peerAddress),
                         getSelf());
@@ -352,19 +356,19 @@ public class ShardManager extends AbstractUntypedActor {
         final String type;
         final ClusterWrapper cluster;
         final Configuration configuration;
-        final ShardContext shardContext;
+        final DatastoreContext datastoreContext;
 
         ShardManagerCreator(String type, ClusterWrapper cluster,
-                Configuration configuration, ShardContext shardContext) {
+                Configuration configuration, DatastoreContext datastoreContext) {
             this.type = type;
             this.cluster = cluster;
             this.configuration = configuration;
-            this.shardContext = shardContext;
+            this.datastoreContext = datastoreContext;
         }
 
         @Override
         public ShardManager create() throws Exception {
-            return new ShardManager(type, cluster, configuration, shardContext);
+            return new ShardManager(type, cluster, configuration, datastoreContext);
         }
     }
 }