X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=a878f6decbf0cbe51a8c34176fc2a58e522d74c0;hb=186c5d82335ed7d8c39472355f7b1c1e084c26cd;hp=2d793157744c221454c8a542c65ed926244efb0f;hpb=e9cf78d1c39bbad20b8c8fee330b4a010ef14318;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 2d79315774..a878f6decb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -8,18 +8,22 @@ package org.opendaylight.controller.cluster.datastore; +import static akka.pattern.Patterns.ask; import akka.actor.ActorPath; import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.Cancellable; import akka.actor.OneForOneStrategy; +import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.SupervisorStrategy; import akka.cluster.ClusterEvent; +import akka.dispatch.OnComplete; import akka.japi.Creator; import akka.japi.Function; import akka.persistence.RecoveryCompleted; import akka.serialization.Serialization; +import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Optional; @@ -71,10 +75,15 @@ import org.opendaylight.controller.cluster.notifications.RoleChangeNotification; import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior; +import org.opendaylight.controller.cluster.raft.messages.AddServer; +import org.opendaylight.controller.cluster.raft.messages.AddServerReply; +import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus; +import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -746,10 +755,17 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { return mBean; } + private DatastoreContext getInitShardDataStoreContext() { + return (DatastoreContext.newBuilderFrom(datastoreContext) + .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName()) + .build()); + } + private void onAddShardReplica (AddShardReplica shardReplicaMsg) { - String shardName = shardReplicaMsg.getShardName(); + final String shardName = shardReplicaMsg.getShardName(); // verify the local shard replica is already available in the controller node + LOG.debug ("received AddShardReplica for shard {}", shardName); if (localShards.containsKey(shardName)) { LOG.debug ("Local shard {} already available in the controller node", shardName); getSender().tell(new akka.actor.Status.Failure( @@ -767,10 +783,143 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } // Create the localShard - getSender().tell(new akka.actor.Status.Success(true), getSelf()); + if (schemaContext == null) { + LOG.debug ("schemaContext is not updated to create localShardActor"); + getSender().tell(new akka.actor.Status.Failure( + new IllegalStateException(String.format( + "schemaContext not available to create localShardActor for %s", + shardName))), getSelf()); + return; + } + + Map peerAddresses = getPeerAddresses(shardName); + if (peerAddresses.isEmpty()) { + LOG.debug ("Shard peers not available for replicating shard data from leader"); + getSender().tell(new akka.actor.Status.Failure( + new IllegalStateException(String.format( + "Cannot add replica for shard %s because no peer is available", + shardName))), getSelf()); + return; + } + + Timeout findPrimaryTimeout = new Timeout(datastoreContext + .getShardInitializationTimeout().duration().$times(2)); + + final ActorRef sender = getSender(); + Future futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), + findPrimaryTimeout); + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if (failure != null) { + LOG.debug ("Failed to receive response for FindPrimary of shard {}", + shardName, failure); + sender.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("Failed to find leader for shard %s", shardName), failure)), + getSelf()); + } else { + if (!(response instanceof RemotePrimaryShardFound)) { + LOG.debug ("Shard leader not available for creating local shard replica {}", + shardName); + sender.tell(new akka.actor.Status.Failure( + new IllegalStateException(String.format( + "Invalid response type, %s, received from FindPrimary for shard %s", + response.getClass().getName(), shardName))), getSelf()); + return; + } + RemotePrimaryShardFound message = (RemotePrimaryShardFound)response; + addShard (shardName, message, sender); + } + } + }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client)); + } + + private void addShard(final String shardName, final RemotePrimaryShardFound response, + final ActorRef sender) { + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + shardName); + String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, + cluster.getCurrentMemberName()); + final ShardInformation shardInfo = new ShardInformation(shardName, shardId, + getPeerAddresses(shardName), getInitShardDataStoreContext(), + new DefaultShardPropsCreator(), peerAddressResolver); + localShards.put(shardName, shardInfo); + shardInfo.setActor(newShardActor(schemaContext, shardInfo)); + + //inform ShardLeader to add this shard as a replica by sending an AddServer message + LOG.debug ("sending AddServer message to peer {} for shard {}", + response.getPrimaryPath(), shardId); + + Timeout addServerTimeout = new Timeout(datastoreContext + .getShardLeaderElectionTimeout().duration().$times(4)); + Future futureObj = ask(getContext().actorSelection(response.getPrimaryPath()), + new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout); + + futureObj.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object addServerResponse) { + if (failure != null) { + LOG.debug ("AddServer request to {} for {} failed", + response.getPrimaryPath(), shardName, failure); + // Remove the shard + localShards.remove(shardName); + if (shardInfo.getActor() != null) { + shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); + } + sender.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("AddServer request to leader %s for shard %s failed", + response.getPrimaryPath(), shardName), failure)), getSelf()); + } else { + AddServerReply reply = (AddServerReply)addServerResponse; + onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath()); + } + } + }, new Dispatchers(context().system().dispatchers()). + getDispatcher(Dispatchers.DispatcherType.Client)); return; } + private void onAddServerReply (String shardName, ShardInformation shardInfo, + AddServerReply replyMsg, ActorRef sender, String leaderPath) { + if (replyMsg.getStatus() == ServerChangeStatus.OK) { + LOG.debug ("Leader shard successfully added the replica shard {}", + shardName); + // Make the local shard voting capable + shardInfo.setDatastoreContext(datastoreContext, getSelf()); + ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), + shardName); + mBean.addLocalShard(shardId.toString()); + sender.tell(new akka.actor.Status.Success(true), getSelf()); + } else { + LOG.warn ("Cannot add shard replica {} status {}", + shardName, replyMsg.getStatus()); + LOG.debug ("removing the local shard replica for shard {}", + shardName); + //remove the local replica created + localShards.remove(shardName); + if (shardInfo.getActor() != null) { + shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf()); + } + switch (replyMsg.getStatus()) { + //case ServerChangeStatus.TIMEOUT: + case TIMEOUT: + sender.tell(new akka.actor.Status.Failure(new RuntimeException( + String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data", + leaderPath, shardName))), getSelf()); + break; + //case ServerChangeStatus.NO_LEADER: + case NO_LEADER: + sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( + "There is no shard leader available for shard %s", shardName))), getSelf()); + break; + default : + sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format( + "AddServer request to leader %s for shard %s failed with status %s", + leaderPath, shardName, replyMsg.getStatus()))), getSelf()); + } + } + } + private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) { String shardName = shardReplicaMsg.getShardName(); boolean deleteStatus = false; @@ -808,7 +957,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { private String leaderId; private short leaderVersion; - private final DatastoreContext datastoreContext; + private DatastoreContext datastoreContext; private final ShardPropsCreator shardPropsCreator; private final ShardPeerAddressResolver addressResolver; @@ -993,6 +1142,16 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { void setLeaderVersion(short leaderVersion) { this.leaderVersion = leaderVersion; } + + void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) { + this.datastoreContext = datastoreContext; + //notify the datastoreContextchange + LOG.debug ("Notifying RaftPolicy change via datastoreContextChange for {}", + this.shardName); + if (actor != null) { + actor.tell(this.datastoreContext, sender); + } + } } private static class ShardManagerCreator implements Creator {