Merge "BUG-1953: fix bad values in netconf monitoring test"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 64c6821120f94f99a389c12700757a7b8c7266f5..5874c5296f0ebd8d1b5085abd3797f6c77907618 100644 (file)
@@ -15,9 +15,20 @@ import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.japi.Function;
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.RecoveryFailure;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -26,11 +37,19 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The ShardManager has the following jobs,
@@ -41,7 +60,10 @@ import java.util.Map;
  * <li> Monitor the cluster members and store their addresses
  * <ul>
  */
-public class ShardManager extends AbstractUntypedActor {
+public class ShardManager extends AbstractUntypedPersistentActor {
+
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -61,39 +83,44 @@ public class ShardManager extends AbstractUntypedActor {
 
     private final Configuration configuration;
 
+    private ShardManagerInfoMBean mBean;
+
+    private final DatastoreContext datastoreContext;
+
+    private final Collection<String> knownModules = new HashSet<>(128);
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
      */
-    private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+    private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+            DatastoreContext datastoreContext) {
 
         this.type = Preconditions.checkNotNull(type, "type should not be null");
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+        this.datastoreContext = datastoreContext;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        // Create all the local Shards and make them a child of the ShardManager
-        // TODO: This may need to be initiated when we first get the schema context
-        createLocalShards();
+        //createLocalShards(null);
     }
 
     public static Props props(final String type,
         final ClusterWrapper cluster,
-        final Configuration configuration) {
-        return Props.create(new Creator<ShardManager>() {
+        final Configuration configuration,
+        final DatastoreContext datastoreContext) {
 
-            @Override
-            public ShardManager create() throws Exception {
-                return new ShardManager(type, cluster, configuration);
-            }
-        });
-    }
+        Preconditions.checkNotNull(type, "type should not be null");
+        Preconditions.checkNotNull(cluster, "cluster should not be null");
+        Preconditions.checkNotNull(configuration, "configuration should not be null");
 
+        return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+    }
 
     @Override
-    public void handleReceive(Object message) throws Exception {
+    public void handleCommand(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
             findPrimary(
                 FindPrimary.fromSerializable(message));
@@ -108,11 +135,28 @@ public class ShardManager extends AbstractUntypedActor {
         } else if(message instanceof ClusterEvent.UnreachableMember) {
             ignoreMessage(message);
         } else{
-          throw new Exception ("Not recognized message received, message="+message);
+            unknownMessage(message);
         }
 
     }
 
+    @Override protected void handleRecover(Object message) throws Exception {
+
+        if(message instanceof SchemaContextModules){
+            SchemaContextModules msg = (SchemaContextModules) message;
+            knownModules.clear();
+            knownModules.addAll(msg.getModules());
+        } else if(message instanceof RecoveryFailure){
+            RecoveryFailure failure = (RecoveryFailure) message;
+            LOG.error(failure.cause(), "Recovery failed");
+        } else if(message instanceof RecoveryCompleted){
+            LOG.info("Recovery complete : {}", persistenceId());
+
+            // Delete all the messages from the akka journal except the last one
+            deleteMessages(lastSequenceNr() - 1);
+        }
+    }
+
     private void findLocalShard(FindLocalShard message) {
         ShardInformation shardInformation =
             localShards.get(message.getShardName());
@@ -122,11 +166,8 @@ public class ShardManager extends AbstractUntypedActor {
             return;
         }
 
-        getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
-    }
-
-    private void ignoreMessage(Object message){
-        LOG.debug("Unhandled message : " + message);
+        getSender().tell(new LocalShardNotFound(message.getShardName()),
+            getSelf());
     }
 
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
@@ -140,7 +181,7 @@ public class ShardManager extends AbstractUntypedActor {
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
-            info.updatePeerAddress(getShardActorName(memberName, shardName),
+            info.updatePeerAddress(getShardIdentifier(memberName, shardName),
                 getShardActorPath(shardName, memberName));
         }
     }
@@ -150,18 +191,47 @@ public class ShardManager extends AbstractUntypedActor {
      *
      * @param message
      */
-    private void updateSchemaContext(Object message) {
-        for(ShardInformation info : localShards.values()){
-            info.getActor().tell(message,getSelf());
+    private void updateSchemaContext(final Object message) {
+        final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+        Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
+        Set<String> newModules = new HashSet<>(128);
+
+        for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
+            String s = moduleIdentifier.getNamespace().toString();
+            newModules.add(s);
         }
+
+        if(newModules.containsAll(knownModules)) {
+
+            LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
+
+            knownModules.clear();
+            knownModules.addAll(newModules);
+
+            persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+
+                @Override public void apply(SchemaContextModules param) throws Exception {
+                    LOG.info("Sending new SchemaContext to Shards");
+                    if (localShards.size() == 0) {
+                        createLocalShards(schemaContext);
+                    } else {
+                        for (ShardInformation info : localShards.values()) {
+                            info.getActor().tell(message, getSelf());
+                        }
+                    }
+                }
+
+            });
+        } else {
+            LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
+        }
+
     }
 
     private void findPrimary(FindPrimary message) {
         String shardName = message.getShardName();
 
-        List<String> members =
-            configuration.getMembersFromShardName(shardName);
-
         // First see if the there is a local replica for the shard
         ShardInformation info = localShards.get(shardName);
         if(info != null) {
@@ -175,6 +245,9 @@ public class ShardManager extends AbstractUntypedActor {
             }
         }
 
+        List<String> members =
+            configuration.getMembersFromShardName(shardName);
+
         if(cluster.getCurrentMemberName() != null) {
             members.remove(cluster.getCurrentMemberName());
         }
@@ -196,9 +269,13 @@ public class ShardManager extends AbstractUntypedActor {
     private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
-            return address.toString() + "/user/shardmanager-" + this.type + "/"
-                + getShardActorName(
-                memberName, shardName);
+            StringBuilder builder = new StringBuilder();
+            builder.append(address.toString())
+                .append("/user/")
+                .append(ShardManagerIdentifier.builder().type(type).build().toString())
+                .append("/")
+                .append(getShardIdentifier(memberName, shardName));
+            return builder.toString();
         }
         return null;
     }
@@ -211,8 +288,8 @@ public class ShardManager extends AbstractUntypedActor {
      * @param shardName
      * @return
      */
-    private String getShardActorName(String memberName, String shardName){
-        return memberName + "-shard-" + shardName + "-" + this.type;
+    private ShardIdentifier getShardIdentifier(String memberName, String shardName){
+        return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
     }
 
     /**
@@ -220,20 +297,24 @@ public class ShardManager extends AbstractUntypedActor {
      * runs
      *
      */
-    private void createLocalShards() {
+    private void createLocalShards(SchemaContext schemaContext) {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
             this.configuration.getMemberShardNames(memberName);
 
+        List<String> localShardActorNames = new ArrayList<>();
         for(String shardName : memberShardNames){
-            String shardActorName = getShardActorName(memberName, shardName);
-            Map<String, String> peerAddresses = getPeerAddresses(shardName);
+            ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
+            Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardActorName, peerAddresses),
-                    shardActorName);
+                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext).
+                    withMailbox(ActorContext.MAILBOX), shardId.toString());
+            localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
 
+        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+                    datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
     }
 
     /**
@@ -242,9 +323,9 @@ public class ShardManager extends AbstractUntypedActor {
      * @param shardName
      * @return
      */
-    private Map<String, String> getPeerAddresses(String shardName){
+    private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
 
-        Map<String, String> peerAddresses = new HashMap<>();
+        Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
 
         List<String> members =
             this.configuration.getMembersFromShardName(shardName);
@@ -253,22 +334,29 @@ public class ShardManager extends AbstractUntypedActor {
 
         for(String memberName : members){
             if(!currentMemberName.equals(memberName)){
-                String shardActorName = getShardActorName(memberName, shardName);
+                ShardIdentifier shardId = getShardIdentifier(memberName,
+                    shardName);
                 String path =
                     getShardActorPath(shardName, currentMemberName);
-                peerAddresses.put(shardActorName, path);
+                peerAddresses.put(shardId, path);
             }
         }
         return peerAddresses;
     }
 
-
     @Override
     public SupervisorStrategy supervisorStrategy() {
+
         return new OneForOneStrategy(10, Duration.create("1 minute"),
             new Function<Throwable, SupervisorStrategy.Directive>() {
                 @Override
                 public SupervisorStrategy.Directive apply(Throwable t) {
+                    StringBuilder sb = new StringBuilder();
+                    for(StackTraceElement element : t.getStackTrace()) {
+                       sb.append("\n\tat ")
+                         .append(element.toString());
+                    }
+                    LOG.warning("Supervisor Strategy of resume applied {}",sb.toString());
                     return SupervisorStrategy.resume();
                 }
             }
@@ -276,14 +364,22 @@ public class ShardManager extends AbstractUntypedActor {
 
     }
 
+    @Override public String persistenceId() {
+        return "shard-manager-" + type;
+    }
+
+    @VisibleForTesting public Collection<String> getKnownModules() {
+        return knownModules;
+    }
+
     private class ShardInformation {
         private final String shardName;
         private final ActorRef actor;
         private final ActorPath actorPath;
-        private final Map<String, String> peerAddresses;
+        private final Map<ShardIdentifier, String> peerAddresses;
 
         private ShardInformation(String shardName, ActorRef actor,
-            Map<String, String> peerAddresses) {
+            Map<ShardIdentifier, String> peerAddresses) {
             this.shardName = shardName;
             this.actor = actor;
             this.actorPath = actor.path();
@@ -302,17 +398,16 @@ public class ShardManager extends AbstractUntypedActor {
             return actorPath;
         }
 
-        public Map<String, String> getPeerAddresses() {
-            return peerAddresses;
-        }
-
-        public void updatePeerAddress(String peerId, String peerAddress){
-            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+        public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
+            LOG.info("updatePeerAddress for peer {} with address {}", peerId,
+                peerAddress);
             if(peerAddresses.containsKey(peerId)){
                 peerAddresses.put(peerId, peerAddress);
-
-                LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
-
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug(
+                        "Sending PeerAddressResolved for peer {} with address {} to {}",
+                        peerId, peerAddress, actor.path());
+                }
                 actor
                     .tell(new PeerAddressResolved(peerId, peerAddress),
                         getSelf());
@@ -320,4 +415,41 @@ public class ShardManager extends AbstractUntypedActor {
             }
         }
     }
+
+    private static class ShardManagerCreator implements Creator<ShardManager> {
+        private static final long serialVersionUID = 1L;
+
+        final String type;
+        final ClusterWrapper cluster;
+        final Configuration configuration;
+        final DatastoreContext datastoreContext;
+
+        ShardManagerCreator(String type, ClusterWrapper cluster,
+                Configuration configuration, DatastoreContext datastoreContext) {
+            this.type = type;
+            this.cluster = cluster;
+            this.configuration = configuration;
+            this.datastoreContext = datastoreContext;
+        }
+
+        @Override
+        public ShardManager create() throws Exception {
+            return new ShardManager(type, cluster, configuration, datastoreContext);
+        }
+    }
+
+    static class SchemaContextModules implements Serializable {
+        private final Set<String> modules;
+
+        SchemaContextModules(Set<String> modules){
+            this.modules = modules;
+        }
+
+        public Set<String> getModules() {
+            return modules;
+        }
+    }
 }
+
+
+