2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.datastore;
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
22 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
23 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
24 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
25 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
26 import scala.concurrent.duration.Duration;
28 import java.util.HashMap;
29 import java.util.List;
33 * The ShardManager has the following jobs,
35 * <li> Create all the local shard replicas that belong on this cluster member
36 * <li> Find the primary replica for any given shard
37 * <li> Engage in shard replica elections which decide which replica should be the primary
40 * <h3>Creation of Shard replicas</h3>
42 * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
43 * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
46 * <h3> Replica Elections </h3>
49 * The Shard Manager uses multiple cues to initiate election.
50 * <li> When a member of the cluster dies
51 * <li> When a local shard replica dies
52 * <li> When a local shard replica comes alive
55 public class ShardManager extends AbstractUntypedActor {
57 // Stores a mapping between a member name and the address of the member
// (entries are added on MemberUp and removed on MemberRemoved cluster events)
58 private final Map<String, Address> memberNameToAddress = new HashMap<>();
// Shard actors created by this shard manager, keyed by shard name
60 private final Map<String, ShardInformation> localShards = new HashMap<>();
// Kind of data held by the shards; also embedded in shard actor names and paths
63 private final String type;
// Supplies the current member's name and delivers cluster membership events
65 private final ClusterWrapper cluster;
// Queried for the members of a shard and for the shards of a member
67 private final Configuration configuration;
70 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
71 * configuration or operational
// Private constructor: instances are created through props(...).
// Every argument is mandatory; a null value is rejected by
// Preconditions.checkNotNull with the message given on each line.
73 private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
75 this.type = Preconditions.checkNotNull(type, "type should not be null");
76 this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
77 this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
79 // Subscribe this actor to cluster member events
// (MemberUp / MemberRemoved / UnreachableMember are dispatched in handleReceive)
80 cluster.subscribeToMemberEvents(getSelf());
82 // Create all the local Shards and make them a child of the ShardManager
83 // TODO: This may need to be initiated when we first get the schema context
/**
 * Builds the Akka Props from which a ShardManager actor can be created.
 *
 * @param type kind of data held by the shards of the new shard manager
 * @param cluster cluster wrapper handed to the new shard manager
 * @param configuration shard configuration handed to the new shard manager
 * @return Props wrapping a Creator that instantiates the ShardManager
 */
87 public static Props props(final String type,
88 final ClusterWrapper cluster,
89 final Configuration configuration) {
// The anonymous Creator captures the final arguments and calls the private constructor
90 return Props.create(new Creator<ShardManager>() {
93 public ShardManager create() throws Exception {
94 return new ShardManager(type, cluster, configuration);
/**
 * Dispatches an incoming message to the matching handler based on its type.
 *
 * @param message the message delivered to this actor
 * @throws Exception when the message is of a type this actor does not recognize
 */
101 public void handleReceive(Object message) throws Exception {
// FindPrimary is matched on its serializable class and converted back before handling
102 if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
104 FindPrimary.fromSerializable(message));
106 } else if (message instanceof UpdateSchemaContext) {
107 updateSchemaContext(message);
108 } else if (message instanceof ClusterEvent.MemberUp){
109 memberUp((ClusterEvent.MemberUp) message);
110 } else if(message instanceof ClusterEvent.MemberRemoved) {
111 memberRemoved((ClusterEvent.MemberRemoved) message);
// Unreachable members are only logged; no state is changed for them
112 } else if(message instanceof ClusterEvent.UnreachableMember) {
113 ignoreMessage(message);
// Any other message type is reported as an error
115 throw new Exception ("Not recognized message received, message="+message);
120 private void ignoreMessage(Object message){
121 LOG.debug("Unhandled message : " + message);
124 private void memberRemoved(ClusterEvent.MemberRemoved message) {
125 memberNameToAddress.remove(message.member().roles().head());
128 private void memberUp(ClusterEvent.MemberUp message) {
129 String memberName = message.member().roles().head();
131 memberNameToAddress.put(memberName , message.member().address());
133 for(ShardInformation info : localShards.values()){
134 String shardName = info.getShardName();
135 info.updatePeerAddress(getShardActorName(memberName, shardName),
136 getShardActorPath(shardName, memberName));
140 private void updateSchemaContext(Object message) {
141 for(ShardInformation info : localShards.values()){
142 info.getActor().tell(message,getSelf());
/**
 * Handles a FindPrimary request for the shard named in the message and
 * replies to the sender with either a PrimaryFound or a PrimaryNotFound
 * message (both sent in their serializable form).
 *
 * @param message the request naming the shard whose primary is wanted
 */
146 private void findPrimary(FindPrimary message) {
147 String shardName = message.getShardName();
149 List<String> members =
150 configuration.getMembersFromShardName(shardName);
152 // First see if there is a local replica for the shard
153 ShardInformation info = localShards.get(shardName);
155 ActorPath shardPath = info.getActorPath();
156 if (shardPath != null) {
159 new PrimaryFound(shardPath.toString()).toSerializable(),
// The current member is taken out of the candidates examined below.
// NOTE(review): this removes from the list returned by
// configuration.getMembersFromShardName - confirm that it is a copy and
// not a list the Configuration keeps and hands out again.
165 if(cluster.getCurrentMemberName() != null) {
166 members.remove(cluster.getCurrentMemberName());
169 // There is no way for us to figure out the primary (for now) so assume
170 // that one of the remote nodes is a primary
171 for(String memberName : members) {
// Address recorded for the member by memberUp, if any
172 Address address = memberNameToAddress.get(memberName);
175 getShardActorPath(shardName, memberName);
176 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
// Tells the sender that no primary is known for the shard
180 getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
// Builds the path of a shard actor on a given member, starting from the
// address recorded for that member in memberNameToAddress.
// Note the parameter order (shardName, memberName) is the reverse of
// getShardActorName(memberName, shardName).
186 getShardActorPath(String shardName, String memberName) {
187 Address address = memberNameToAddress.get(memberName);
// A path is only built when the member's address is known
188 if(address != null) {
// Path starts with <member address>/user/shardmanager-<type>/
189 return address.toString() + "/user/shardmanager-" + this.type + "/"
191 memberName, shardName);
196 private String getShardActorName(String memberName, String shardName){
197 return memberName + "-shard-" + shardName + "-" + this.type;
200 // Create the shards that are local to this member
/**
 * Starts one Shard actor for every shard that the configuration assigns to
 * the current cluster member and records each of them in localShards.
 */
201 private void createLocalShards() {
202 String memberName = this.cluster.getCurrentMemberName();
203 List<String> memberShardNames =
204 this.configuration.getMemberShardNames(memberName);
206 for(String shardName : memberShardNames){
207 String shardActorName = getShardActorName(memberName, shardName);
// Peer addresses as known right now; memberUp updates them later through
// ShardInformation.updatePeerAddress
208 Map<String, String> peerAddresses = getPeerAddresses(shardName);
// Created through this actor's context, so the shard becomes a child of the ShardManager
209 ActorRef actor = getContext()
210 .actorOf(Shard.props(shardActorName, peerAddresses),
// The same peerAddresses map instance is given to the Shard props and to the ShardInformation
212 localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
217 private Map<String, String> getPeerAddresses(String shardName){
219 Map<String, String> peerAddresses = new HashMap<>();
221 List<String> members =
222 this.configuration.getMembersFromShardName(shardName);
224 String currentMemberName = this.cluster.getCurrentMemberName();
226 for(String memberName : members){
227 if(!currentMemberName.equals(memberName)){
228 String shardActorName = getShardActorName(memberName, shardName);
230 getShardActorPath(shardName, currentMemberName);
231 peerAddresses.put(shardActorName, path);
234 return peerAddresses;
/**
 * Supplies the supervision policy this actor applies to its children.
 * The decider below answers every failure with resume(), whatever the
 * throwable is.
 *
 * @return a one-for-one strategy whose decider always resumes
 */
239 public SupervisorStrategy supervisorStrategy() {
// Constructed with 10 and "1 minute" (the retry-count and time-window arguments of OneForOneStrategy)
240 return new OneForOneStrategy(10, Duration.create("1 minute"),
241 new Function<Throwable, SupervisorStrategy.Directive>() {
243 public SupervisorStrategy.Directive apply(Throwable t) {
// Same directive for every kind of failure
244 return SupervisorStrategy.resume();
251 private class ShardInformation {
252 private final String shardName;
253 private final ActorRef actor;
254 private final ActorPath actorPath;
255 private final Map<String, String> peerAddresses;
257 private ShardInformation(String shardName, ActorRef actor,
258 Map<String, String> peerAddresses) {
259 this.shardName = shardName;
261 this.actorPath = actor.path();
262 this.peerAddresses = peerAddresses;
265 public String getShardName() {
269 public ActorRef getActor(){
273 public ActorPath getActorPath() {
277 public Map<String, String> getPeerAddresses() {
278 return peerAddresses;
281 public void updatePeerAddress(String peerId, String peerAddress){
282 LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
283 if(peerAddresses.containsKey(peerId)){
284 peerAddresses.put(peerId, peerAddress);
286 LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
289 .tell(new PeerAddressResolved(peerId, peerAddress),