package org.opendaylight.controller.cluster.datastore;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
+import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.japi.Function;
import akka.persistence.RecoveryCompleted;
import akka.serialization.Serialization;
+import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
return mBean;
}
+ private DatastoreContext getInitShardDataStoreContext() {
+ return (DatastoreContext.newBuilderFrom(datastoreContext)
+ .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName())
+ .build());
+ }
+
private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
- String shardName = shardReplicaMsg.getShardName();
+ final String shardName = shardReplicaMsg.getShardName();
// verify the local shard replica is already available in the controller node
+ LOG.debug ("received AddShardReplica for shard {}", shardName);
if (localShards.containsKey(shardName)) {
LOG.debug ("Local shard {} already available in the controller node", shardName);
getSender().tell(new akka.actor.Status.Failure(
}
// Create the localShard
- getSender().tell(new akka.actor.Status.Success(true), getSelf());
+ if (schemaContext == null) {
+ LOG.debug ("schemaContext is not updated to create localShardActor");
+ getSender().tell(new akka.actor.Status.Failure(
+ new IllegalStateException(String.format(
+ "schemaContext not available to create localShardActor for %s",
+ shardName))), getSelf());
+ return;
+ }
+
+ Map<String, String> peerAddresses = getPeerAddresses(shardName);
+ if (peerAddresses.isEmpty()) {
+ LOG.debug ("Shard peers not available for replicating shard data from leader");
+ getSender().tell(new akka.actor.Status.Failure(
+ new IllegalStateException(String.format(
+ "Cannot add replica for shard %s because no peer is available",
+ shardName))), getSelf());
+ return;
+ }
+
+ Timeout findPrimaryTimeout = new Timeout(datastoreContext
+ .getShardInitializationTimeout().duration().$times(2));
+
+ final ActorRef sender = getSender();
+ Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true),
+ findPrimaryTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ LOG.debug ("Failed to receive response for FindPrimary of shard {}",
+ shardName, failure);
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+ String.format("Failed to find leader for shard %s", shardName), failure)),
+ getSelf());
+ } else {
+ if (!(response instanceof RemotePrimaryShardFound)) {
+ LOG.debug ("Shard leader not available for creating local shard replica {}",
+ shardName);
+ sender.tell(new akka.actor.Status.Failure(
+ new IllegalStateException(String.format(
+ "Invalid response type, %s, received from FindPrimary for shard %s",
+ response.getClass().getName(), shardName))), getSelf());
+ return;
+ }
+ RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+ addShard (shardName, message, sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void addShard(final String shardName, final RemotePrimaryShardFound response,
+ final ActorRef sender) {
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ shardName);
+ String localShardAddress = peerAddressResolver.getShardActorAddress(shardName,
+ cluster.getCurrentMemberName());
+ final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
+ getPeerAddresses(shardName), getInitShardDataStoreContext(),
+ new DefaultShardPropsCreator(), peerAddressResolver);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+
+ //inform ShardLeader to add this shard as a replica by sending an AddServer message
+ LOG.debug ("sending AddServer message to peer {} for shard {}",
+ response.getPrimaryPath(), shardId);
+
+ Timeout addServerTimeout = new Timeout(datastoreContext
+ .getShardLeaderElectionTimeout().duration().$times(4));
+ Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+ new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object addServerResponse) {
+ if (failure != null) {
+ LOG.debug ("AddServer request to {} for {} failed",
+ response.getPrimaryPath(), shardName, failure);
+ // Remove the shard
+ localShards.remove(shardName);
+ if (shardInfo.getActor() != null) {
+ shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+ }
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+ String.format("AddServer request to leader %s for shard %s failed",
+ response.getPrimaryPath(), shardName), failure)), getSelf());
+ } else {
+ AddServerReply reply = (AddServerReply)addServerResponse;
+ onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).
+ getDispatcher(Dispatchers.DispatcherType.Client));
return;
}
+ private void onAddServerReply (String shardName, ShardInformation shardInfo,
+ AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("Leader shard successfully added the replica shard {}",
+ shardName);
+ // Make the local shard voting capable
+ shardInfo.setDatastoreContext(datastoreContext, getSelf());
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
+ shardName);
+ mBean.addLocalShard(shardId.toString());
+ sender.tell(new akka.actor.Status.Success(true), getSelf());
+ } else {
+ LOG.warn ("Cannot add shard replica {} status {}",
+ shardName, replyMsg.getStatus());
+ LOG.debug ("removing the local shard replica for shard {}",
+ shardName);
+ //remove the local replica created
+ localShards.remove(shardName);
+ if (shardInfo.getActor() != null) {
+ shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+ }
+ switch (replyMsg.getStatus()) {
+ //case ServerChangeStatus.TIMEOUT:
+ case TIMEOUT:
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+ String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+ leaderPath, shardName))), getSelf());
+ break;
+ //case ServerChangeStatus.NO_LEADER:
+ case NO_LEADER:
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+ "There is no shard leader available for shard %s", shardName))), getSelf());
+ break;
+ default :
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+ "AddServer request to leader %s for shard %s failed with status %s",
+ leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+ }
+ }
+ }
+
private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
String shardName = shardReplicaMsg.getShardName();
boolean deleteStatus = false;
private String leaderId;
private short leaderVersion;
- private final DatastoreContext datastoreContext;
+ private DatastoreContext datastoreContext;
private final ShardPropsCreator shardPropsCreator;
private final ShardPeerAddressResolver addressResolver;
void setLeaderVersion(short leaderVersion) {
this.leaderVersion = leaderVersion;
}
+
+ void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+ this.datastoreContext = datastoreContext;
+ //notify the datastoreContextchange
+ LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}",
+ this.shardName);
+ if (actor != null) {
+ actor.tell(this.datastoreContext, sender);
+ }
+ }
}
private static class ShardManagerCreator implements Creator<ShardManager> {