X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardManager.java;h=5fbce4cd98900be65053939ca7a51d4f7a929a98;hb=eb887b1c2c8cd2768f8b4c2ed2b5054f97798466;hp=63266d6308287d2e816724f3f73b192b0d120bce;hpb=c2fd5f62f3b80a7e7b4b7e167349ede433e785d6;p=controller.git
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
index 63266d6308..5fbce4cd98 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
@@ -8,12 +8,22 @@
package org.opendaylight.controller.cluster.datastore;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
import akka.actor.Address;
-import akka.actor.UntypedActor;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import akka.actor.OneForOneStrategy;
+import akka.actor.Props;
+import akka.actor.SupervisorStrategy;
+import akka.cluster.ClusterEvent;
+import akka.japi.Creator;
+import akka.japi.Function;
+import com.google.common.base.Preconditions;
import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import scala.concurrent.duration.Duration;
import java.util.HashMap;
import java.util.List;
@@ -21,41 +31,265 @@ import java.util.Map;
/**
* The ShardManager has the following jobs,
- *
- * - Create all the local shard replicas that belong on this cluster member
- * - Find the primary replica for any given shard
- * - Engage in shard replica elections which decide which replica should be the primary
- *
- * Creation of Shard replicas
- * ==========================
- * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
- * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
- *
- * Replica Elections
- * =================
- * The Shard Manager uses multiple cues to initiate election.
- * - When a member of the cluster dies
- * - When a local shard replica dies
- * - When a local shard replica comes alive
+ *
+ *
Create all the local shard replicas that belong on this cluster member
+ * Find the primary replica for any given shard
+ * Engage in shard replica elections which decide which replica should be the primary
+ *
+ *
+ * >Creation of Shard replicas
+ * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
+ * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
+ *
+ *
+ * Replica Elections
+ *
+ *
+ * The Shard Manager uses multiple cues to initiate election.
+ *
When a member of the cluster dies
+ * When a local shard replica dies
+ * When a local shard replica comes alive
+ *
*/
-public class ShardManager extends UntypedActor {
-
- // Stores a mapping between a shard name and the address of the current primary
- private final Map shardNameToPrimaryAddress = new HashMap<>();
+public class ShardManager extends AbstractUntypedActor {
// Stores a mapping between a member name and the address of the member
private final Map memberNameToAddress = new HashMap<>();
- // Stores a mapping between the shard name and all the members on which a replica of that shard are available
- private final Map> shardNameToMembers = new HashMap<>();
+ private final Map localShards = new HashMap<>();
+
+
+ private final String type;
+
+ private final ClusterWrapper cluster;
+
+ private final Configuration configuration;
+
+ /**
+ * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
+ * configuration or operational
+ */
+ private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
+
+ this.type = Preconditions.checkNotNull(type, "type should not be null");
+ this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
+ this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
+
+ // Subscribe this actor to cluster member events
+ cluster.subscribeToMemberEvents(getSelf());
+
+ // Create all the local Shards and make them a child of the ShardManager
+ // TODO: This may need to be initiated when we first get the schema context
+ createLocalShards();
+ }
+
+ public static Props props(final String type,
+ final ClusterWrapper cluster,
+ final Configuration configuration) {
+ return Props.create(new Creator() {
+
+ @Override
+ public ShardManager create() throws Exception {
+ return new ShardManager(type, cluster, configuration);
+ }
+ });
+ }
+
+
+ @Override
+ public void handleReceive(Object message) throws Exception {
+ if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
+ findPrimary(
+ FindPrimary.fromSerializable(message));
+
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext(message);
+ } else if (message instanceof ClusterEvent.MemberUp){
+ memberUp((ClusterEvent.MemberUp) message);
+ } else if(message instanceof ClusterEvent.MemberRemoved) {
+ memberRemoved((ClusterEvent.MemberRemoved) message);
+ } else if(message instanceof ClusterEvent.UnreachableMember) {
+ ignoreMessage(message);
+ } else{
+ throw new Exception ("Not recognized message received, message="+message);
+ }
+
+ }
+
+ private void ignoreMessage(Object message){
+ LOG.debug("Unhandled message : " + message);
+ }
+
+ private void memberRemoved(ClusterEvent.MemberRemoved message) {
+ memberNameToAddress.remove(message.member().roles().head());
+ }
+
+ private void memberUp(ClusterEvent.MemberUp message) {
+ String memberName = message.member().roles().head();
+
+ memberNameToAddress.put(memberName , message.member().address());
+
+ for(ShardInformation info : localShards.values()){
+ String shardName = info.getShardName();
+ info.updatePeerAddress(getShardActorName(memberName, shardName),
+ getShardActorPath(shardName, memberName));
+ }
+ }
+
+ private void updateSchemaContext(Object message) {
+ for(ShardInformation info : localShards.values()){
+ info.getActor().tell(message,getSelf());
+ }
+ }
+
+ private void findPrimary(FindPrimary message) {
+ String shardName = message.getShardName();
+
+ List members =
+ configuration.getMembersFromShardName(shardName);
+
+ // First see if the there is a local replica for the shard
+ ShardInformation info = localShards.get(shardName);
+ if(info != null) {
+ ActorPath shardPath = info.getActorPath();
+ if (shardPath != null) {
+ getSender()
+ .tell(
+ new PrimaryFound(shardPath.toString()).toSerializable(),
+ getSelf());
+ return;
+ }
+ }
+
+ if(cluster.getCurrentMemberName() != null) {
+ members.remove(cluster.getCurrentMemberName());
+ }
+
+ // There is no way for us to figure out the primary (for now) so assume
+ // that one of the remote nodes is a primary
+ for(String memberName : members) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null){
+ String path =
+ getShardActorPath(shardName, memberName);
+ getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
+ return;
+ }
+ }
+ getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
+ }
+
+ private String
+
+
+ getShardActorPath(String shardName, String memberName) {
+ Address address = memberNameToAddress.get(memberName);
+ if(address != null) {
+ return address.toString() + "/user/shardmanager-" + this.type + "/"
+ + getShardActorName(
+ memberName, shardName);
+ }
+ return null;
+ }
+
+ private String getShardActorName(String memberName, String shardName){
+ return memberName + "-shard-" + shardName + "-" + this.type;
+ }
+
+ // Create the shards that are local to this member
+ private void createLocalShards() {
+ String memberName = this.cluster.getCurrentMemberName();
+ List memberShardNames =
+ this.configuration.getMemberShardNames(memberName);
+
+ for(String shardName : memberShardNames){
+ String shardActorName = getShardActorName(memberName, shardName);
+ Map peerAddresses = getPeerAddresses(shardName);
+ ActorRef actor = getContext()
+ .actorOf(Shard.props(shardActorName, peerAddresses),
+ shardActorName);
+ localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
+ }
+
+ }
+
+ private Map getPeerAddresses(String shardName){
+
+ Map peerAddresses = new HashMap<>();
+
+ List members =
+ this.configuration.getMembersFromShardName(shardName);
+
+ String currentMemberName = this.cluster.getCurrentMemberName();
+
+ for(String memberName : members){
+ if(!currentMemberName.equals(memberName)){
+ String shardActorName = getShardActorName(memberName, shardName);
+ String path =
+ getShardActorPath(shardName, currentMemberName);
+ peerAddresses.put(shardActorName, path);
+ }
+ }
+ return peerAddresses;
+ }
- LoggingAdapter log = Logging.getLogger(getContext().system(), this);
@Override
- public void onReceive(Object message) throws Exception {
- if(message instanceof FindPrimary ){
- FindPrimary msg = ((FindPrimary) message);
- getSender().tell(new PrimaryNotFound(msg.getShardName()), getSelf());
+ public SupervisorStrategy supervisorStrategy() {
+ return new OneForOneStrategy(10, Duration.create("1 minute"),
+ new Function() {
+ @Override
+ public SupervisorStrategy.Directive apply(Throwable t) {
+ return SupervisorStrategy.resume();
+ }
+ }
+ );
+
+ }
+
+ private class ShardInformation {
+ private final String shardName;
+ private final ActorRef actor;
+ private final ActorPath actorPath;
+ private final Map peerAddresses;
+
+ private ShardInformation(String shardName, ActorRef actor,
+ Map peerAddresses) {
+ this.shardName = shardName;
+ this.actor = actor;
+ this.actorPath = actor.path();
+ this.peerAddresses = peerAddresses;
+ }
+
+ public String getShardName() {
+ return shardName;
+ }
+
+ public ActorRef getActor(){
+ return actor;
+ }
+
+ public ActorPath getActorPath() {
+ return actorPath;
+ }
+
+ public Map getPeerAddresses() {
+ return peerAddresses;
+ }
+
+ public void updatePeerAddress(String peerId, String peerAddress){
+ LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
+ if(peerAddresses.containsKey(peerId)){
+ peerAddresses.put(peerId, peerAddress);
+
+ LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
+
+ actor
+ .tell(new PeerAddressResolved(peerId, peerAddress),
+ getSelf());
+
+ }
}
}
}