X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=5fbce4cd98900be65053939ca7a51d4f7a929a98;hp=f5e46852e3cab8be4c9799e9d00a472e051c9f9c;hb=475d28f717bae92b2cc10b0589131771fcc62242;hpb=0bdf3307bc17f562c84cceeacd29f42409bfff34 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index f5e46852e3..5fbce4cd98 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -1,11 +1,29 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorPath; +import akka.actor.ActorRef; import akka.actor.Address; -import akka.actor.UntypedActor; -import akka.event.Logging; -import akka.event.LoggingAdapter; +import akka.actor.OneForOneStrategy; +import akka.actor.Props; +import akka.actor.SupervisorStrategy; +import akka.cluster.ClusterEvent; +import akka.japi.Creator; +import akka.japi.Function; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import scala.concurrent.duration.Duration; import java.util.HashMap; import java.util.List; @@ -13,41 +31,265 @@ import java.util.Map; /** * The ShardManager has the following jobs, - * - * - Create all the local shard replicas that belong on this cluster member - * - Find the primary replica for any given shard - * - Engage in shard replica elections which decide which replica should be the primary - * - * Creation of Shard replicas - * ========================== - * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas - * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service. - * - * Replica Elections - * ================= - * The Shard Manager uses multiple cues to initiate election. - * - When a member of the cluster dies - * - When a local shard replica dies - * - When a local shard replica comes alive + *

+ *

  • Create all the local shard replicas that belong on this cluster member + *
  • Find the primary replica for any given shard + *
  • Engage in shard replica elections which decide which replica should be the primary + *

    + *

    + *

    >Creation of Shard replicas

    + * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas + * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service. + *

    + *

    + *

    Replica Elections

    + *

    + *

    + * The Shard Manager uses multiple cues to initiate election. + *

  • When a member of the cluster dies + *
  • When a local shard replica dies + *
  • When a local shard replica comes alive + *

    */ -public class ShardManager extends UntypedActor { - - // Stores a mapping between a shard name and the address of the current primary - private final Map shardNameToPrimaryAddress = new HashMap<>(); +public class ShardManager extends AbstractUntypedActor { // Stores a mapping between a member name and the address of the member private final Map memberNameToAddress = new HashMap<>(); - // Stores a mapping between the shard name and all the members on which a replica of that shard are available - private final Map> shardNameToMembers = new HashMap<>(); + private final Map localShards = new HashMap<>(); + + + private final String type; + + private final ClusterWrapper cluster; + + private final Configuration configuration; + + /** + * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be + * configuration or operational + */ + private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) { + + this.type = Preconditions.checkNotNull(type, "type should not be null"); + this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null"); + this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null"); + + // Subscribe this actor to cluster member events + cluster.subscribeToMemberEvents(getSelf()); + + // Create all the local Shards and make them a child of the ShardManager + // TODO: This may need to be initiated when we first get the schema context + createLocalShards(); + } + + public static Props props(final String type, + final ClusterWrapper cluster, + final Configuration configuration) { + return Props.create(new Creator() { + + @Override + public ShardManager create() throws Exception { + return new ShardManager(type, cluster, configuration); + } + }); + } - LoggingAdapter log = Logging.getLogger(getContext().system(), this); @Override - public void onReceive(Object message) throws Exception { - if(message instanceof FindPrimary ){ - FindPrimary msg = ((FindPrimary) message); - getSender().tell(new PrimaryNotFound(msg.getShardName()), getSelf()); + public void handleReceive(Object message) throws Exception { + if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) { + findPrimary( + FindPrimary.fromSerializable(message)); + + } else if (message instanceof UpdateSchemaContext) { + updateSchemaContext(message); + } else if (message instanceof ClusterEvent.MemberUp){ + memberUp((ClusterEvent.MemberUp) message); + } else if(message instanceof ClusterEvent.MemberRemoved) { + memberRemoved((ClusterEvent.MemberRemoved) message); + } else if(message instanceof ClusterEvent.UnreachableMember) { + ignoreMessage(message); + } else{ + throw new Exception ("Not recognized message received, message="+message); + } + + } + + private void ignoreMessage(Object message){ + LOG.debug("Unhandled message : " + message); + } + + private void memberRemoved(ClusterEvent.MemberRemoved message) { + memberNameToAddress.remove(message.member().roles().head()); + } + + private void memberUp(ClusterEvent.MemberUp message) { + String memberName = message.member().roles().head(); + + memberNameToAddress.put(memberName , message.member().address()); + + for(ShardInformation info : localShards.values()){ + String shardName = info.getShardName(); + info.updatePeerAddress(getShardActorName(memberName, shardName), + getShardActorPath(shardName, memberName)); + } + } + + private void updateSchemaContext(Object message) { + for(ShardInformation info : localShards.values()){ + info.getActor().tell(message,getSelf()); + } + } + + private void findPrimary(FindPrimary message) { + String shardName = message.getShardName(); + + List members = + configuration.getMembersFromShardName(shardName); + + // First see if the there is a local replica for the shard + ShardInformation info = localShards.get(shardName); + if(info != null) { + ActorPath shardPath = info.getActorPath(); + if (shardPath != null) { + getSender() + .tell( + new PrimaryFound(shardPath.toString()).toSerializable(), + getSelf()); + return; + } + } + + if(cluster.getCurrentMemberName() != null) { + members.remove(cluster.getCurrentMemberName()); + } + + // There is no way for us to figure out the primary (for now) so assume + // that one of the remote nodes is a primary + for(String memberName : members) { + Address address = memberNameToAddress.get(memberName); + if(address != null){ + String path = + getShardActorPath(shardName, memberName); + getSender().tell(new PrimaryFound(path).toSerializable(), getSelf()); + return; + } + } + getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf()); + } + + private String + + + getShardActorPath(String shardName, String memberName) { + Address address = memberNameToAddress.get(memberName); + if(address != null) { + return address.toString() + "/user/shardmanager-" + this.type + "/" + + getShardActorName( + memberName, shardName); + } + return null; + } + + private String getShardActorName(String memberName, String shardName){ + return memberName + "-shard-" + shardName + "-" + this.type; + } + + // Create the shards that are local to this member + private void createLocalShards() { + String memberName = this.cluster.getCurrentMemberName(); + List memberShardNames = + this.configuration.getMemberShardNames(memberName); + + for(String shardName : memberShardNames){ + String shardActorName = getShardActorName(memberName, shardName); + Map peerAddresses = getPeerAddresses(shardName); + ActorRef actor = getContext() + .actorOf(Shard.props(shardActorName, peerAddresses), + shardActorName); + localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses)); + } + + } + + private Map getPeerAddresses(String shardName){ + + Map peerAddresses = new HashMap<>(); + + List members = + this.configuration.getMembersFromShardName(shardName); + + String currentMemberName = this.cluster.getCurrentMemberName(); + + for(String memberName : members){ + if(!currentMemberName.equals(memberName)){ + String shardActorName = getShardActorName(memberName, shardName); + String path = + getShardActorPath(shardName, currentMemberName); + peerAddresses.put(shardActorName, path); + } + } + return peerAddresses; + } + + + @Override + public SupervisorStrategy supervisorStrategy() { + return new OneForOneStrategy(10, Duration.create("1 minute"), + new Function() { + @Override + public SupervisorStrategy.Directive apply(Throwable t) { + return SupervisorStrategy.resume(); + } + } + ); + + } + + private class ShardInformation { + private final String shardName; + private final ActorRef actor; + private final ActorPath actorPath; + private final Map peerAddresses; + + private ShardInformation(String shardName, ActorRef actor, + Map peerAddresses) { + this.shardName = shardName; + this.actor = actor; + this.actorPath = actor.path(); + this.peerAddresses = peerAddresses; + } + + public String getShardName() { + return shardName; + } + + public ActorRef getActor(){ + return actor; + } + + public ActorPath getActorPath() { + return actorPath; + } + + public Map getPeerAddresses() { + return peerAddresses; + } + + public void updatePeerAddress(String peerId, String peerAddress){ + LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress); + if(peerAddresses.containsKey(peerId)){ + peerAddresses.put(peerId, peerAddress); + + LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path()); + + actor + .tell(new PeerAddressResolved(peerId, peerAddress), + getSelf()); + + } } } }