/*
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
import akka.japi.Creator;
import akka.japi.Function;
import akka.persistence.RecoveryCompleted;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.CreateShardReply;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li>Create all the local shard replicas that belong on this cluster member</li>
 * <li>Find the address of the local shard</li>
 * <li>Find the primary replica for any given shard</li>
 * <li>Monitor the cluster members and store their addresses</li>
 * </ul>
 */
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);

// Maps a shard name to the information kept about the local replica of that shard.
// Shard names look like inventory, topology etc. and are as specified in the configuration.
private final Map<String, ShardInformation> localShards = new HashMap<>();

// The type of a ShardManager reflects the type of the datastore itself.
// A data store could be of type config/operational.
private final String type;

private final ClusterWrapper cluster;

private final Configuration configuration;

private final String shardDispatcherPath;

// JMX bean that tracks local shards and the overall sync status. It is not assigned in the
// constructor body shown here (presumably in createLocalShards) - TODO confirm.
private ShardManagerInfo mBean;

// Non-final: the constructor replaces the supplied context with a copy wired to peerAddressResolver.
private DatastoreContext datastoreContext;

// Counted down by checkReady() once every local shard is ready with a known leader.
private final CountDownLatch waitTillReadyCountdownLatch;

// Cache of primary shard lookups; an entry is evicted when a local shard reports a new leader.
private final PrimaryShardInfoFutureCache primaryShardInfoCache;

private final ShardPeerAddressResolver peerAddressResolver;

// May be null; onCreateShard only starts a shard actor once a schema context is available.
private SchemaContext schemaContext;
/**
 * Creates the shard manager actor. It subscribes to cluster member events and creates the local
 * shard replicas that belong on this member.
 *
 * @param cluster wrapper over the cluster this member belongs to; must not be null
 * @param configuration module/shard configuration; must not be null
 * @param datastoreContext datastore settings; a copy wired with this manager's shard peer address
 *        resolver is retained. Must not be null.
 * @param waitTillReadyCountdownLatch counted down once all local shards are ready with a known leader
 * @param primaryShardInfoCache cache of primary shard info, evicted when a local shard's leader changes
 * @throws NullPointerException if {@code cluster}, {@code configuration} or {@code datastoreContext} is null
 */
protected ShardManager(ClusterWrapper cluster, Configuration configuration,
        DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch,
        PrimaryShardInfoFutureCache primaryShardInfoCache) {
    this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
    this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
    Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");

    this.type = datastoreContext.getDataStoreType();
    this.shardDispatcherPath =
            new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
    this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
    this.primaryShardInfoCache = primaryShardInfoCache;

    this.peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
    // Keep a copy of the caller's context that resolves shard peer addresses through this manager.
    // (Assigned once; the previous redundant pre-assignment of the raw argument was dead.)
    this.datastoreContext = DatastoreContext.newBuilderFrom(datastoreContext).shardPeerAddressResolver(
            peerAddressResolver).build();

    // Subscribe this actor to cluster member events
    cluster.subscribeToMemberEvents(getSelf());

    createLocalShards();
}
/**
 * Builds the {@link Props} used to instantiate a {@link ShardManager}.
 *
 * @param cluster wrapper over the cluster this member belongs to
 * @param configuration module/shard configuration
 * @param datastoreContext datastore settings
 * @param waitTillReadyCountdownLatch latch counted down once the data store is ready
 * @param primaryShardInfoCache cache of primary shard lookups
 * @return Props that create a ShardManager with the given collaborators
 * @throws NullPointerException if any argument is null; checked here so the error surfaces to the
 *         caller rather than during actor creation
 */
public static Props props(
        final ClusterWrapper cluster,
        final Configuration configuration,
        final DatastoreContext datastoreContext,
        final CountDownLatch waitTillReadyCountdownLatch,
        final PrimaryShardInfoFutureCache primaryShardInfoCache) {

    Preconditions.checkNotNull(cluster, "cluster should not be null");
    Preconditions.checkNotNull(configuration, "configuration should not be null");
    // The constructor dereferences the context immediately, so reject null up front like the others.
    Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
    Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
    Preconditions.checkNotNull(primaryShardInfoCache, "primaryShardInfoCache should not be null");

    return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext,
            waitTillReadyCountdownLatch, primaryShardInfoCache));
}
/**
 * Logs the shutdown and unregisters this manager's JMX bean.
 */
