/*
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.cluster.datastore;
import static akka.pattern.Patterns.ask;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.japi.Function;
import akka.pattern.Patterns;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
import akka.persistence.SnapshotSelectionCriteria;
import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.exceptions.AlreadyExistsException;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.NotInitializedException;
import org.opendaylight.controller.cluster.datastore.exceptions.PrimaryNotFoundException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalPrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
import org.opendaylight.controller.cluster.datastore.messages.RemoteFindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.RemotePrimaryShardFound;
import org.opendaylight.controller.cluster.datastore.messages.RemoveShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.SwitchShardBehavior;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.base.messages.SwitchBehavior;
import org.opendaylight.controller.cluster.raft.client.messages.GetSnapshot;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
import org.opendaylight.controller.cluster.raft.messages.AddServer;
import org.opendaylight.controller.cluster.raft.messages.AddServerReply;
import org.opendaylight.controller.cluster.raft.messages.RemoveServer;
import org.opendaylight.controller.cluster.raft.messages.RemoveServerReply;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
* The ShardManager has the following jobs,
*
* - Create all the local shard replicas that belong on this cluster member
*
- Find the address of the local shard
*
- Find the primary replica for any given shard
*
- Monitor the cluster members and store their addresses
*
*/
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
private static final Logger LOG = LoggerFactory.getLogger(ShardManager.class);
// Stores a mapping between a shard name and it's corresponding information
// Shard names look like inventory, topology etc and are as specified in
// configuration
private final Map localShards = new HashMap<>();
// The type of a ShardManager reflects the type of the datastore itself
// A data store could be of type config/operational
private final String type;
private final ClusterWrapper cluster;
private final Configuration configuration;
private final String shardDispatcherPath;
private final ShardManagerInfo mBean;
private DatastoreContextFactory datastoreContextFactory;
private final CountDownLatch waitTillReadyCountdownLatch;
private final PrimaryShardInfoFutureCache primaryShardInfoCache;
private final ShardPeerAddressResolver peerAddressResolver;
private SchemaContext schemaContext;
private DatastoreSnapshot restoreFromSnapshot;
private ShardManagerSnapshot currentSnapshot;
private final Set shardReplicaOperationsInProgress = new HashSet<>();
private final String persistenceId;
/**
*/
protected ShardManager(AbstractBuilder> builder) {
this.cluster = builder.cluster;
this.configuration = builder.configuration;
this.datastoreContextFactory = builder.datastoreContextFactory;
this.type = builder.datastoreContextFactory.getBaseDatastoreContext().getDataStoreName();
this.shardDispatcherPath =
new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
this.waitTillReadyCountdownLatch = builder.waitTillReadyCountdownLatch;
this.primaryShardInfoCache = builder.primaryShardInfoCache;
this.restoreFromSnapshot = builder.restoreFromSnapshot;
String possiblePersistenceId = datastoreContextFactory.getBaseDatastoreContext().getShardManagerPersistenceId();
persistenceId = possiblePersistenceId != null ? possiblePersistenceId : "shard-manager-" + type;
peerAddressResolver = new ShardPeerAddressResolver(type, cluster.getCurrentMemberName());
// Subscribe this actor to cluster member events
cluster.subscribeToMemberEvents(getSelf());
List localShardActorNames = new ArrayList<>();
mBean = ShardManagerInfo.createShardManagerMBean(cluster.getCurrentMemberName(),
"shard-manager-" + this.type,
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType(),
localShardActorNames);
mBean.setShardManager(this);
}
@Override
public void postStop() {
LOG.info("Stopping ShardManager {}", persistenceId());
mBean.unregisterMBean();
}
@Override
public void handleCommand(Object message) throws Exception {
if (message instanceof FindPrimary) {
findPrimary((FindPrimary)message);
} else if(message instanceof FindLocalShard){
findLocalShard((FindLocalShard) message);
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext(message);
} else if(message instanceof ActorInitialized) {
onActorInitialized(message);
} else if (message instanceof ClusterEvent.MemberUp){
memberUp((ClusterEvent.MemberUp) message);
} else if (message instanceof ClusterEvent.MemberExited){
memberExited((ClusterEvent.MemberExited) message);
} else if(message instanceof ClusterEvent.MemberRemoved) {
memberRemoved((ClusterEvent.MemberRemoved) message);
} else if(message instanceof ClusterEvent.UnreachableMember) {
memberUnreachable((ClusterEvent.UnreachableMember)message);
} else if(message instanceof ClusterEvent.ReachableMember) {
memberReachable((ClusterEvent.ReachableMember) message);
} else if(message instanceof DatastoreContextFactory) {
onDatastoreContextFactory((DatastoreContextFactory)message);
} else if(message instanceof RoleChangeNotification) {
onRoleChangeNotification((RoleChangeNotification) message);
} else if(message instanceof FollowerInitialSyncUpStatus){
onFollowerInitialSyncStatus((FollowerInitialSyncUpStatus) message);
} else if(message instanceof ShardNotInitializedTimeout) {
onShardNotInitializedTimeout((ShardNotInitializedTimeout)message);
} else if(message instanceof ShardLeaderStateChanged) {
onLeaderStateChanged((ShardLeaderStateChanged) message);
} else if(message instanceof SwitchShardBehavior){
onSwitchShardBehavior((SwitchShardBehavior) message);
} else if(message instanceof CreateShard) {
onCreateShard((CreateShard)message);
} else if(message instanceof AddShardReplica){
onAddShardReplica((AddShardReplica)message);
} else if(message instanceof ForwardedAddServerReply) {
ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
msg.removeShardOnFailure);
} else if(message instanceof ForwardedAddServerFailure) {
ForwardedAddServerFailure msg = (ForwardedAddServerFailure)message;
onAddServerFailure(msg.shardName, msg.failureMessage, msg.failure, getSender(), msg.removeShardOnFailure);
} else if(message instanceof PrimaryShardFoundForContext) {
PrimaryShardFoundForContext primaryShardFoundContext = (PrimaryShardFoundForContext)message;
onPrimaryShardFoundContext(primaryShardFoundContext);
} else if(message instanceof RemoveShardReplica) {
onRemoveShardReplica((RemoveShardReplica) message);
} else if(message instanceof WrappedShardResponse){
onWrappedShardResponse((WrappedShardResponse) message);
} else if(message instanceof GetSnapshot) {
onGetSnapshot();
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
} else if(message instanceof SaveSnapshotSuccess) {
onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if(message instanceof SaveSnapshotFailure) {
LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure) message).cause());
} else if(message instanceof Shutdown) {
onShutDown();
} else {
unknownMessage(message);
}
}
private void onShutDown() {
Shutdown shutdown = new Shutdown();
List> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
if (info.getActor() != null) {
LOG.debug("{}: Issuing gracefulStop to shard {}", persistenceId(), info.getShardId());
FiniteDuration duration = info.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
stopFutures.add(Patterns.gracefulStop(info.getActor(), duration, shutdown));
}
}
LOG.info("Shutting down ShardManager {} - waiting on {} shards", persistenceId(), stopFutures.size());
ExecutionContext dispatcher = new Dispatchers(context().system().dispatchers()).getDispatcher(Dispatchers.DispatcherType.Client);
Future> combinedFutures = Futures.sequence(stopFutures, dispatcher);
combinedFutures.onComplete(new OnComplete>() {
@Override
public void onComplete(Throwable failure, Iterable results) {
LOG.debug("{}: All shards shutdown - sending PoisonPill to self", persistenceId());
self().tell(PoisonPill.getInstance(), self());
if(failure != null) {
LOG.warn("{}: An error occurred attempting to shut down the shards", persistenceId(), failure);
} else {
int nfailed = 0;
for(Boolean r: results) {
if(!r) {
nfailed++;
}
}
if(nfailed > 0) {
LOG.warn("{}: {} shards did not shut down gracefully", persistenceId(), nfailed);
}
}
}
}, dispatcher);
}
private void onWrappedShardResponse(WrappedShardResponse message) {
if (message.getResponse() instanceof RemoveServerReply) {
onRemoveServerReply(getSender(), message.getShardId(), (RemoveServerReply) message.getResponse(),
message.getLeaderPath());
}
}
private void onRemoveServerReply(ActorRef originalSender, ShardIdentifier shardId, RemoveServerReply replyMsg,
String leaderPath) {
shardReplicaOperationsInProgress.remove(shardId);
LOG.debug ("{}: Received {} for shard {}", persistenceId(), replyMsg, shardId.getShardName());
if (replyMsg.getStatus() == ServerChangeStatus.OK) {
LOG.debug ("{}: Leader shard successfully removed the replica shard {}", persistenceId(),
shardId.getShardName());
originalSender.tell(new akka.actor.Status.Success(null), getSelf());
} else {
LOG.warn ("{}: Leader failed to remove shard replica {} with status {}",
persistenceId(), shardId, replyMsg.getStatus());
Exception failure = getServerChangeException(RemoveServer.class, replyMsg.getStatus(),
leaderPath, shardId);
originalSender.tell(new akka.actor.Status.Failure(failure), getSelf());
}
}
private void onPrimaryShardFoundContext(PrimaryShardFoundForContext primaryShardFoundContext) {
if(primaryShardFoundContext.getContextMessage() instanceof AddShardReplica) {
addShard(primaryShardFoundContext.getShardName(), primaryShardFoundContext.getRemotePrimaryShardFound(),
getSender());
} else if(primaryShardFoundContext.getContextMessage() instanceof RemoveShardReplica){
removeShardReplica((RemoveShardReplica) primaryShardFoundContext.getContextMessage(),
primaryShardFoundContext.getShardName(), primaryShardFoundContext.getPrimaryPath(), getSender());
}
}
private void removeShardReplica(RemoveShardReplica contextMessage, final String shardName, final String primaryPath,
final ActorRef sender) {
if(isShardReplicaOperationInProgress(shardName, sender)) {
return;
}
shardReplicaOperationsInProgress.add(shardName);
final ShardIdentifier shardId = getShardIdentifier(contextMessage.getMemberName(), shardName);
final DatastoreContext datastoreContext = newShardDatastoreContextBuilder(shardName).build();
//inform ShardLeader to remove this shard as a replica by sending an RemoveServer message
LOG.debug ("{}: Sending RemoveServer message to peer {} for shard {}", persistenceId(),
primaryPath, shardId);
Timeout removeServerTimeout = new Timeout(datastoreContext.getShardLeaderElectionTimeout().
duration());
Future