2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
22 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
23 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
24 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
25 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
26 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
27 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
28 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
29 import scala.concurrent.duration.Duration;
31 import java.util.HashMap;
32 import java.util.List;
36 * The ShardManager has the following jobs,
38 * <li> Create all the local shard replicas that belong on this cluster member
39 * <li> Find the address of the local shard
40 * <li> Find the primary replica for any given shard
41 * <li> Monitor the cluster members and store their addresses
44 public class ShardManager extends AbstractUntypedActor {
46 // Stores a mapping between a member name and the address of the member
47 // Member names look like "member-1", "member-2" etc and are as specified
49 private final Map<String, Address> memberNameToAddress = new HashMap<>();
51 // Stores a mapping between a shard name and it's corresponding information
52 // Shard names look like inventory, topology etc and are as specified in
54 private final Map<String, ShardInformation> localShards = new HashMap<>();
56 // The type of a ShardManager reflects the type of the datastore itself
57 // A data store could be of type config/operational
58 private final String type;
60 private final ClusterWrapper cluster;
62 private final Configuration configuration;
65 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
66 * configuration or operational
68 private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
70 this.type = Preconditions.checkNotNull(type, "type should not be null");
71 this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
72 this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
74 // Subscribe this actor to cluster member events
75 cluster.subscribeToMemberEvents(getSelf());
77 // Create all the local Shards and make them a child of the ShardManager
78 // TODO: This may need to be initiated when we first get the schema context
82 public static Props props(final String type,
83 final ClusterWrapper cluster,
84 final Configuration configuration) {
85 return Props.create(new Creator<ShardManager>() {
88 public ShardManager create() throws Exception {
89 return new ShardManager(type, cluster, configuration);
96 public void handleReceive(Object message) throws Exception {
97 if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
99 FindPrimary.fromSerializable(message));
100 } else if(message instanceof FindLocalShard){
101 findLocalShard((FindLocalShard) message);
102 } else if (message instanceof UpdateSchemaContext) {
103 updateSchemaContext(message);
104 } else if (message instanceof ClusterEvent.MemberUp){
105 memberUp((ClusterEvent.MemberUp) message);
106 } else if(message instanceof ClusterEvent.MemberRemoved) {
107 memberRemoved((ClusterEvent.MemberRemoved) message);
108 } else if(message instanceof ClusterEvent.UnreachableMember) {
109 ignoreMessage(message);
111 throw new Exception ("Not recognized message received, message="+message);
116 private void findLocalShard(FindLocalShard message) {
117 ShardInformation shardInformation =
118 localShards.get(message.getShardName());
120 if(shardInformation != null){
121 getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
125 getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
128 private void ignoreMessage(Object message){
129 LOG.debug("Unhandled message : " + message);
132 private void memberRemoved(ClusterEvent.MemberRemoved message) {
133 memberNameToAddress.remove(message.member().roles().head());
136 private void memberUp(ClusterEvent.MemberUp message) {
137 String memberName = message.member().roles().head();
139 memberNameToAddress.put(memberName , message.member().address());
141 for(ShardInformation info : localShards.values()){
142 String shardName = info.getShardName();
143 info.updatePeerAddress(getShardActorName(memberName, shardName),
144 getShardActorPath(shardName, memberName));
149 * Notifies all the local shards of a change in the schema context
153 private void updateSchemaContext(Object message) {
154 for(ShardInformation info : localShards.values()){
155 info.getActor().tell(message,getSelf());
159 private void findPrimary(FindPrimary message) {
160 String shardName = message.getShardName();
162 List<String> members =
163 configuration.getMembersFromShardName(shardName);
165 // First see if the there is a local replica for the shard
166 ShardInformation info = localShards.get(shardName);
168 ActorPath shardPath = info.getActorPath();
169 if (shardPath != null) {
172 new PrimaryFound(shardPath.toString()).toSerializable(),
178 if(cluster.getCurrentMemberName() != null) {
179 members.remove(cluster.getCurrentMemberName());
182 // There is no way for us to figure out the primary (for now) so assume
183 // that one of the remote nodes is a primary
184 for(String memberName : members) {
185 Address address = memberNameToAddress.get(memberName);
188 getShardActorPath(shardName, memberName);
189 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
193 getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
196 private String getShardActorPath(String shardName, String memberName) {
197 Address address = memberNameToAddress.get(memberName);
198 if(address != null) {
199 return address.toString() + "/user/shardmanager-" + this.type + "/"
201 memberName, shardName);
207 * Construct the name of the shard actor given the name of the member on
208 * which the shard resides and the name of the shard
214 private String getShardActorName(String memberName, String shardName){
215 return memberName + "-shard-" + shardName + "-" + this.type;
219 * Create shards that are local to the member on which the ShardManager
223 private void createLocalShards() {
224 String memberName = this.cluster.getCurrentMemberName();
225 List<String> memberShardNames =
226 this.configuration.getMemberShardNames(memberName);
228 for(String shardName : memberShardNames){
229 String shardActorName = getShardActorName(memberName, shardName);
230 Map<String, String> peerAddresses = getPeerAddresses(shardName);
231 ActorRef actor = getContext()
232 .actorOf(Shard.props(shardActorName, peerAddresses),
234 localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
240 * Given the name of the shard find the addresses of all it's peers
245 private Map<String, String> getPeerAddresses(String shardName){
247 Map<String, String> peerAddresses = new HashMap<>();
249 List<String> members =
250 this.configuration.getMembersFromShardName(shardName);
252 String currentMemberName = this.cluster.getCurrentMemberName();
254 for(String memberName : members){
255 if(!currentMemberName.equals(memberName)){
256 String shardActorName = getShardActorName(memberName, shardName);
258 getShardActorPath(shardName, currentMemberName);
259 peerAddresses.put(shardActorName, path);
262 return peerAddresses;
267 public SupervisorStrategy supervisorStrategy() {
268 return new OneForOneStrategy(10, Duration.create("1 minute"),
269 new Function<Throwable, SupervisorStrategy.Directive>() {
271 public SupervisorStrategy.Directive apply(Throwable t) {
272 return SupervisorStrategy.resume();
279 private class ShardInformation {
280 private final String shardName;
281 private final ActorRef actor;
282 private final ActorPath actorPath;
283 private final Map<String, String> peerAddresses;
285 private ShardInformation(String shardName, ActorRef actor,
286 Map<String, String> peerAddresses) {
287 this.shardName = shardName;
289 this.actorPath = actor.path();
290 this.peerAddresses = peerAddresses;
293 public String getShardName() {
297 public ActorRef getActor(){
301 public ActorPath getActorPath() {
305 public Map<String, String> getPeerAddresses() {
306 return peerAddresses;
309 public void updatePeerAddress(String peerId, String peerAddress){
310 LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
311 if(peerAddresses.containsKey(peerId)){
312 peerAddresses.put(peerId, peerAddress);
314 LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
317 .tell(new PeerAddressResolved(peerId, peerAddress),