BUG 2187 - Creating ShardReplica
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
index 2d793157744c221454c8a542c65ed926244efb0f..a878f6decbf0cbe51a8c34176fc2a58e522d74c0 100644 (file)
@@ -8,18 +8,22 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static akka.pattern.Patterns.ask;
 import akka.actor.ActorPath;
 import akka.actor.ActorRef;
 import akka.actor.Address;
 import akka.actor.Cancellable;
 import akka.actor.OneForOneStrategy;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
+import akka.dispatch.OnComplete;
 import akka.japi.Creator;
 import akka.japi.Function;
 import akka.persistence.RecoveryCompleted;
 import akka.serialization.Serialization;
+import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Optional;
@@ -71,10 +75,15 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
 import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -746,10 +755,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         return mBean;
     }
 
+    private DatastoreContext getInitShardDataStoreContext() {
+        return (DatastoreContext.newBuilderFrom(datastoreContext)
+                .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
+                .build());
+    }
+
     private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
-        String shardName = shardReplicaMsg.getShardName();
+        final String shardName = shardReplicaMsg.getShardName();
 
         // verify the local shard replica is already available in the controller node
+        LOG.debug ("received AddShardReplica for shard {}", shardName);
         if (localShards.containsKey(shardName)) {
             LOG.debug ("Local shard {} already available in the controller node", shardName);
             getSender().tell(new akka.actor.Status.Failure(
@@ -767,10 +783,143 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
 
         // Create the localShard
-        getSender().tell(new akka.actor.Status.Success(true), getSelf());
+        if (schemaContext == null) {
+            LOG.debug ("schemaContext is not updated to create localShardActor");
+            getSender().tell(new akka.actor.Status.Failure(
+               new IllegalStateException(String.format(
+                  "schemaContext not available to create localShardActor for %s",
+                   shardName))), getSelf());
+            return;
+        }
+
+        Map<String, String> peerAddresses = getPeerAddresses(shardName);
+        if (peerAddresses.isEmpty()) {
+            LOG.debug ("Shard peers not available for replicating shard data from leader");
+            getSender().tell(new akka.actor.Status.Failure(
+                new IllegalStateException(String.format(
+                    "Cannot add replica for shard %s because no peer is available",
+                    shardName))), getSelf());
+            return;
+        }
+
+        Timeout findPrimaryTimeout = new Timeout(datastoreContext
+                       .getShardInitializationTimeout().duration().$times(2));
+
+        final ActorRef sender = getSender();
+        Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true),
+                                       findPrimaryTimeout);
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if (failure != null) {
+                    LOG.debug ("Failed to receive response for FindPrimary of shard {}",
+                                shardName, failure);
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("Failed to find leader for shard %s", shardName), failure)),
+                        getSelf());
+                } else {
+                    if (!(response instanceof RemotePrimaryShardFound)) {
+                        LOG.debug ("Shard leader not available for creating local shard replica {}",
+                            shardName);
+                        sender.tell(new akka.actor.Status.Failure(
+                            new IllegalStateException(String.format(
+                                "Invalid response type, %s, received from FindPrimary for shard %s",
+                                response.getClass().getName(), shardName))), getSelf());
+                        return;
+                    }
+                    RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+                    addShard (shardName, message, sender);
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+    }
+
+    private void addShard(final String shardName, final RemotePrimaryShardFound response,
+                          final ActorRef sender) {
+        ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                                                     shardName);
+        String localShardAddress = peerAddressResolver.getShardActorAddress(shardName,
+            cluster.getCurrentMemberName());
+        final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
+                          getPeerAddresses(shardName), getInitShardDataStoreContext(),
+                          new DefaultShardPropsCreator(), peerAddressResolver);
+        localShards.put(shardName, shardInfo);
+        shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+
+        //inform ShardLeader to add this shard as a replica by sending an AddServer message
+        LOG.debug ("sending AddServer message to peer {} for shard {}",
+            response.getPrimaryPath(), shardId);
+
+        Timeout addServerTimeout = new Timeout(datastoreContext
+                       .getShardLeaderElectionTimeout().duration().$times(4));
+        Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+            new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+
+        futureObj.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object addServerResponse) {
+                if (failure != null) {
+                    LOG.debug ("AddServer request to {} for {} failed",
+                        response.getPrimaryPath(), shardName, failure);
+                    // Remove the shard
+                    localShards.remove(shardName);
+                    if (shardInfo.getActor() != null) {
+                        shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+                    }
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("AddServer request to leader %s for shard %s failed",
+                            response.getPrimaryPath(), shardName), failure)), getSelf());
+                } else {
+                    AddServerReply reply = (AddServerReply)addServerResponse;
+                    onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+                }
+            }
+        }, new Dispatchers(context().system().dispatchers()).
+            getDispatcher(Dispatchers.DispatcherType.Client));
         return;
     }
 
+    private void onAddServerReply (String shardName, ShardInformation shardInfo,
+                                   AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+        if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+            LOG.debug ("Leader shard successfully added the replica shard {}",
+                        shardName);
+            // Make the local shard voting capable
+            shardInfo.setDatastoreContext(datastoreContext, getSelf());
+            ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+                                                         shardName);
+            mBean.addLocalShard(shardId.toString());
+            sender.tell(new akka.actor.Status.Success(true), getSelf());
+        } else {
+            LOG.warn ("Cannot add shard replica {} status {}",
+                      shardName, replyMsg.getStatus());
+            LOG.debug ("removing the local shard replica for shard {}",
+                       shardName);
+            //remove the local replica created
+            localShards.remove(shardName);
+            if (shardInfo.getActor() != null) {
+                shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+            }
+            switch (replyMsg.getStatus()) {
+                //case ServerChangeStatus.TIMEOUT:
+                case TIMEOUT:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+                        String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+                            leaderPath, shardName))), getSelf());
+                    break;
+                //case ServerChangeStatus.NO_LEADER:
+                case NO_LEADER:
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                        "There is no shard leader available for shard %s", shardName))), getSelf());
+                    break;
+                default :
+                    sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+                        "AddServer request to leader %s for shard %s failed with status %s",
+                        leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+            }
+        }
+    }
+
     private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
         String shardName = shardReplicaMsg.getShardName();
         boolean deleteStatus = false;
@@ -808,7 +957,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         private String leaderId;
         private short leaderVersion;
 
-        private final DatastoreContext datastoreContext;
+        private DatastoreContext datastoreContext;
         private final ShardPropsCreator shardPropsCreator;
         private final ShardPeerAddressResolver addressResolver;
 
@@ -993,6 +1142,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         void setLeaderVersion(short leaderVersion) {
             this.leaderVersion = leaderVersion;
         }
+
+        /**
+         * Replaces the datastore context held for this shard and, when the shard actor has
+         * been set, sends the new context to that actor.
+         *
+         * @param datastoreContext the context to store and to send to the shard actor
+         * @param sender the actor reference passed as the sender of the message
+         */
+        void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+            this.datastoreContext = datastoreContext;
+            LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}",
+                 this.shardName);
+            if (actor == null) {
+                // No shard actor to notify; only the stored context changes.
+                return;
+            }
+            actor.tell(datastoreContext, sender);
+        }
     }
 
     private static class ShardManagerCreator implements Creator<ShardManager> {