X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=64c6821120f94f99a389c12700757a7b8c7266f5;hb=36f3397f35d771f687173108597c5c76feba667f;hp=63266d6308287d2e816724f3f73b192b0d120bce;hpb=ceec41033ab311600969e595191a952ab4e6047d;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index 63266d6308..e861165c6b 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -8,54 +8,542 @@
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.Address;
-import akka.actor.UntypedActor;
+import akka.actor.OneForOneStrategy;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import akka.cluster.ClusterEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;
+import akka.japi.Creator;
+import akka.japi.Function;
+import akka.japi.Procedure;
+import akka.persistence.RecoveryCompleted;
+import akka.persistence.RecoveryFailure;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
+import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
+import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
-
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.duration.Duration;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* The ShardManager has the following jobs,
- *
- * - Create all the local shard replicas that belong on this cluster member
- * - Find the primary replica for any given shard
- * - Engage in shard replica elections which decide which replica should be the primary
- *
- * Creation of Shard replicas
- * ==========================
- * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- *
- * Replica Elections
- * =================
- * The Shard Manager uses multiple cues to initiate election.
- * - When a member of the cluster dies
- * - When a local shard replica dies
- * - When a local shard replica comes alive
+ *
+ *
Create all the local shard replicas that belong on this cluster member
+ *
Find the address of the local shard
+ *
Find the primary replica for any given shard
+ *
Monitor the cluster members and store their addresses
+ *
*/
-public class ShardManager extends UntypedActor {
+public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
- // Stores a mapping between a shard name and the address of the current primary
- private final Map shardNameToPrimaryAddress = new HashMap<>();
+ protected final LoggingAdapter LOG =
+ Logging.getLogger(getContext().system(), this);
// Stores a mapping between a member name and the address of the member
+ // Member names look like "member-1", "member-2" etc and are as specified
+ // in configuration
private final Map memberNameToAddress = new HashMap<>();
- // Stores a mapping between the shard name and all the members on which a replica of that shard are available
- private final Map> shardNameToMembers = new HashMap<>();
+ // Stores a mapping between a shard name and it's corresponding information
+ // Shard names look like inventory, topology etc and are as specified in
+ // configuration
+ private final Map localShards = new HashMap<>();
+
+ // The type of a ShardManager reflects the type of the datastore itself
+ // A data store could be of type config/operational
+ private final String type;
+
+ private final ClusterWrapper cluster;
+
+ private final Configuration configuration;
+
+ private ShardManagerInfoMBean mBean;
+
+ private final DatastoreContext datastoreContext;
+
+ private final Collection knownModules = new HashSet<>(128);
- LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+ /**
+ * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
+ * configuration or operational
+ */
+ protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
+ DatastoreContext datastoreContext) {
+
+ this.type = Preconditions.checkNotNull(type, "type should not be null");
+ this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
+ this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+ this.datastoreContext = datastoreContext;
+
+ // Subscribe this actor to cluster member events
+ cluster.subscribeToMemberEvents(getSelf());
+
+ createLocalShards();
+ }
+
+ public static Props props(final String type,
+ final ClusterWrapper cluster,
+ final Configuration configuration,
+ final DatastoreContext datastoreContext) {
+
+ Preconditions.checkNotNull(type, "type should not be null");
+ Preconditions.checkNotNull(cluster, "cluster should not be null");
+ Preconditions.checkNotNull(configuration, "configuration should not be null");
+
+ return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
+ }
@Override
- public void onReceive(Object message) throws Exception {
- if(message instanceof FindPrimary ){
- FindPrimary msg = ((FindPrimary) message);
- getSender().tell(new PrimaryNotFound(msg.getShardName()), getSelf());
+ public void handleCommand(Object message) throws Exception {
+ if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+ findPrimary(FindPrimary.fromSerializable(message));
+ } else if(message instanceof FindLocalShard){
+ findLocalShard((FindLocalShard) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext(message);
+ } else if(message instanceof ActorInitialized) {
+ onActorInitialized(message);
+ } else if (message instanceof ClusterEvent.MemberUp){
+ memberUp((ClusterEvent.MemberUp) message);
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ memberRemoved((ClusterEvent.MemberRemoved) message);
+ } else if(message instanceof ClusterEvent.UnreachableMember) {
+ ignoreMessage(message);
+ } else{
+ unknownMessage(message);
+ }
+
+ }
+
+ private void onActorInitialized(Object message) {
+ final ActorRef sender = getSender();
+
+ if (sender == null) {
+ return; //why is a non-actor sending this message? Just ignore.
+ }
+
+ String actorName = sender.path().name();
+ //find shard name from actor name; actor name is stringified shardId
+ ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
+
+ if (shardId.getShardName() == null) {
+ return;
+ }
+ markShardAsInitialized(shardId.getShardName());
+ }
+
+ private void markShardAsInitialized(String shardName) {
+ LOG.debug("Initializing shard [{}]", shardName);
+ ShardInformation shardInformation = localShards.get(shardName);
+ if (shardInformation != null) {
+ shardInformation.setActorInitialized();
+ }
+ }
+
+ @Override
+ protected void handleRecover(Object message) throws Exception {
+ if(message instanceof SchemaContextModules){
+ SchemaContextModules msg = (SchemaContextModules) message;
+ knownModules.clear();
+ knownModules.addAll(msg.getModules());
+ } else if(message instanceof RecoveryFailure){
+ RecoveryFailure failure = (RecoveryFailure) message;
+ LOG.error(failure.cause(), "Recovery failed");
+ } else if(message instanceof RecoveryCompleted){
+ LOG.info("Recovery complete : {}", persistenceId());
+
+ // Delete all the messages from the akka journal except the last one
+ deleteMessages(lastSequenceNr() - 1);
+ }
+ }
+
+ private void findLocalShard(FindLocalShard message) {
+ final ShardInformation shardInformation = localShards.get(message.getShardName());
+
+ if(shardInformation == null){
+ getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
+ return;
+ }
+
+ sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier