2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.Props;
15 import akka.event.Logging;
16 import akka.event.LoggingAdapter;
17 import akka.japi.Creator;
18 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
19 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
20 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
21 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
23 import java.util.HashMap;
24 import java.util.List;
28 * The ShardManager has the following jobs,
30 * <li> Create all the local shard replicas that belong on this cluster member
31 * <li> Find the primary replica for any given shard
32 * <li> Engage in shard replica elections which decide which replica should be the primary
35 * <h3>Creation of Shard replicas</h3>
37 * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
38 * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
41 * <h3> Replica Elections </h3>
44 * The Shard Manager uses multiple cues to initiate election.
45 * <li> When a member of the cluster dies
46 * <li> When a local shard replica dies
47 * <li> When a local shard replica comes alive
50 public class ShardManager extends AbstractUntypedActor {
52 // Stores a mapping between a shard name and the address of the current primary
53 private final Map<String, Address> shardNameToPrimaryAddress =
56 // Stores a mapping between a member name and the address of the member
57 private final Map<String, Address> memberNameToAddress = new HashMap<>();
59 // Stores a mapping between the shard name and all the members on which a replica of that shard are available
60 private final Map<String, List<String>> shardNameToMembers =
63 private final LoggingAdapter log =
64 Logging.getLogger(getContext().system(), this);
// Maps a shard name to the path of the Shard actor that the constructor created for it
// through getContext().actorOf(...); only shards assigned to the current member are present.
67 private final Map<String, ActorPath> localShards = new HashMap<>();
// Kind of data held by the shards of this manager (the constructor documentation gives
// "configuration" and "operational" as examples); getShardActorName appends it to every actor name.
70 private final String type;
// Used to obtain the name of the current cluster member.
72 private final ClusterWrapper cluster;
// Answers which shards belong on a member and which members hold a replica of a shard.
74 private final Configuration configuration;
77 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
78 * configuration or operational
80 private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
82 this.cluster = cluster;
83 this.configuration = configuration;
84 String memberName = cluster.getCurrentMemberName();
85 List<String> memberShardNames =
86 configuration.getMemberShardNames(memberName);
88 for(String shardName : memberShardNames){
89 String shardActorName = getShardActorName(memberName, shardName);
90 ActorRef actor = getContext()
91 .actorOf(Shard.props(shardActorName), shardActorName);
92 ActorPath path = actor.path();
93 localShards.put(shardName, path);
97 public static Props props(final String type,
98 final ClusterWrapper cluster,
99 final Configuration configuration) {
100 return Props.create(new Creator<ShardManager>() {
103 public ShardManager create() throws Exception {
104 return new ShardManager(type, cluster, configuration);
110 public void handleReceive(Object message) throws Exception {
111 if (message instanceof FindPrimary) {
112 FindPrimary msg = ((FindPrimary) message);
113 String shardName = msg.getShardName();
115 List<String> members =
116 configuration.getMembersFromShardName(shardName);
118 for(String memberName : members) {
119 if (memberName.equals(cluster.getCurrentMemberName())) {
120 // This is a local shard
121 ActorPath shardPath = localShards.get(shardName);
122 // FIXME: This check may be redundant
123 if (shardPath == null) {
125 .tell(new PrimaryNotFound(shardName), getSelf());
128 getSender().tell(new PrimaryFound(shardPath.toString()),
132 Address address = memberNameToAddress.get(shardName);
135 address.toString() + "/user/" + getShardActorName(
136 memberName, shardName);
137 getSender().tell(new PrimaryFound(path), getSelf());
144 getSender().tell(new PrimaryNotFound(shardName), getSelf());
146 } else if (message instanceof UpdateSchemaContext) {
147 for(ActorPath path : localShards.values()){
148 getContext().system().actorSelection(path)
155 private String getShardActorName(String memberName, String shardName){
156 return memberName + "-shard-" + shardName + "-" + this.type;