Merge "Added hosttracker shell for karaf (rebased)"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
22 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
23 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
24 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
25 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
26 import scala.concurrent.duration.Duration;
27
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31
32 /**
33  * The ShardManager has the following jobs,
34  * <p>
35  * <li> Create all the local shard replicas that belong on this cluster member
36  * <li> Find the primary replica for any given shard
37  * <li> Engage in shard replica elections which decide which replica should be the primary
38  * </p>
39  * <p/>
40  * <h3>>Creation of Shard replicas</h3
41  * <p>
42  * When the ShardManager is constructed it reads the cluster configuration to find out which shard replicas
43  * belong on this member. It finds out the name of the current cluster member from the Akka Clustering Service.
44  * </p>
45  * <p/>
46  * <h3> Replica Elections </h3>
47  * <p/>
48  * <p>
49  * The Shard Manager uses multiple cues to initiate election.
50  * <li> When a member of the cluster dies
51  * <li> When a local shard replica dies
52  * <li> When a local shard replica comes alive
53  * </p>
54  */
55 public class ShardManager extends AbstractUntypedActor {
56
57     // Stores a mapping between a member name and the address of the member
58     private final Map<String, Address> memberNameToAddress = new HashMap<>();
59
60     private final Map<String, ShardInformation> localShards = new HashMap<>();
61
62
63     private final String type;
64
65     private final ClusterWrapper cluster;
66
67     private final Configuration configuration;
68
69     /**
70      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
71      *             configuration or operational
72      */
73     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {
74
75         this.type = Preconditions.checkNotNull(type, "type should not be null");
76         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
77         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
78
79         // Subscribe this actor to cluster member events
80         cluster.subscribeToMemberEvents(getSelf());
81
82         // Create all the local Shards and make them a child of the ShardManager
83         // TODO: This may need to be initiated when we first get the schema context
84         createLocalShards();
85     }
86
87     public static Props props(final String type,
88         final ClusterWrapper cluster,
89         final Configuration configuration) {
90         return Props.create(new Creator<ShardManager>() {
91
92             @Override
93             public ShardManager create() throws Exception {
94                 return new ShardManager(type, cluster, configuration);
95             }
96         });
97     }
98
99
100     @Override
101     public void handleReceive(Object message) throws Exception {
102         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
103             findPrimary(
104                 FindPrimary.fromSerializable(message));
105
106         } else if (message instanceof UpdateSchemaContext) {
107             updateSchemaContext(message);
108         } else if (message instanceof ClusterEvent.MemberUp){
109             memberUp((ClusterEvent.MemberUp) message);
110         } else if(message instanceof ClusterEvent.MemberRemoved) {
111             memberRemoved((ClusterEvent.MemberRemoved) message);
112         } else if(message instanceof ClusterEvent.UnreachableMember) {
113             ignoreMessage(message);
114         } else{
115           throw new Exception ("Not recognized message received, message="+message);
116         }
117
118     }
119
120     private void ignoreMessage(Object message){
121         LOG.debug("Unhandled message : " + message);
122     }
123
124     private void memberRemoved(ClusterEvent.MemberRemoved message) {
125         memberNameToAddress.remove(message.member().roles().head());
126     }
127
128     private void memberUp(ClusterEvent.MemberUp message) {
129         String memberName = message.member().roles().head();
130
131         memberNameToAddress.put(memberName , message.member().address());
132
133         for(ShardInformation info : localShards.values()){
134             String shardName = info.getShardName();
135             info.updatePeerAddress(getShardActorName(memberName, shardName),
136                 getShardActorPath(shardName, memberName));
137         }
138     }
139
140     private void updateSchemaContext(Object message) {
141         for(ShardInformation info : localShards.values()){
142             info.getActor().tell(message,getSelf());
143         }
144     }
145
146     private void findPrimary(FindPrimary message) {
147         String shardName = message.getShardName();
148
149         List<String> members =
150             configuration.getMembersFromShardName(shardName);
151
152         // First see if the there is a local replica for the shard
153         ShardInformation info = localShards.get(shardName);
154         if(info != null) {
155             ActorPath shardPath = info.getActorPath();
156             if (shardPath != null) {
157                 getSender()
158                     .tell(
159                         new PrimaryFound(shardPath.toString()).toSerializable(),
160                         getSelf());
161                 return;
162             }
163         }
164
165         if(cluster.getCurrentMemberName() != null) {
166             members.remove(cluster.getCurrentMemberName());
167         }
168
169         // There is no way for us to figure out the primary (for now) so assume
170         // that one of the remote nodes is a primary
171         for(String memberName : members) {
172             Address address = memberNameToAddress.get(memberName);
173             if(address != null){
174                 String path =
175                     getShardActorPath(shardName, memberName);
176                 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
177                 return;
178             }
179         }
180         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
181     }
182
183     private String
184
185
186     getShardActorPath(String shardName, String memberName) {
187         Address address = memberNameToAddress.get(memberName);
188         if(address != null) {
189             return address.toString() + "/user/shardmanager-" + this.type + "/"
190                 + getShardActorName(
191                 memberName, shardName);
192         }
193         return null;
194     }
195
196     private String getShardActorName(String memberName, String shardName){
197         return memberName + "-shard-" + shardName + "-" + this.type;
198     }
199
200     // Create the shards that are local to this member
201     private void createLocalShards() {
202         String memberName = this.cluster.getCurrentMemberName();
203         List<String> memberShardNames =
204             this.configuration.getMemberShardNames(memberName);
205
206         for(String shardName : memberShardNames){
207             String shardActorName = getShardActorName(memberName, shardName);
208             Map<String, String> peerAddresses = getPeerAddresses(shardName);
209             ActorRef actor = getContext()
210                 .actorOf(Shard.props(shardActorName, peerAddresses),
211                     shardActorName);
212             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
213         }
214
215     }
216
217     private Map<String, String> getPeerAddresses(String shardName){
218
219         Map<String, String> peerAddresses = new HashMap<>();
220
221         List<String> members =
222             this.configuration.getMembersFromShardName(shardName);
223
224         String currentMemberName = this.cluster.getCurrentMemberName();
225
226         for(String memberName : members){
227             if(!currentMemberName.equals(memberName)){
228                 String shardActorName = getShardActorName(memberName, shardName);
229                 String path =
230                     getShardActorPath(shardName, currentMemberName);
231                 peerAddresses.put(shardActorName, path);
232             }
233         }
234         return peerAddresses;
235     }
236
237
238     @Override
239     public SupervisorStrategy supervisorStrategy() {
240         return new OneForOneStrategy(10, Duration.create("1 minute"),
241             new Function<Throwable, SupervisorStrategy.Directive>() {
242                 @Override
243                 public SupervisorStrategy.Directive apply(Throwable t) {
244                     return SupervisorStrategy.resume();
245                 }
246             }
247         );
248
249     }
250
251     private class ShardInformation {
252         private final String shardName;
253         private final ActorRef actor;
254         private final ActorPath actorPath;
255         private final Map<String, String> peerAddresses;
256
257         private ShardInformation(String shardName, ActorRef actor,
258             Map<String, String> peerAddresses) {
259             this.shardName = shardName;
260             this.actor = actor;
261             this.actorPath = actor.path();
262             this.peerAddresses = peerAddresses;
263         }
264
265         public String getShardName() {
266             return shardName;
267         }
268
269         public ActorRef getActor(){
270             return actor;
271         }
272
273         public ActorPath getActorPath() {
274             return actorPath;
275         }
276
277         public Map<String, String> getPeerAddresses() {
278             return peerAddresses;
279         }
280
281         public void updatePeerAddress(String peerId, String peerAddress){
282             LOG.info("updatePeerAddress for peer {} with address {}", peerId, peerAddress);
283             if(peerAddresses.containsKey(peerId)){
284                 peerAddresses.put(peerId, peerAddress);
285
286                 LOG.info("Sending PeerAddressResolved for peer {} with address {} to {}", peerId, peerAddress, actor.path());
287
288                 actor
289                     .tell(new PeerAddressResolved(peerId, peerAddress),
290                         getSelf());
291
292             }
293         }
294     }
295 }