package org.opendaylight.controller.cluster.datastore;
+import static akka.pattern.Patterns.ask;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
+import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.japi.Function;
import akka.persistence.RecoveryCompleted;
import akka.serialization.Serialization;
+import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
+import org.opendaylight.controller.cluster.raft.messages.AddServer;
+import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
+import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
+import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
private ShardManagerInfo mBean;
- private DatastoreContext datastoreContext;
+ private DatastoreContextFactory datastoreContextFactory;
private final CountDownLatch waitTillReadyCountdownLatch;
/**
*/
protected ShardManager(ClusterWrapper cluster, Configuration configuration,
- DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
+ DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
PrimaryShardInfoFutureCache primaryShardInfoCache) {
this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
- this.datastoreContext = datastoreContext;
- this.type = datastoreContext.getDataStoreType();
+ this.datastoreContextFactory = datastoreContextFactory;
+ this.type = datastoreContextFactory.getBaseDatastoreContext().getDataStoreType();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
this.primaryShardInfoCache = primaryShardInfoCache;
peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
- this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
- peerAddressResolver).build();
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
public static Props props(
final ClusterWrapper cluster,
final Configuration configuration,
- final DatastoreContext datastoreContext,
+ final DatastoreContextFactory datastoreContextFactory,
final CountDownLatch waitTillReadyCountdownLatch,
final PrimaryShardInfoFutureCache primaryShardInfoCache) {
Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");
- return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
+ return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContextFactory,
waitTillReadyCountdownLatch, primaryShardInfoCache));
}
memberUnreachable((ClusterEvent.UnreachableMember)message);
} else if(message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
- } else if(message instanceof DatastoreContext) {
- onDatastoreContext((DatastoreContext)message);
+ } else if(message instanceof DatastoreContextFactory) {
+ onDatastoreContextFactory((DatastoreContextFactory)message);
} else if(message instanceof RoleChangeNotification) {
onRoleChangeNotification((RoleChangeNotification) message);
} else if(message instanceof FollowerInitialSyncUpStatus){
onSwitchShardBehavior((SwitchShardBehavior) message);
} else if(message instanceof CreateShard) {
onCreateShard((CreateShard)message);
+ } else if(message instanceof AddShardReplica){
+ onAddShardReplica((AddShardReplica)message);
+ } else if(message instanceof RemoveShardReplica){
+ onRemoveShardReplica((RemoveShardReplica)message);
} else {
unknownMessage(message);
}
DatastoreContext shardDatastoreContext = createShard.getDatastoreContext();
if(shardDatastoreContext == null) {
- shardDatastoreContext = datastoreContext;
+ shardDatastoreContext = newShardDatastoreContext(moduleShardConfig.getShardName());
} else {
shardDatastoreContext = DatastoreContext.newBuilderFrom(shardDatastoreContext).shardPeerAddressResolver(
peerAddressResolver).build();
}
}
+ private DatastoreContext.Builder newShardDatastoreContextBuilder(String shardName) {
+ return DatastoreContext.newBuilderFrom(datastoreContextFactory.getShardDatastoreContext(shardName)).
+ shardPeerAddressResolver(peerAddressResolver);
+ }
+
+ private DatastoreContext newShardDatastoreContext(String shardName) {
+ return newShardDatastoreContextBuilder(shardName).build();
+ }
+
private void checkReady(){
if (isReadyWithLeaderId()) {
LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
LOG.debug("{}: Scheduling timer to wait for shard {}", persistenceId(), shardInformation.getShardName());
- FiniteDuration timeout = datastoreContext.getShardInitializationTimeout().duration();
+ FiniteDuration timeout = shardInformation.getDatastoreContext().getShardInitializationTimeout().duration();
if(shardInformation.isShardInitialized()) {
// If the shard is already initialized then we'll wait enough time for the shard to
// elect a leader, ie 2 times the election timeout.
- timeout = FiniteDuration.create(datastoreContext.getShardRaftConfig()
+ timeout = FiniteDuration.create(shardInformation.getDatastoreContext().getShardRaftConfig()
.getElectionTimeOutInterval().toMillis() * 2, TimeUnit.MILLISECONDS);
}
getSender().tell(messageSupplier.get(), getSelf());
}
- private NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
+ private static NoShardLeaderException createNoShardLeaderException(ShardIdentifier shardId) {
return new NoShardLeaderException(null, shardId.toString());
}
- private NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
+ private static NotInitializedException createNotInitializedException(ShardIdentifier shardId) {
return new NotInitializedException(String.format(
"Found primary shard %s but it's not initialized yet. Please try again later", shardId));
}
}
}
- private void onDatastoreContext(DatastoreContext context) {
- datastoreContext = DatastoreContext.newBuilderFrom(context).shardPeerAddressResolver(
- peerAddressResolver).build();
+ private void onDatastoreContextFactory(DatastoreContextFactory factory) {
+ datastoreContextFactory = factory;
for (ShardInformation info : localShards.values()) {
- if (info.getActor() != null) {
- info.getActor().tell(datastoreContext, getSelf());
- }
+ info.setDatastoreContext(newShardDatastoreContext(info.getShardName()), getSelf());
}
}
ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
Map<String, String> peerAddresses = getPeerAddresses(shardName);
localShardActorNames.add(shardId.toString());
- localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses, datastoreContext,
- shardPropsCreator, peerAddressResolver));
+ localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses,
+ newShardDatastoreContext(shardName), shardPropsCreator, peerAddressResolver));
}
mBean = ShardManagerInfo.createShardManagerMBean(memberName, "shard-manager-" + this.type,
- datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
+ datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(), localShardActorNames);
mBean.setShardManager(this);
}
return mBean;
}
+ private void checkLocalShardExists(final String shardName, final ActorRef sender) {
+ if (localShards.containsKey(shardName)) {
+ String msg = String.format("Local shard %s already exists", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ }
+ }
+
+ private void onAddShardReplica (AddShardReplica shardReplicaMsg) {
+ final String shardName = shardReplicaMsg.getShardName();
+
+ // verify the local shard replica is already available in the controller node
+ LOG.debug ("onAddShardReplica: {}", shardReplicaMsg);
+
+ checkLocalShardExists(shardName, getSender());
+
+ // verify the shard with the specified name is present in the cluster configuration
+ if (!(this.configuration.isShardConfigured(shardName))) {
+ String msg = String.format("No module configuration exists for shard %s", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ return;
+ }
+
+ // Create the localShard
+ if (schemaContext == null) {
+ String msg = String.format(
+ "No SchemaContext is available in order to create a local shard instance for %s", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ Map<String, String> peerAddresses = getPeerAddresses(shardName);
+ if (peerAddresses.isEmpty()) {
+ String msg = String.format("Cannot add replica for shard %s because no peer is available", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalStateException(msg)), getSelf());
+ return;
+ }
+
+ Timeout findPrimaryTimeout = new Timeout(datastoreContextFactory.getBaseDatastoreContext().
+ getShardInitializationTimeout().duration().$times(2));
+
+ final ActorRef sender = getSender();
+ Future<Object> futureObj = ask(getSelf(), new RemoteFindPrimary(shardName, true), findPrimaryTimeout);
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if (failure != null) {
+ LOG.debug ("{}: Received failure from FindPrimary for shard {}", persistenceId(), shardName, failure);
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+ String.format("Failed to find leader for shard %s", shardName), failure)),
+ getSelf());
+ } else {
+ if (!(response instanceof RemotePrimaryShardFound)) {
+ String msg = String.format("Failed to find leader for shard %s: received response: %s",
+ shardName, response);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(msg)), getSelf());
+ return;
+ }
+
+ RemotePrimaryShardFound message = (RemotePrimaryShardFound)response;
+ addShard (shardName, message, sender);
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client));
+ }
+
+ private void addShard(final String shardName, final RemotePrimaryShardFound response, final ActorRef sender) {
+ checkLocalShardExists(shardName, sender);
+
+ ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
+ String localShardAddress = peerAddressResolver.getShardActorAddress(shardName, cluster.getCurrentMemberName());
+
+ DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).customRaftPolicyImplementation(
+ DisableElectionsRaftPolicy.class.getName()).build();
+
+ final ShardInformation shardInfo = new ShardInformation(shardName, shardId,
+ getPeerAddresses(shardName), datastoreContext,
+ new DefaultShardPropsCreator(), peerAddressResolver);
+ localShards.put(shardName, shardInfo);
+ shardInfo.setActor(newShardActor(schemaContext, shardInfo));
+
+ //inform ShardLeader to add this shard as a replica by sending an AddServer message
+ LOG.debug ("{}: Sending AddServer message to peer {} for shard {}", persistenceId(),
+ response.getPrimaryPath(), shardId);
+
+ Timeout addServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().duration().$times(4));
+ Future<Object> futureObj = ask(getContext().actorSelection(response.getPrimaryPath()),
+ new AddServer(shardId.toString(), localShardAddress, true), addServerTimeout);
+
+ futureObj.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object addServerResponse) {
+ if (failure != null) {
+ LOG.debug ("{}: AddServer request to {} for {} failed", persistenceId(),
+ response.getPrimaryPath(), shardName, failure);
+
+ // Remove the shard
+ localShards.remove(shardName);
+ if (shardInfo.getActor() != null) {
+ shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+ }
+
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+ String.format("AddServer request to leader %s for shard %s failed",
+ response.getPrimaryPath(), shardName), failure)), getSelf());
+ } else {
+ AddServerReply reply = (AddServerReply)addServerResponse;
+ onAddServerReply(shardName, shardInfo, reply, sender, response.getPrimaryPath());
+ }
+ }
+ }, new Dispatchers(context().system().dispatchers()).
+ getDispatcher(Dispatchers.DispatcherType.Client));
+ return;
+ }
+
+ private void onAddServerReply (String shardName, ShardInformation shardInfo,
+ AddServerReply replyMsg, ActorRef sender, String leaderPath) {
+ LOG.debug ("{}: Received {} for shard {} from leader {}", persistenceId(), replyMsg, shardName, leaderPath);
+
+ if (replyMsg.getStatus() == ServerChangeStatus.OK) {
+ LOG.debug ("{}: Leader shard successfully added the replica shard {}", persistenceId(), shardName);
+
+ // Make the local shard voting capable
+ shardInfo.setDatastoreContext(newShardDatastoreContext(shardName), getSelf());
+
+ mBean.addLocalShard(shardInfo.getShardId().toString());
+ sender.tell(new akka.actor.Status.Success(true), getSelf());
+ } else {
+ LOG.warn ("{}: Leader failed to add shard replica {} with status {} - removing the local shard",
+ persistenceId(), shardName, replyMsg.getStatus());
+
+ //remove the local replica created
+ localShards.remove(shardName);
+ if (shardInfo.getActor() != null) {
+ shardInfo.getActor().tell(PoisonPill.getInstance(), getSelf());
+ }
+ switch (replyMsg.getStatus()) {
+ case TIMEOUT:
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(
+ String.format("The shard leader %s timed out trying to replicate the initial data to the new shard %s. Possible causes - there was a problem replicating the data or shard leadership changed while replicating the shard data",
+ leaderPath, shardName))), getSelf());
+ break;
+ case NO_LEADER:
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+ "There is no shard leader available for shard %s", shardName))), getSelf());
+ break;
+ default :
+ sender.tell(new akka.actor.Status.Failure(new RuntimeException(String.format(
+ "AddServer request to leader %s for shard %s failed with status %s",
+ leaderPath, shardName, replyMsg.getStatus()))), getSelf());
+ }
+ }
+ }
+
+ private void onRemoveShardReplica (RemoveShardReplica shardReplicaMsg) {
+ String shardName = shardReplicaMsg.getShardName();
+
+ // verify the local shard replica is available in the controller node
+ if (!localShards.containsKey(shardName)) {
+ String msg = String.format("Local shard %s does not", shardName);
+ LOG.debug ("{}: {}", persistenceId(), msg);
+ getSender().tell(new akka.actor.Status.Failure(new IllegalArgumentException(msg)), getSelf());
+ return;
+ }
+ // call RemoveShard for the shardName
+ getSender().tell(new akka.actor.Status.Success(true), getSelf());
+ return;
+ }
+
@VisibleForTesting
protected static class ShardInformation {
private final ShardIdentifier shardId;
private String leaderId;
private short leaderVersion;
- private final DatastoreContext datastoreContext;
+ private DatastoreContext datastoreContext;
private final ShardPropsCreator shardPropsCreator;
private final ShardPeerAddressResolver addressResolver;
return localShardDataTree;
}
+ DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
+ void setDatastoreContext(DatastoreContext datastoreContext, ActorRef sender) {
+ this.datastoreContext = datastoreContext;
+ if (actor != null) {
+ LOG.debug ("Sending new DatastoreContext to {}", shardId);
+ actor.tell(this.datastoreContext, sender);
+ }
+ }
+
void updatePeerAddress(String peerId, String peerAddress, ActorRef sender){
LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
final ClusterWrapper cluster;
final Configuration configuration;
- final DatastoreContext datastoreContext;
+ final DatastoreContextFactory datastoreContextFactory;
private final CountDownLatch waitTillReadyCountdownLatch;
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
- ShardManagerCreator(ClusterWrapper cluster, Configuration configuration, DatastoreContext datastoreContext,
- CountDownLatch waitTillReadyCountdownLatch, PrimaryShardInfoFutureCache primaryShardInfoCache) {
+ ShardManagerCreator(ClusterWrapper cluster, Configuration configuration,
+ DatastoreContextFactory datastoreContextFactory, CountDownLatch waitTillReadyCountdownLatch,
+ PrimaryShardInfoFutureCache primaryShardInfoCache) {
this.cluster = cluster;
this.configuration = configuration;
- this.datastoreContext = datastoreContext;
+ this.datastoreContextFactory = datastoreContextFactory;
this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
this.primaryShardInfoCache = primaryShardInfoCache;
}
@Override
public ShardManager create() throws Exception {
- return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch,
+ return new ShardManager(cluster, configuration, datastoreContextFactory, waitTillReadyCountdownLatch,
primaryShardInfoCache);
}
}