/*
* Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
package org.opendaylight.controller.cluster.datastore;
import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.ClusterEvent;
import akka.japi.Creator;
import akka.japi.Function;
import akka.japi.Procedure;
import akka.persistence.RecoveryCompleted;
import akka.persistence.RecoveryFailure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.opendaylight.controller.cluster.DataPersistenceProvider;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;
/**
 * The ShardManager has the following jobs,
 * <ul>
 * <li>Create all the local shard replicas that belong on this cluster member
 * <li>Find the address of the local shard
 * <li>Find the primary replica for any given shard
 * <li>Monitor the cluster members and store their addresses
 * </ul>
 */
public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
// Logger is looked up via getClass(), so it is named after the runtime class of this instance
private final Logger LOG = LoggerFactory.getLogger(getClass());
// Stores a mapping between a member name and the address of the member
// Member names look like "member-1", "member-2" etc and are as specified
// in configuration
private final Map<String, Address> memberNameToAddress = new HashMap<>();
// Stores a mapping between a shard name and its corresponding information
// Shard names look like inventory, topology etc and are as specified in
// configuration
private final Map<String, ShardInformation> localShards = new HashMap<>();
// The type of a ShardManager reflects the type of the datastore itself
// A data store could be of type config/operational
private final String type;
private final ClusterWrapper cluster;
private final Configuration configuration;
// Dispatcher path resolved for Dispatchers.DispatcherType.Shard
private final String shardDispatcherPath;
private ShardManagerInfoMBean mBean;
private final DatastoreContext datastoreContext;
// Replaced during recovery with the modules carried by a SchemaContextModules message
// NOTE(review): element type is not visible from here, so this is left raw -
// presumably module names; confirm and parameterize
private Collection knownModules = Collections.emptySet();
private final DataPersistenceProvider dataPersistenceProvider;
/**
 * Creates the shard manager, subscribes it to cluster member events and
 * creates the local shards.
 *
 * @param cluster wrapper used to subscribe this actor to cluster member events; must not be null
 * @param configuration configuration kept for later use; must not be null
 * @param datastoreContext supplies the datastore type and the persistence flag; must not be null
 * @throws NullPointerException if any argument is null
 */
protected ShardManager(ClusterWrapper cluster, Configuration configuration,
        DatastoreContext datastoreContext) {
    this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
    this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
    // Validate like the other arguments so a null fails with a descriptive message
    // rather than a bare NullPointerException on the first dereference below
    this.datastoreContext = Preconditions.checkNotNull(datastoreContext, "datastoreContext should not be null");
    this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
    this.type = datastoreContext.getDataStoreType();
    this.shardDispatcherPath =
            new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
    // Subscribe this actor to cluster member events
    cluster.subscribeToMemberEvents(getSelf());
    createLocalShards();
}
/**
 * Selects the persistence provider used by this actor.
 *
 * @param persistent whether a persistent provider is wanted
 * @return a {@code PersistentDataProvider} when {@code persistent} is true,
 *         otherwise a {@code NonPersistentDataProvider}
 */
protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
    if (persistent) {
        return new PersistentDataProvider();
    }
    return new NonPersistentDataProvider();
}
/**
 * Builds the {@link Props} used to create a ShardManager actor.
 *
 * @param cluster must not be null
 * @param configuration must not be null
 * @param datastoreContext passed through to the creator without validation here
 * @return Props wrapping a {@code ShardManagerCreator} for the given arguments
 * @throws NullPointerException if {@code cluster} or {@code configuration} is null
 */
public static Props props(
        final ClusterWrapper cluster,
        final Configuration configuration,
        final DatastoreContext datastoreContext) {
    Preconditions.checkNotNull(cluster, "cluster should not be null");
    Preconditions.checkNotNull(configuration, "configuration should not be null");
    final ShardManagerCreator creator = new ShardManagerCreator(cluster, configuration, datastoreContext);
    return Props.create(creator);
}
/**
 * Dispatches an incoming command to the matching handler based on its type.
 * The checks run in a fixed order; the first match handles the message.
 * Unreachable-member cluster events are explicitly ignored, and anything
 * unrecognized is passed to {@code unknownMessage}.
 *
 * @param message the received message
 */
@Override
public void handleCommand(Object message) throws Exception {
    // FindPrimary is matched on its exact serializable class, not via instanceof
    if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
        findPrimary(FindPrimary.fromSerializable(message));
        return;
    }
    if (message instanceof FindLocalShard) {
        findLocalShard((FindLocalShard) message);
        return;
    }
    if (message instanceof UpdateSchemaContext) {
        updateSchemaContext(message);
        return;
    }
    if (message instanceof ActorInitialized) {
        onActorInitialized(message);
        return;
    }
    if (message instanceof ClusterEvent.MemberUp) {
        memberUp((ClusterEvent.MemberUp) message);
        return;
    }
    if (message instanceof ClusterEvent.MemberRemoved) {
        memberRemoved((ClusterEvent.MemberRemoved) message);
        return;
    }
    if (message instanceof ClusterEvent.UnreachableMember) {
        ignoreMessage(message);
        return;
    }
    unknownMessage(message);
}
/**
 * Handles an ActorInitialized notification by deriving the shard name from
 * the sender's actor name and marking that shard as initialized.
 * Does nothing when there is no sender or no shard name can be derived.
 *
 * @param message the notification; its content is not inspected
 */
private void onActorInitialized(Object message) {
    final ActorRef initializedActor = getSender();
    // A null sender means a non-actor sent this message; just ignore it.
    if (initializedActor != null) {
        // The actor name is the stringified shardId, so parse it back to get the shard name
        final ShardIdentifier parsedId =
                ShardIdentifier.builder().fromShardIdString(initializedActor.path().name()).build();
        if (parsedId.getShardName() != null) {
            markShardAsInitialized(parsedId.getShardName());
        }
    }
}
/**
 * Flags the named local shard as having an initialized actor.
 * Shard names that are not present in {@code localShards} are silently skipped.
 *
 * @param shardName name of the shard to mark
 */
private void markShardAsInitialized(String shardName) {
    LOG.debug("Initializing shard [{}]", shardName);
    final ShardInformation info = localShards.get(shardName);
    if (info == null) {
        return;
    }
    info.setActorInitialized();
}
/**
 * Processes a message delivered during recovery.
 * <p>
 * When the persistence provider reports recovery as not applicable, only
 * RecoveryCompleted is acted on. Otherwise SchemaContextModules replaces the
 * known modules, RecoveryFailure is logged, and RecoveryCompleted trims the
 * journal. Any other message is dropped.
 *
 * @param message the recovery message
 */
@Override
protected void handleRecover(Object message) throws Exception {
    if (!dataPersistenceProvider.isRecoveryApplicable()) {
        if (message instanceof RecoveryCompleted) {
            LOG.info("Recovery complete : {}", persistenceId());
            // Delete all the messages from the akka journal
            deleteMessages(lastSequenceNr());
        }
        return;
    }
    if (message instanceof SchemaContextModules) {
        knownModules = ImmutableSet.copyOf(((SchemaContextModules) message).getModules());
        return;
    }
    if (message instanceof RecoveryFailure) {
        LOG.error("Recovery failed", ((RecoveryFailure) message).cause());
        return;
    }
    if (message instanceof RecoveryCompleted) {
        LOG.info("Recovery complete : {}", persistenceId());
        // Delete all the messages from the akka journal except the last one
        deleteMessages(lastSequenceNr() - 1);
    }
}
private void findLocalShard(FindLocalShard message) {
final ShardInformation shardInformation = localShards.get(message.getShardName());
if(shardInformation == null){
getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
return;
}
sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier