Merge "Refactor snapshot code"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index bc4c825351cc72148f5276fc28d5a94e2e64f79d..55a86ceeea966aab6d67f2b3ba1692d772e74640 100644 (file)
@@ -43,19 +43,19 @@ import java.util.concurrent.CountDownLatch;
 import org.opendaylight.controller.cluster.DataPersistenceProvider;
 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
 import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
+import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
+import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
-import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
@@ -96,6 +96,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     // A data store could be of type config/operational
     private final String type;
 
+    private final String shardManagerIdentifierString;
+
     private final ClusterWrapper cluster;
 
     private final Configuration configuration;
@@ -122,6 +124,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         this.datastoreContext = datastoreContext;
         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
         this.type = datastoreContext.getDataStoreType();
+        this.shardManagerIdentifierString = ShardManagerIdentifier.builder().type(type).build().toString();
         this.shardDispatcherPath =
                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
@@ -158,8 +161,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     @Override
     public void handleCommand(Object message) throws Exception {
-        if (FindPrimary.SERIALIZABLE_CLASS.isInstance(message)) {
-            findPrimary(FindPrimary.fromSerializable(message));
+        if (message  instanceof FindPrimary) {
+            findPrimary((FindPrimary)message);
         } else if(message instanceof FindLocalShard){
             findLocalShard((FindLocalShard) message);
         } else if (message instanceof UpdateSchemaContext) {
@@ -203,13 +206,15 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         ShardInformation shardInfo = message.getShardInfo();
 
         LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
-                shardInfo.getShardId());
+                shardInfo.getShardName());
 
         shardInfo.removeOnShardInitialized(message.getOnShardInitialized());
 
         if(!shardInfo.isShardInitialized()) {
-            message.getSender().tell(new ActorNotInitialized(), getSelf());
+            LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardInfo.getShardName());
+            message.getSender().tell(createNotInitializedException(shardInfo.shardId), getSelf());
         } else {
+            LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardInfo.getShardName());
             message.getSender().tell(createNoShardLeaderException(shardInfo.shardId), getSelf());
         }
     }
@@ -297,7 +302,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void markShardAsInitialized(String shardName) {
-        LOG.debug("Initializing shard [{}]", shardName);
+        LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);
 
         ShardInformation shardInformation = localShards.get(shardName);
         if (shardInformation != null) {
@@ -367,6 +372,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
                 shardInformation.addOnShardInitialized(onShardInitialized);
 
+                LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
+
                 Cancellable timeoutSchedule = getContext().system().scheduler().scheduleOnce(
                         datastoreContext.getShardInitializationTimeout().duration(), getSelf(),
                         new ShardNotInitializedTimeout(shardInformation, onShardInitialized, sender),
@@ -375,8 +382,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 onShardInitialized.setTimeoutSchedule(timeoutSchedule);
 
             } else if (!shardInformation.isShardInitialized()) {
-                getSender().tell(new ActorNotInitialized(), getSelf());
+                LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
+                getSender().tell(createNotInitializedException(shardInformation.shardId), getSelf());
             } else {
+                LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(),
+                        shardInformation.getShardName());
                 getSender().tell(createNoShardLeaderException(shardInformation.shardId), getSelf());
             }
 
@@ -392,13 +403,26 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
                 "recovering and a leader is being elected. Try again later.", shardId));
     }
 
+    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+        return new NotInitializedException(String.format(
+                "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
+    }
+
     private void memberRemoved(ClusterEvent.MemberRemoved message) {
+        String memberName = message.member().roles().head();
+
+        LOG.debug("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.remove(message.member().roles().head());
     }
 
     private void memberUp(ClusterEvent.MemberUp message) {
         String memberName = message.member().roles().head();
 
+        LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
+                message.member().address());
+
         memberNameToAddress.put(memberName, message.member().address());
 
         for(ShardInformation info : localShards.values()){
@@ -461,6 +485,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
     }
 
+    @VisibleForTesting
+    protected ClusterWrapper getCluster() {
+        return cluster;
+    }
+
     @VisibleForTesting
     protected ActorRef newShardActor(final SchemaContext schemaContext, ShardInformation info) {
         return getContext().actorOf(Shard.props(info.getShardId(),
@@ -469,6 +498,8 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
     }
 
     private void findPrimary(FindPrimary message) {
+        LOG.debug("{}: In findPrimary: {}", persistenceId(), message);
+
         final String shardName = message.getShardName();
 
         // First see if the there is a local replica for the shard
@@ -477,10 +508,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             sendResponse(info, message.isWaitUntilReady(), true, new Supplier<Object>() {
                 @Override
                 public Object get() {
-                    Object found = new PrimaryFound(info.getSerializedLeaderActor()).toSerializable();
+                    Object found = new PrimaryFound(info.getSerializedLeaderActor());
 
                     if(LOG.isDebugEnabled()) {
-                        LOG.debug("{}: Found primary for {}: {}", shardName, found);
+                        LOG.debug("{}: Found primary for {}: {}", persistenceId(), shardName, found);
                     }
 
                     return found;
@@ -490,38 +521,35 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return;
         }
 
-        List<String> members = configuration.getMembersFromShardName(shardName);
+        for(Map.Entry<String, Address> entry: memberNameToAddress.entrySet()) {
+            if(!cluster.getCurrentMemberName().equals(entry.getKey())) {
+                String path = getShardManagerActorPathBuilder(entry.getValue()).toString();
 
-        if(cluster.getCurrentMemberName() != null) {
-            members.remove(cluster.getCurrentMemberName());
-        }
+                LOG.debug("{}: findPrimary for {} forwarding to remote ShardManager {}", persistenceId(),
+                        shardName, path);
 
-        /**
-         * FIXME: Instead of sending remote shard actor path back to sender,
-         * forward FindPrimary message to remote shard manager
-         */
-        // There is no way for us to figure out the primary (for now) so assume
-        // that one of the remote nodes is a primary
-        for(String memberName : members) {
-            Address address = memberNameToAddress.get(memberName);
-            if(address != null){
-                String path =
-                    getShardActorPath(shardName, memberName);
-                getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+                getContext().actorSelection(path).forward(message, getContext());
                 return;
             }
         }
-        getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+
+        LOG.debug("{}: No shard found for {}", persistenceId(), shardName);
+
+        getSender().tell(new PrimaryNotFoundException(
+                String.format("No primary shard found for %s.", shardName)), getSelf());
+    }
+
+    private StringBuilder getShardManagerActorPathBuilder(Address address) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(address.toString()).append("/user/").append(shardManagerIdentifierString);
+        return builder;
     }
 
     private String getShardActorPath(String shardName, String memberName) {
         Address address = memberNameToAddress.get(memberName);
         if(address != null) {
-            StringBuilder builder = new StringBuilder();
-            builder.append(address.toString())
-                .append("/user/")
-                .append(ShardManagerIdentifier.builder().type(type).build().toString())
-                .append("/")
+            StringBuilder builder = getShardManagerActorPathBuilder(address);
+            builder.append("/")
                 .append(getShardIdentifier(memberName, shardName));
             return builder.toString();
         }