@Override
public void postStop() {
    LOG.info("Stopping ShardManager");

    final ShardManagerInfo managerInfo = mBean;
    managerInfo.unregisterMBean();
}
/**
 * Dispatches an incoming command to its handler based on the message type. Types are tested in a
 * fixed order; anything unrecognised is passed to {@code unknownMessage}.
 */
@Override
public void handleCommand(Object message) throws Exception {
    // Shard lookups
    if (message instanceof FindPrimary) {
        findPrimary((FindPrimary) message);
        return;
    }
    if (message instanceof FindLocalShard) {
        findLocalShard((FindLocalShard) message);
        return;
    }

    // Schema and shard actor lifecycle
    if (message instanceof UpdateSchemaContext) {
        updateSchemaContext(message);
        return;
    }
    if (message instanceof ActorInitialized) {
        onActorInitialized(message);
        return;
    }

    // Cluster membership events
    if (message instanceof ClusterEvent.MemberUp) {
        memberUp((ClusterEvent.MemberUp) message);
        return;
    }
    if (message instanceof ClusterEvent.MemberExited) {
        memberExited((ClusterEvent.MemberExited) message);
        return;
    }
    if (message instanceof ClusterEvent.MemberRemoved) {
        memberRemoved((ClusterEvent.MemberRemoved) message);
        return;
    }
    if (message instanceof ClusterEvent.UnreachableMember) {
        memberUnreachable((ClusterEvent.UnreachableMember) message);
        return;
    }
    if (message instanceof ClusterEvent.ReachableMember) {
        memberReachable((ClusterEvent.ReachableMember) message);
        return;
    }

    // Configuration updates
    if (message instanceof DatastoreContext) {
        onDatastoreContext((DatastoreContext) message);
        return;
    }

    // Shard state notifications
    if (message instanceof RoleChangeNotification) {
        onRoleChangeNotification((RoleChangeNotification) message);
        return;
    }
    if (message instanceof FollowerInitialSyncUpStatus) {
        onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
        return;
    }
    if (message instanceof ShardNotInitializedTimeout) {
        onShardNotInitializedTimeout((ShardNotInitializedTimeout) message);
        return;
    }
    if (message instanceof ShardLeaderStateChanged) {
        onLeaderStateChanged((ShardLeaderStateChanged) message);
        return;
    }

    // Shard administration
    if (message instanceof SwitchShardBehavior) {
        onSwitchShardBehavior((SwitchShardBehavior) message);
        return;
    }
    if (message instanceof CreateShard) {
        onCreateShard((CreateShard) message);
        return;
    }
    if (message instanceof AddShardReplica) {
        onAddShardReplica((AddShardReplica) message);
        return;
    }
    if (message instanceof RemoveShardReplica) {
        onRemoveShardReplica((RemoveShardReplica) message);
        return;
    }

    unknownMessage(message);
}
/**
 * Handles a {@link CreateShard} request. It adds the module shard configuration, records the
 * local shard information and registers the shard with the JMX bean. The shard actor is started
 * immediately only if a schema context is already known.
 * <p>
 * The reply is a {@link CreateShardReply} on success, or an {@code akka.actor.Status.Failure}
 * wrapping the error (for example when a shard with the same name already exists). No reply is
 * sent when there is no sender or the sender is dead letters.
 */
