Bug 2187: Address comments in https://git.opendaylight.org/gerrit/#/c/28596/
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index 24d808cd91bf76839bcb9bdbb08e61e49a17db9b..724c8d2c03dc787549f511f60ac3e289ea763978 100644
@@ -8,17 +8,22 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
+import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
 import akka.serialization.Serialization;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
@@ -46,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
@@ -58,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
 import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -68,10 +75,15 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -197,6 +209,10 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             onSwitchShardBehavior((SwitchShardBehavior) message);
         } else if(message instanceof CreateShard) {
             onCreateShard((CreateShard)message);
+        } else if(message instanceof AddShardReplica){
+            onAddShardReplica((AddShardReplica)message);
+        } else if(message instanceof RemoveShardReplica){
+            onRemoveShardReplica((RemoveShardReplica)message);
         } else {
             unknownMessage(message);
         }
@@ -230,7 +246,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             }
 
             ShardInformation info = new ShardInformation(moduleShardConfig.getShardName(), shardId, peerAddresses,
-                    shardDatastoreContext, createShard.getShardPropsCreator());
+                    shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
             localShards.put(info.getShardName(), info);
 
             mBean.addLocalShard(shardId.toString());
@@ -458,11 +474,11 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         getSender().tell(messageSupplier.get(), getSelf());
     }
 
-    private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+    private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
         return new NoShardLeaderException(null, shardId.toString());
     }
 
-    private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+    private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
         return new NotInitializedException(String.format(
                 "Found primary shard %s but it's not initialized yet. Please try again later", shardId));
     }
@@ -499,7 +515,13 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         LOG.debug("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
                 message.member().address());
 
-        peerAddressResolver.addPeerAddress(memberName, message.member().address());
+        addPeerAddress(memberName, message.member().address());
+
+        checkReady();
+    }
+
+    private void addPeerAddress(String memberName, Address address) {
+        peerAddressResolver.addPeerAddress(memberName, address);
 
         for(ShardInformation info : localShards.values()){
             String shardName = info.getShardName();
@@ -508,14 +530,14 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
             info.peerUp(memberName, peerId, getSelf());
         }
-
-        checkReady();
     }
 
     private void memberReachable(ClusterEvent.ReachableMember message) {
         String memberName = message.member().roles().head();
         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
 
+        addPeerAddress(memberName, message.member().address());
+
         markMemberAvailable(memberName);
     }
 
@@ -678,7 +700,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             Map<String, String> peerAddresses = getPeerAddresses(shardName);
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
-                    shardPropsCreator));
+                    shardPropsCreator, peerAddressResolver));
         }
 
         mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
@@ -733,13 +755,188 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
+    private DatastoreContext getInitShardDataStoreContext() {
+        return DatastoreContext.newBuilderFrom(datastoreContext)
+                .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
+                .build();
+    }
+
+    private boolean checkLocalShardExists(final String shardName, final ActorRef sender) {
+        if (localShards.containsKey(shardName)) {
+            String msg = String.format("Local shard %s already exists", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            return true;
+        }
+        return false;
+    }
+
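+    /**
+     * Handles the AddShardReplica message: validates that the shard is configured and that a
+     * SchemaContext and peer addresses are available, then locates the current shard leader via
+     * RemoteFindPrimary and, on RemotePrimaryShardFound, calls addShard to create the local
+     * replica and ask the leader to add it.
+     */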
+    private void onAddShardReplica(AddShardReplica shardReplicaMsg) {
+        final String shardName = shardReplicaMsg.getShardName();
+
+        LOG.debug("onAddShardReplica: {}", shardReplicaMsg);
+
+        // verify a local replica of the shard does not already exist on this node
+        if (checkLocalShardExists(shardName, getSender())) {
+            return;
+        }
+
+        // verify the shard with the specified name is present in the cluster configuration
+        if (!this.configuration.isShardConfigured(shardName)) {
+            String msg = String.format("No module configuration exists for shard %s", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            return;
+        }
+
+        // A SchemaContext is required before a local shard instance can be created
+        if (schemaContext == null) {
+            String msg = String.format(
+                  "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        Map<String, String> peerAddresses = getPeerAddresses(shardName);
+        if (peerAddresses.isEmpty()) {
+            String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+            return;
+        }
+
+        Timeout findPrimaryTimeout = new Timeout(
+                datastoreContext.getShardInitializationTimeout().duration().$times(2));
+
+        final ActorRef sender = getSender();
+        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    LOG.debug("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("Failed to find leader for shard %s", shardName), failure)),
+                        getSelf());
+                } else {
+                    if (!(response instanceof RemotePrimaryShardFound)) {
+                        String msg = String.format("Failed to find leader for shard %s: received response: %s",
+                                shardName, response);
+                        LOG.debug("{}: {}", persistenceId(), msg);
+                        sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
+                        return;
+                    }
+
+                    RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+                    addShard(shardName, message, sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
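+    /**
+     * Creates the local ShardInformation and shard actor for the new replica, using a
+     * DatastoreContext with elections disabled (see getInitShardDataStoreContext) so the new
+     * shard cannot trigger an election before the leader has accepted it, then sends an
+     * AddServer request to the leader found by FindPrimary. On failure the local shard is
+     * removed again.
+     */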
+    private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+        // The shard may have been created locally while the FindPrimary request was in flight
+        if (checkLocalShardExists(shardName, sender)) {
+            return;
+        }
+
+        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+        final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
+                          getPeerAddresses(shardName), getInitShardDataStoreContext(),
+                          new DefaultShardPropsCreator(), peerAddressResolver);
+        localShards.put(shardName, shardInfo);
+        shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+
+        // Ask the shard leader to add the new local shard as a replica by sending it an AddServer message
+        LOG.debug("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+                response.getPrimaryPath(), shardId);
+
+        Timeout addServerTimeout = new Timeout(datastoreContext
+                       .getShardLeaderElectionTimeout().duration().$times(4));
+        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+            new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object addServerResponse) {
+                if (failure != null) {
+                    LOG.debug("{}: AddServer request to {} for {} failed", persistenceId(),
+                            response.getPrimaryPath(), shardName, failure);
+
+                    // Remove the shard
+                    localShards.remove(shardName);
+                    if (shardInfo.getActor() != null) {
+                        shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+                    }
+
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName), failure)), getSelf());
+                } else {
+                    AddServerReply reply = (AddServerReply)addServerResponse;
+                    onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers())
+                .getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
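+    /**
+     * Handles the leader's AddServerReply: on OK the local replica is switched back to the
+     * normal DatastoreContext (re-enabling elections/voting) and registered with the MBean;
+     * otherwise the local replica is removed and a failure is reported to the original sender.
+     */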
+    private void onAddServerReply(String shardName, ShardInformation shardInfo,
+                                   AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+        LOG.debug("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
+            // Make the local shard voting capable
+            shardInfo.setDatastoreContext(datastoreContext, getSelf());
+
+            mBean.addLocalShard(shardInfo.getShardId().toString());
+            sender.tell(new akka.actor.Status.Success(true), getSelf());
+        } else {
+            LOG.warn("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+                    persistenceId(), shardName, replyMsg.getStatus());
+
+            // Remove the local replica that was created above
+            localShards.remove(shardName);
+            if (shardInfo.getActor() != null) {
+                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+            }
+            switch (replyMsg.getStatus()) {
+                case TIMEOUT:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                            leaderPath, shardName))), getSelf());
+                    break;
+                case NO_LEADER:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                        "There is no shard leader available for shard %s", shardName))), getSelf());
+                    break;
+                default:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                        "AddServer request to leader %s for shard %s failed with status %s",
+                        leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+            }
+        }
+    }
+
+    private void onRemoveShardReplica(RemoveShardReplica shardReplicaMsg) {
+        String shardName = shardReplicaMsg.getShardName();
+
+        // verify a local replica of the shard exists on this node
+        if (!localShards.containsKey(shardName)) {
+            String msg = String.format("Local shard %s does not exist", shardName);
+            LOG.debug("{}: {}", persistenceId(), msg);
+            getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+            return;
+        }
+
+        // TODO: ask the shard leader to remove this replica from its server configuration;
+        // for now simply acknowledge the request.
+        getSender().tell(new akka.actor.Status.Success(true), getSelf());
+    }
+
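+    /*
+     * Illustrative (hypothetical) usage from an RPC/admin layer - the actor reference, shard name
+     * and timeout below are assumptions, not part of this patch:
+     *
+     *   Future<Object> reply = Patterns.ask(shardManagerActor,
+     *           new AddShardReplica("inventory"), addReplicaTimeout);
+     *
+     * A successful add completes with akka.actor.Status.Success; errors are reported as
+     * akka.actor.Status.Failure carrying the underlying cause.
+     */
+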
     @VisibleForTesting
     protected static class ShardInformation {
         private final ShardIdentifier shardId;
         private final String shardName;
         private ActorRef actor;
         private ActorPath actorPath;
-        private final Map<String, String> peerAddresses;
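+        // Initial peer addresses are only used when creating the shard's Props; later lookups
+        // (e.g. resolving the leader's address) go through the ShardPeerAddressResolver so that
+        // dynamically added peers can be resolved as well.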
+        private final Map<String, String> initialPeerAddresses;
         private Optional<DataTree> localShardDataTree;
         private boolean leaderAvailable = false;
 
@@ -753,21 +950,23 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private String leaderId;
         private short leaderVersion;
 
-        private final DatastoreContext datastoreContext;
+        private DatastoreContext datastoreContext;
         private final ShardPropsCreator shardPropsCreator;
+        private final ShardPeerAddressResolver addressResolver;
 
         private ShardInformation(String shardName, ShardIdentifier shardId,
-                Map<String, String> peerAddresses, DatastoreContext datastoreContext,
-                ShardPropsCreator shardPropsCreator) {
+                Map<String, String> initialPeerAddresses, DatastoreContext datastoreContext,
+                ShardPropsCreator shardPropsCreator, ShardPeerAddressResolver addressResolver) {
             this.shardName = shardName;
             this.shardId = shardId;
-            this.peerAddresses = peerAddresses;
+            this.initialPeerAddresses = initialPeerAddresses;
             this.datastoreContext = datastoreContext;
             this.shardPropsCreator = shardPropsCreator;
+            this.addressResolver = addressResolver;
         }
 
         Props newProps(SchemaContext schemaContext) {
-            return shardPropsCreator.newProps(shardId, peerAddresses, datastoreContext, schemaContext);
+            return shardPropsCreator.newProps(shardId, initialPeerAddresses, datastoreContext, schemaContext);
         }
 
         String getShardName() {
@@ -799,37 +998,29 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             return localShardDataTree;
         }
 
-        Map<String, String> getPeerAddresses() {
-            return peerAddresses;
-        }
-
         void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
-            LOG.info("updatePeerAddress for peer {} with address {}", peerId,
-                    peerAddress);
-            if(peerAddresses.containsKey(peerId)){
-                peerAddresses.put(peerId, peerAddress);
-
-                if(actor != null) {
-                    if(LOG.isDebugEnabled()) {
-                        LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
-                                peerId, peerAddress, actor.path());
-                    }
+            LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
 
-                    actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
+            if(actor != null) {
+                if(LOG.isDebugEnabled()) {
+                    LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
+                            peerId, peerAddress, actor.path());
                 }
 
-                notifyOnShardInitializedCallbacks();
+                actor.tell(new PeerAddressResolved(peerId, peerAddress), sender);
             }
+
+            notifyOnShardInitializedCallbacks();
         }
 
         void peerDown(String memberName, String peerId, ActorRef sender) {
-            if(peerAddresses.containsKey(peerId) && actor != null) {
+            if(actor != null) {
                 actor.tell(new PeerDown(memberName, peerId), sender);
             }
         }
 
         void peerUp(String memberName, String peerId, ActorRef sender) {
-            if(peerAddresses.containsKey(peerId) && actor != null) {
+            if(actor != null) {
                 actor.tell(new PeerUp(memberName, peerId), sender);
             }
         }
@@ -840,7 +1031,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
 
         boolean isShardReadyWithLeaderId() {
             return leaderAvailable && isShardReady() && !RaftState.IsolatedLeader.name().equals(role) &&
-                    (isLeader() || peerAddresses.get(leaderId) != null);
+                    (isLeader() || addressResolver.resolve(leaderId) != null);
         }
 
         boolean isShardInitialized() {
@@ -855,7 +1046,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
             if(isLeader()) {
                 return Serialization.serializedActorPath(getActor());
             } else {
-                return peerAddresses.get(leaderId);
+                return addressResolver.resolve(leaderId);
             }
         }
 
@@ -944,6 +1135,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
+
+        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+            this.datastoreContext = datastoreContext;
+            // Notify the shard actor of the DatastoreContext change (e.g. a RaftPolicy change)
+            LOG.debug("Notifying RaftPolicy change via datastoreContext change for {}", this.shardName);
+            if (actor != null) {
+                actor.tell(this.datastoreContext, sender);
+            }
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {