2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
22 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
23 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
24 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
25 import scala.concurrent.duration.Duration;
27 import java.util.HashMap;
28 import java.util.List;
32 * The ShardManager has the following jobs,
34 * <li> Create all the local shard replicas that belong on this cluster member
35 * <li> Find the primary replica for any given shard
36 * <li> Engage in shard replica elections which decide which replica should be the primary
39 * <h3>>Creation of Shard replicas</h3
41 * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
42 * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
45 * <h3> Replica Elections </h3>
48 * The Shard Manager uses multiple cues to initiate election.
49 * <li> When a member of the cluster dies
50 * <li> When a local shard replica dies
51 * <li> When a local shard replica comes alive
54 public class ShardManager extends AbstractUntypedActor {
56 // Stores a mapping between a member name and the address of the member
57 private final Map<String, Address> memberNameToAddress = new HashMap<>();
59 private final Map<String, ActorPath> localShards = new HashMap<>();
62 private final String type;
64 private final ClusterWrapper cluster;
66 private final Configuration configuration;
69 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
70 * configuration or operational
72 private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
74 this.type = Preconditions.checkNotNull(type, "type should not be null");
75 this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
76 this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
78 // Subscribe this actor to cluster member events
79 cluster.subscribeToMemberEvents(getSelf());
81 // Create all the local Shards and make them a child of the ShardManager
82 // TODO: This may need to be initiated when we first get the schema context
86 public static Props props(final String type,
87 final ClusterWrapper cluster,
88 final Configuration configuration) {
89 return Props.create(new Creator<ShardManager>() {
92 public ShardManager create() throws Exception {
93 return new ShardManager(type, cluster, configuration);
100 public void handleReceive(Object message) throws Exception {
101 if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
103 FindPrimary.fromSerializable(message));
105 } else if (message instanceof UpdateSchemaContext) {
106 updateSchemaContext(message);
107 } else if (message instanceof ClusterEvent.MemberUp){
108 memberUp((ClusterEvent.MemberUp) message);
109 } else if(message instanceof ClusterEvent.MemberRemoved) {
110 memberRemoved((ClusterEvent.MemberRemoved) message);
111 } else if(message instanceof ClusterEvent.UnreachableMember) {
112 ignoreMessage(message);
114 throw new Exception ("Not recognized message received, message="+message);
119 private void ignoreMessage(Object message){
120 LOG.debug("Unhandled message : " + message);
123 private void memberRemoved(ClusterEvent.MemberRemoved message) {
124 memberNameToAddress.remove(message.member().roles().head());
127 private void memberUp(ClusterEvent.MemberUp message) {
128 memberNameToAddress.put(message.member().roles().head(), message.member().address());
131 private void updateSchemaContext(Object message) {
132 for(ActorPath path : localShards.values()){
133 getContext().system().actorSelection(path)
139 private void findPrimary(FindPrimary message) {
140 String shardName = message.getShardName();
142 List<String> members =
143 configuration.getMembersFromShardName(shardName);
145 for(String memberName : members) {
146 if (memberName.equals(cluster.getCurrentMemberName())) {
147 // This is a local shard
148 ActorPath shardPath = localShards.get(shardName);
149 if (shardPath == null) {
151 .tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
154 getSender().tell(new PrimaryFound(shardPath.toString()).toSerializable(),
158 Address address = memberNameToAddress.get(memberName);
161 address.toString() + "/user/shardmanager-" + this.type + "/" + getShardActorName(
162 memberName, shardName);
163 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
171 getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
174 private String getShardActorName(String memberName, String shardName){
175 return memberName + "-shard-" + shardName + "-" + this.type;
178 // Create the shards that are local to this member
179 private void createLocalShards() {
180 String memberName = this.cluster.getCurrentMemberName();
181 List<String> memberShardNames =
182 this.configuration.getMemberShardNames(memberName);
184 for(String shardName : memberShardNames){
185 String shardActorName = getShardActorName(memberName, shardName);
186 ActorRef actor = getContext()
187 .actorOf(Shard.props(shardActorName), shardActorName);
188 ActorPath path = actor.path();
189 localShards.put(shardName, path);
196 public SupervisorStrategy supervisorStrategy() {
197 return new OneForOneStrategy(10, Duration.create("1 minute"),
198 new Function<Throwable, SupervisorStrategy.Directive>() {
200 public SupervisorStrategy.Directive apply(Throwable t) {
201 return SupervisorStrategy.resume();