X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=81236521d649a48902de103c52888488559112ca;hb=1e59825dbec7b354d76bd7efa6a61e4ad802c802;hp=63266d6308287d2e816724f3f73b192b0d120bce;hpb=81bbe76bd26399118d028663d08e464ce6b7d040;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 63266d6308..81236521d6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -8,12 +8,17 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorPath; +import akka.actor.ActorRef; import akka.actor.Address; -import akka.actor.UntypedActor; +import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; +import akka.japi.Creator; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import java.util.HashMap; import java.util.List; @@ -21,41 +26,105 @@ import java.util.Map; /** * The ShardManager has the following jobs, - * - * - Create all the local shard replicas that belong on this cluster member - * - Find the primary replica for any given shard - * - Engage in shard replica elections which decide which replica should be the primary - * - * Creation of Shard replicas - * ========================== - * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas - * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service. - * - * Replica Elections - * ================= - * The Shard Manager uses multiple cues to initiate election. - * - When a member of the cluster dies - * - When a local shard replica dies - * - When a local shard replica comes alive + *

+ *

  • Create all the local shard replicas that belong on this cluster member + *
  • Find the primary replica for any given shard + *
  • Engage in shard replica elections which decide which replica should be the primary + *

    + *

    + *

    >Creation of Shard replicas

    + * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas + * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service. + *

    + *

    + *

    Replica Elections

    + *

    + *

    + * The Shard Manager uses multiple cues to initiate election. + *

  • When a member of the cluster dies + *
  • When a local shard replica dies + *
  • When a local shard replica comes alive + *

    */ -public class ShardManager extends UntypedActor { +public class ShardManager extends AbstractUntypedActor { // Stores a mapping between a shard name and the address of the current primary - private final Map shardNameToPrimaryAddress = new HashMap<>(); + private final Map shardNameToPrimaryAddress = + new HashMap<>(); // Stores a mapping between a member name and the address of the member private final Map memberNameToAddress = new HashMap<>(); // Stores a mapping between the shard name and all the members on which a replica of that shard are available - private final Map> shardNameToMembers = new HashMap<>(); + private final Map> shardNameToMembers = + new HashMap<>(); + + private final LoggingAdapter log = + Logging.getLogger(getContext().system(), this); + + + private final Map localShards = new HashMap<>(); + + + private final ClusterWrapper cluster; + + /** + * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be + * configuration or operational + */ + private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) { + this.cluster = cluster; + String memberName = cluster.getCurrentMemberName(); + List memberShardNames = + configuration.getMemberShardNames(memberName); - LoggingAdapter log = Logging.getLogger(getContext().system(), this); + for(String shardName : memberShardNames){ + ActorRef actor = getContext() + .actorOf(Shard.props(memberName + "-shard-" + shardName + "-" + type), + memberName + "-shard-" + shardName + "-" + type); + ActorPath path = actor.path(); + localShards.put(shardName, path); + } + } + + public static Props props(final String type, + final ClusterWrapper cluster, + final Configuration configuration) { + return Props.create(new Creator() { + + @Override + public ShardManager create() throws Exception { + return new ShardManager(type, cluster, configuration); + } + }); + } @Override - public void onReceive(Object message) throws Exception { - if(message instanceof FindPrimary ){ + public void handleReceive(Object message) throws Exception { + if (message instanceof FindPrimary) { FindPrimary msg = ((FindPrimary) message); - getSender().tell(new PrimaryNotFound(msg.getShardName()), getSelf()); + String shardName = msg.getShardName(); + + + if (Shard.DEFAULT_NAME.equals(shardName)) { + ActorPath defaultShardPath = localShards.get(shardName); + if(defaultShardPath == null){ + throw new IllegalStateException("local default shard not found"); + } + getSender().tell(new PrimaryFound(defaultShardPath.toString()), + getSelf()); + } else { + getSender().tell(new PrimaryNotFound(shardName), getSelf()); + } + } else if (message instanceof UpdateSchemaContext) { + for(ActorPath path : localShards.values()){ + getContext().system().actorSelection(path) + .forward(message, + getContext()); + } } } + + }