private void onCreateShard(CreateShard createShard) {
    Object response;
    try {
        final ModuleShardConfiguration shardConfig = createShard.getModuleShardConfig();
        final String shardName = shardConfig.getShardName();
        if (localShards.containsKey(shardName)) {
            throw new IllegalStateException(String.format("Shard with name %s already exists", shardName));
        }

        configuration.addModuleShardConfiguration(shardConfig);

        final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(), shardName);
        final Map peerAddresses = getPeerAddresses(shardName);

        LOG.debug("onCreateShard: shardId: {}, memberNames: {}. peerAddresses: {}", shardId,
                shardConfig.getShardMemberNames(), peerAddresses);

        // Use the manager-wide context unless the request supplies its own, in which case it is
        // re-wired to resolve peer addresses through this manager.
        final DatastoreContext requestedContext = createShard.getDatastoreContext();
        final DatastoreContext shardDatastoreContext = requestedContext == null
                ? datastoreContext
                : DatastoreContext.newBuilderFrom(requestedContext).shardPeerAddressResolver(
                        peerAddressResolver).build();

        final ShardInformation shardInfo = new ShardInformation(shardName, shardId, peerAddresses,
                shardDatastoreContext, createShard.getShardPropsCreator(), peerAddressResolver);
        localShards.put(shardInfo.getShardName(), shardInfo);

        mBean.addLocalShard(shardId.toString());

        // Without a schema context the actor cannot be created yet.
        if (schemaContext != null) {
            shardInfo.setActor(newShardActor(schemaContext, shardInfo));
        }

        response = new CreateShardReply();
    } catch (Exception e) {
        LOG.error("onCreateShard failed", e);
        response = new akka.actor.Status.Failure(e);
    }

    final ActorRef requester = getSender();
    final boolean canReply = requester != null && !getContext().system().deadLetters().equals(requester);
    if (canReply) {
        requester.tell(response, getSelf());
    }
}
/**
 * Counts down the ready latch when every local shard is ready with a known leader. Otherwise it
 * does nothing.
 */
private void checkReady() {
    if (!isReadyWithLeaderId()) {
        return;
    }

    LOG.info("{}: All Shards are ready - data store {} is ready, available count is {}",
            persistenceId(), type, waitTillReadyCountdownLatch.getCount());

    waitTillReadyCountdownLatch.countDown();
}
/**
 * Records the local data tree, leader payload version and leader id reported by a local shard.
 * The cached primary info for that shard is evicted when {@code setLeaderId} reports a change,
 * and readiness is then re-evaluated. Notifications for unknown members are only logged.
 */
private void onLeaderStateChanged(ShardLeaderStateChanged leaderStateChanged) {
    LOG.info("{}: Received LeaderStateChanged message: {}", persistenceId(), leaderStateChanged);

    final String memberId = leaderStateChanged.getMemberId();
    final ShardInformation shard = findShardInformation(memberId);
    if (shard == null) {
        LOG.debug("No shard found with member Id {}", memberId);
        return;
    }

    shard.setLocalDataTree(leaderStateChanged.getLocalShardDataTree());
    shard.setLeaderVersion(leaderStateChanged.getLeaderPayloadVersion());

    final boolean leaderChanged = shard.setLeaderId(leaderStateChanged.getLeaderId());
    if (leaderChanged) {
        primaryShardInfoCache.remove(shard.getShardName());
    }

    checkReady();
}
/**
 * Handles a timeout while waiting for a shard to become initialized. The pending callback is
 * removed and the original requester receives an error. The error is a NotInitializedException
 * if the shard is still not initialized, otherwise a NoShardLeaderException.
 */
private void onShardNotInitializedTimeout(ShardNotInitializedTimeout message) {
    final ShardInformation shardInfo = message.getShardInfo();
    final String shardName = shardInfo.getShardName();

    LOG.debug("{}: Received ShardNotInitializedTimeout message for shard {}", persistenceId(),
            shardName);

    shardInfo.removeOnShardInitialized(message.getOnShardInitialized());

    final Object failure;
    if (shardInfo.isShardInitialized()) {
        LOG.debug("{}: Returning NoShardLeaderException for shard {}", persistenceId(), shardName);
        failure = createNoShardLeaderException(shardInfo.shardId);
    } else {
        LOG.debug("{}: Returning NotInitializedException for shard {}", persistenceId(), shardName);
        failure = createNotInitializedException(shardInfo.shardId);
    }

    message.getSender().tell(failure, getSelf());
}
/**
 * Stores a local shard's follower initial-sync status and refreshes the aggregate sync status on
 * the JMX bean. Statuses for unknown shards are ignored after logging.
 */
private void onFollowerInitialSyncStatus(FollowerInitialSyncUpStatus status) {
    final String memberName = status.getName();
    LOG.info("{} Received follower initial sync status for {} status sync done {}", persistenceId(),
            memberName, status.isInitialSyncDone());

    final ShardInformation shard = findShardInformation(memberName);
    if (shard == null) {
        return;
    }

    shard.setFollowerSyncStatus(status.isInitialSyncDone());
    mBean.setSyncStatus(isInSync());
}
/**
 * Records a local shard's new raft role, re-evaluates readiness and refreshes the aggregate sync
 * status on the JMX bean. Notifications for unknown shards are ignored after logging.
 */
private void onRoleChangeNotification(RoleChangeNotification roleChanged) {
    final String memberId = roleChanged.getMemberId();
    LOG.info("{}: Received role changed for {} from {} to {}", persistenceId(), memberId,
            roleChanged.getOldRole(), roleChanged.getNewRole());

    final ShardInformation shard = findShardInformation(memberId);
    if (shard == null) {
        return;
    }

    shard.setRole(roleChanged.getNewRole());
    checkReady();
    mBean.setSyncStatus(isInSync());
}
/**
 * Looks up a local shard by member id, matched against the string form of its shard id.
 *
 * @return the first matching shard information, or null if none matches
 */
private ShardInformation findShardInformation(String memberId) {
    ShardInformation match = null;
    for (ShardInformation candidate : localShards.values()) {
        final String candidateId = candidate.getShardId().toString();
        if (candidateId.equals(memberId)) {
            match = candidate;
            break;
        }
    }
    return match;
}
/**
 * Reports whether every local shard is ready with a known leader id. This is vacuously true when
 * there are no local shards.
 */
private boolean isReadyWithLeaderId() {
    for (ShardInformation shard : localShards.values()) {
        if (!shard.isShardReadyWithLeaderId()) {
            return false;
        }
    }
    return true;
}
/**
 * Reports whether every local shard is in sync. This is vacuously true when there are no local
 * shards.
 */
private boolean isInSync() {
    boolean allInSync = true;
    for (ShardInformation shard : localShards.values()) {
        allInSync = shard.isInSync();
        if (!allInSync) {
            break;
        }
    }
    return allInSync;
}
/**
 * Handles an {@code ActorInitialized} notification from a shard actor by marking the
 * corresponding local shard as initialized. The shard name is recovered from the sender's actor
 * name, which is the stringified shard id.
 */
private void onActorInitialized(Object message) {
    final ActorRef initializedActor = getSender();
    if (initializedActor == null) {
        // Not sent by an actor - nothing to correlate it with, so ignore it.
        return;
    }

    final String actorName = initializedActor.path().name();
    final String shardName = ShardIdentifier.builder().fromShardIdString(actorName).build().getShardName();
    if (shardName != null) {
        markShardAsInitialized(shardName);
    }
}
/**
 * Flags the named local shard's actor as initialized and registers this manager as a role change
 * listener on that actor. Unknown shard names are ignored.
 */
private void markShardAsInitialized(String shardName) {
    LOG.debug("{}: Initializing shard [{}]", persistenceId(), shardName);

    final ShardInformation shard = localShards.get(shardName);
    if (shard == null) {
        return;
    }

    shard.setActorInitialized();
    shard.getActor().tell(new RegisterRoleChangeListener(), getSelf());
}
/**
 * Handles persistence recovery. Only {@link RecoveryCompleted} is acted upon; every other
 * recovered message is ignored.
 */
@Override
protected void handleRecover(Object message) throws Exception {
    if (!(message instanceof RecoveryCompleted)) {
        return;
    }

    LOG.info("Recovery complete : {}", persistenceId());

    // SchemaContext modules are no longer persisted, so purge any journal entries left behind by an
    // upgrade from Helium.
    deleteMessages(lastSequenceNr());
}
private void findLocalShard(FindLocalShard message) {
final ShardInformation shardInformation = localShards.get(message.getShardName());
if(shardInformation == null){
getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
return;
}
sendResponse(shardInformation, message.isWaitUntilInitialized(), false, new Supplier