Bug 2038: Ensure only one concurrent 3-phase commit in Shard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index e51d49bff2aff8b6380081e0e772765d172246b0..e68628dbf5c1fc992afb30ae986fff8ed8f6eef1 100644 (file)
@@ -15,15 +15,22 @@ import akka.actor.OneForOneStrategy;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.japi.Function;
-
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.RecoveryFailure;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
@@ -32,14 +39,18 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.duration.Duration;
 
+import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * The ShardManager has the following jobs,
@@ -50,7 +61,10 @@ import java.util.Map;
  * <li> Monitor the cluster members and store their addresses
  * <ul>
  */
-public class ShardManager extends AbstractUntypedActor {
+public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
+
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
 
     // Stores a mapping between a member name and the address of the member
     // Member names look like "member-1", "member-2" etc and are as specified
@@ -74,6 +88,8 @@ public class ShardManager extends AbstractUntypedActor {
 
     private final DatastoreContext datastoreContext;
 
+    private final Collection<String> knownModules = new HashSet<>(128);
+
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
@@ -89,9 +105,7 @@ public class ShardManager extends AbstractUntypedActor {
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
 
-        // Create all the local Shards and make them a child of the ShardManager
-        // TODO: This may need to be initiated when we first get the schema context
-        createLocalShards();
+        //createLocalShards(null);
     }
 
     public static Props props(final String type,
@@ -107,7 +121,7 @@ public class ShardManager extends AbstractUntypedActor {
     }
 
     @Override
-    public void handleReceive(Object message) throws Exception {
+    public void handleCommand(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
             findPrimary(
                 FindPrimary.fromSerializable(message));
@@ -115,6 +129,8 @@ public class ShardManager extends AbstractUntypedActor {
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
             updateSchemaContext(message);
+        } else if(message instanceof ActorInitialized) {
+            onActorInitialized(message);
         } else if (message instanceof ClusterEvent.MemberUp){
             memberUp((ClusterEvent.MemberUp) message);
         } else if(message instanceof ClusterEvent.MemberRemoved) {
@@ -127,17 +143,66 @@ public class ShardManager extends AbstractUntypedActor {
 
     }
 
+    private void onActorInitialized(Object message) {
+        final ActorRef sender = getSender();
+
+        if (sender == null) {
+            return; //why is a non-actor sending this message? Just ignore.
+        }
+
+        String actorName = sender.path().name();
+        //find shard name from actor name; actor name is stringified shardId
+        ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
+
+        if (shardId.getShardName() == null) {
+            return;
+        }
+        markShardAsInitialized(shardId.getShardName());
+    }
+
+    @VisibleForTesting protected void markShardAsInitialized(String shardName) {
+        LOG.debug("Initializing shard [{}]", shardName);
+        ShardInformation shardInformation = localShards.get(shardName);
+        if (shardInformation != null) {
+            shardInformation.setShardInitialized(true);
+        }
+    }
+
+    @Override protected void handleRecover(Object message) throws Exception {
+
+        if(message instanceof SchemaContextModules){
+            SchemaContextModules msg = (SchemaContextModules) message;
+            knownModules.clear();
+            knownModules.addAll(msg.getModules());
+        } else if(message instanceof RecoveryFailure){
+            RecoveryFailure failure = (RecoveryFailure) message;
+            LOG.error(failure.cause(), "Recovery failed");
+        } else if(message instanceof RecoveryCompleted){
+            LOG.info("Recovery complete : {}", persistenceId());
+
+            // Delete all the messages from the akka journal except the last one
+            deleteMessages(lastSequenceNr() - 1);
+        }
+    }
+
     private void findLocalShard(FindLocalShard message) {
-        ShardInformation shardInformation =
-            localShards.get(message.getShardName());
+        ShardInformation shardInformation = localShards.get(message.getShardName());
+
+        if(shardInformation == null){
+            getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+            return;
+        }
 
-        if(shardInformation != null){
-            getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
+        sendResponse(shardInformation, new LocalShardFound(shardInformation.getActor()));
+    }
+
+    private void sendResponse(ShardInformation shardInformation, Object message) {
+        if (!shardInformation.isShardInitialized()) {
+            getSender().tell(new ActorNotInitialized(), getSelf());
             return;
         }
 
-        getSender().tell(new LocalShardNotFound(message.getShardName()),
-            getSelf());
+        getSender().tell(message, getSelf());
     }
 
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
@@ -147,7 +212,7 @@ public class ShardManager extends AbstractUntypedActor {
     private void memberUp(ClusterEvent.MemberUp message) {
         String memberName = message.member().roles().head();
 
-        memberNameToAddress.put(memberName , message.member().address());
+        memberNameToAddress.put(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
@@ -161,35 +226,66 @@ public class ShardManager extends AbstractUntypedActor {
      *
      * @param message
      */
-    private void updateSchemaContext(Object message) {
-        for(ShardInformation info : localShards.values()){
-            info.getActor().tell(message,getSelf());
+    private void updateSchemaContext(final Object message) {
+        final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
+
+        Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
+        Set<String> newModules = new HashSet<>(128);
+
+        for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
+            String s = moduleIdentifier.getNamespace().toString();
+            newModules.add(s);
         }
+
+        if(newModules.containsAll(knownModules)) {
+
+            LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
+
+            knownModules.clear();
+            knownModules.addAll(newModules);
+
+            persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
+
+                @Override public void apply(SchemaContextModules param) throws Exception {
+                    LOG.info("Sending new SchemaContext to Shards");
+                    if (localShards.size() == 0) {
+                        createLocalShards(schemaContext);
+                    } else {
+                        for (ShardInformation info : localShards.values()) {
+                            info.getActor().tell(message, getSelf());
+                        }
+                    }
+                }
+
+            });
+        } else {
+            LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
+        }
+
     }
 
     private void findPrimary(FindPrimary message) {
+        final ActorRef sender = getSender();
         String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
         ShardInformation info = localShards.get(shardName);
-        if(info != null) {
+        if (info != null) {
             ActorPath shardPath = info.getActorPath();
-            if (shardPath != null) {
-                getSender()
-                    .tell(
-                        new PrimaryFound(shardPath.toString()).toSerializable(),
-                        getSelf());
-                return;
-            }
+            sendResponse(info, new PrimaryFound(shardPath.toString()).toSerializable());
+            return;
         }
 
-        List<String> members =
-            configuration.getMembersFromShardName(shardName);
+        List<String> members = configuration.getMembersFromShardName(shardName);
 
         if(cluster.getCurrentMemberName() != null) {
             members.remove(cluster.getCurrentMemberName());
         }
 
+        /**
+         * FIXME: Instead of sending remote shard actor path back to sender,
+         * forward FindPrimary message to remote shard manager
+         */
         // There is no way for us to figure out the primary (for now) so assume
         // that one of the remote nodes is a primary
         for(String memberName : members) {
@@ -235,7 +331,7 @@ public class ShardManager extends AbstractUntypedActor {
      * runs
      *
      */
-    private void createLocalShards() {
+    private void createLocalShards(SchemaContext schemaContext) {
         String memberName = this.cluster.getCurrentMemberName();
         List<String> memberShardNames =
             this.configuration.getMemberShardNames(memberName);
@@ -245,16 +341,14 @@ public class ShardManager extends AbstractUntypedActor {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext).
-                    withMailbox(ActorContext.MAILBOX), shardId.toString());
-
+                .actorOf(Shard.props(shardId, peerAddresses, datastoreContext, schemaContext),
+                    shardId.toString());
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
         }
 
-        mBean = ShardManagerInfo
-            .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
-
+        mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
+                    datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
     }
 
     /**
@@ -304,11 +398,20 @@ public class ShardManager extends AbstractUntypedActor {
 
     }
 
+    @Override public String persistenceId() {
+        return "shard-manager-" + type;
+    }
+
+    @VisibleForTesting public Collection<String> getKnownModules() {
+        return knownModules;
+    }
+
     private class ShardInformation {
         private final String shardName;
         private final ActorRef actor;
         private final ActorPath actorPath;
         private final Map<ShardIdentifier, String> peerAddresses;
+        private boolean shardInitialized = false; //flag that determines if the actor is ready for business
 
         private ShardInformation(String shardName, ActorRef actor,
             Map<ShardIdentifier, String> peerAddresses) {
@@ -335,17 +438,25 @@ public class ShardManager extends AbstractUntypedActor {
                 peerAddress);
             if(peerAddresses.containsKey(peerId)){
                 peerAddresses.put(peerId, peerAddress);
-
-                LOG.debug(
-                    "Sending PeerAddressResolved for peer {} with address {} to {}",
-                    peerId, peerAddress, actor.path());
-
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug(
+                        "Sending PeerAddressResolved for peer {} with address {} to {}",
+                        peerId, peerAddress, actor.path());
+                }
                 actor
                     .tell(new PeerAddressResolved(peerId, peerAddress),
                         getSelf());
 
             }
         }
+
+        public boolean isShardInitialized() {
+            return shardInitialized;
+        }
+
+        public void setShardInitialized(boolean shardInitialized) {
+            this.shardInitialized = shardInitialized;
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {
@@ -369,6 +480,18 @@ public class ShardManager extends AbstractUntypedActor {
             return new ShardManager(type, cluster, configuration, datastoreContext);
         }
     }
+
+    static class SchemaContextModules implements Serializable {
+        private final Set<String> modules;
+
+        SchemaContextModules(Set<String> modules){
+            this.modules = modules;
+        }
+
+        public Set<String> getModules() {
+            return modules;
+        }
+    }
 }