Optimizations, Monitoring and Logging
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
25 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
26 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
28 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
29 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
30 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
32 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
33 import scala.concurrent.duration.Duration;
34
35 import java.util.ArrayList;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Map;
39
40 /**
41  * The ShardManager has the following jobs,
42  * <ul>
43  * <li> Create all the local shard replicas that belong on this cluster member
44  * <li> Find the address of the local shard
45  * <li> Find the primary replica for any given shard
46  * <li> Monitor the cluster members and store their addresses
47  * </ul>
48  */
49 public class ShardManager extends AbstractUntypedActor {
50
51     // Stores a mapping between a member name and the address of the member
52     // Member names look like "member-1", "member-2" etc and are as specified
53     // in configuration
54     private final Map<String, Address> memberNameToAddress = new HashMap<>();
55
56     // Stores a mapping between a shard name and its corresponding information
57     // Shard names look like inventory, topology etc and are as specified in
58     // configuration
59     private final Map<String, ShardInformation> localShards = new HashMap<>();
60
61     // The type of a ShardManager reflects the type of the datastore itself
62     // A data store could be of type config/operational
63     private final String type;
64
65     private final ClusterWrapper cluster;
66
67     private final Configuration configuration;
68
69     private ShardManagerInfoMBean mBean;
70
/**
 * Builds the shard manager, registers it for cluster member events and
 * creates the shards that are local to this member.
 *
 * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
 *             configuration or operational
 * @param cluster used to subscribe to member events and to look up the
 *                name of the current member; must not be null
 * @param configuration used to look up which shards belong to which
 *                      members; must not be null
 */
private ShardManager(String type, ClusterWrapper cluster, Configuration configuration) {

    // Validate every argument up front, then populate the final fields
    Preconditions.checkNotNull(type, "type should not be null");
    Preconditions.checkNotNull(cluster, "cluster should not be null");
    Preconditions.checkNotNull(configuration, "configuration should not be null");

    this.type = type;
    this.cluster = cluster;
    this.configuration = configuration;

    // Register this actor as a listener for cluster member events
    this.cluster.subscribeToMemberEvents(getSelf());

    // The local Shards are created as children of the ShardManager
    // TODO: This may need to be initiated when we first get the schema context
    createLocalShards();
}
88
89     public static Props props(final String type,
90         final ClusterWrapper cluster,
91         final Configuration configuration) {
92
93         Preconditions.checkNotNull(type, "type should not be null");
94         Preconditions.checkNotNull(cluster, "cluster should not be null");
95         Preconditions.checkNotNull(configuration, "configuration should not be null");
96
97         return Props.create(new Creator<ShardManager>() {
98
99             @Override
100             public ShardManager create() throws Exception {
101                 return new ShardManager(type, cluster, configuration);
102             }
103         });
104     }
105
106
107     @Override
108     public void handleReceive(Object message) throws Exception {
109         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
110             findPrimary(
111                 FindPrimary.fromSerializable(message));
112         } else if(message instanceof FindLocalShard){
113             findLocalShard((FindLocalShard) message);
114         } else if (message instanceof UpdateSchemaContext) {
115             updateSchemaContext(message);
116         } else if (message instanceof ClusterEvent.MemberUp){
117             memberUp((ClusterEvent.MemberUp) message);
118         } else if(message instanceof ClusterEvent.MemberRemoved) {
119             memberRemoved((ClusterEvent.MemberRemoved) message);
120         } else if(message instanceof ClusterEvent.UnreachableMember) {
121             ignoreMessage(message);
122         } else{
123             unknownMessage(message);
124         }
125
126     }
127
128     private void findLocalShard(FindLocalShard message) {
129         ShardInformation shardInformation =
130             localShards.get(message.getShardName());
131
132         if(shardInformation != null){
133             getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
134             return;
135         }
136
137         getSender().tell(new LocalShardNotFound(message.getShardName()),
138             getSelf());
139     }
140
141     private void memberRemoved(ClusterEvent.MemberRemoved message) {
142         memberNameToAddress.remove(message.member().roles().head());
143     }
144
145     private void memberUp(ClusterEvent.MemberUp message) {
146         String memberName = message.member().roles().head();
147
148         memberNameToAddress.put(memberName , message.member().address());
149
150         for(ShardInformation info : localShards.values()){
151             String shardName = info.getShardName();
152             info.updatePeerAddress(getShardIdentifier(memberName, shardName),
153                 getShardActorPath(shardName, memberName));
154         }
155     }
156
157     /**
158      * Notifies all the local shards of a change in the schema context
159      *
160      * @param message
161      */
162     private void updateSchemaContext(Object message) {
163         for(ShardInformation info : localShards.values()){
164             info.getActor().tell(message,getSelf());
165         }
166     }
167
168     private void findPrimary(FindPrimary message) {
169         String shardName = message.getShardName();
170
171         // First see if the there is a local replica for the shard
172         ShardInformation info = localShards.get(shardName);
173         if(info != null) {
174             ActorPath shardPath = info.getActorPath();
175             if (shardPath != null) {
176                 getSender()
177                     .tell(
178                         new PrimaryFound(shardPath.toString()).toSerializable(),
179                         getSelf());
180                 return;
181             }
182         }
183
184         List<String> members =
185             configuration.getMembersFromShardName(shardName);
186
187         if(cluster.getCurrentMemberName() != null) {
188             members.remove(cluster.getCurrentMemberName());
189         }
190
191         // There is no way for us to figure out the primary (for now) so assume
192         // that one of the remote nodes is a primary
193         for(String memberName : members) {
194             Address address = memberNameToAddress.get(memberName);
195             if(address != null){
196                 String path =
197                     getShardActorPath(shardName, memberName);
198                 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
199                 return;
200             }
201         }
202         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
203     }
204
205     private String getShardActorPath(String shardName, String memberName) {
206         Address address = memberNameToAddress.get(memberName);
207         if(address != null) {
208             StringBuilder builder = new StringBuilder();
209             builder.append(address.toString())
210                 .append("/user/")
211                 .append(ShardManagerIdentifier.builder().type(type).build().toString())
212                 .append("/")
213                 .append(getShardIdentifier(memberName, shardName));
214             return builder.toString();
215         }
216         return null;
217     }
218
219     /**
220      * Construct the name of the shard actor given the name of the member on
221      * which the shard resides and the name of the shard
222      *
223      * @param memberName
224      * @param shardName
225      * @return
226      */
227     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
228         return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
229     }
230
231     /**
232      * Create shards that are local to the member on which the ShardManager
233      * runs
234      *
235      */
236     private void createLocalShards() {
237         String memberName = this.cluster.getCurrentMemberName();
238         List<String> memberShardNames =
239             this.configuration.getMemberShardNames(memberName);
240
241         List<String> localShardActorNames = new ArrayList<>();
242         for(String shardName : memberShardNames){
243             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
244             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
245             ActorRef actor = getContext()
246                 .actorOf(Shard.props(shardId, peerAddresses),
247                     shardId.toString());
248             localShardActorNames.add(shardId.toString());
249             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
250         }
251
252         mBean = ShardManagerInfo
253             .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
254
255     }
256
257     /**
258      * Given the name of the shard find the addresses of all it's peers
259      *
260      * @param shardName
261      * @return
262      */
263     private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
264
265         Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
266
267         List<String> members =
268             this.configuration.getMembersFromShardName(shardName);
269
270         String currentMemberName = this.cluster.getCurrentMemberName();
271
272         for(String memberName : members){
273             if(!currentMemberName.equals(memberName)){
274                 ShardIdentifier shardId = getShardIdentifier(memberName,
275                     shardName);
276                 String path =
277                     getShardActorPath(shardName, currentMemberName);
278                 peerAddresses.put(shardId, path);
279             }
280         }
281         return peerAddresses;
282     }
283
284     @Override
285     public SupervisorStrategy supervisorStrategy() {
286         return new OneForOneStrategy(10, Duration.create("1 minute"),
287             new Function<Throwable, SupervisorStrategy.Directive>() {
288                 @Override
289                 public SupervisorStrategy.Directive apply(Throwable t) {
290                     return SupervisorStrategy.resume();
291                 }
292             }
293         );
294
295     }
296
297     private class ShardInformation {
298         private final String shardName;
299         private final ActorRef actor;
300         private final ActorPath actorPath;
301         private final Map<ShardIdentifier, String> peerAddresses;
302
303         private ShardInformation(String shardName, ActorRef actor,
304             Map<ShardIdentifier, String> peerAddresses) {
305             this.shardName = shardName;
306             this.actor = actor;
307             this.actorPath = actor.path();
308             this.peerAddresses = peerAddresses;
309         }
310
311         public String getShardName() {
312             return shardName;
313         }
314
315         public ActorRef getActor(){
316             return actor;
317         }
318
319         public ActorPath getActorPath() {
320             return actorPath;
321         }
322
323         public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
324             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
325                 peerAddress);
326             if(peerAddresses.containsKey(peerId)){
327                 peerAddresses.put(peerId, peerAddress);
328
329                 LOG.debug(
330                     "Sending PeerAddressResolved for peer {} with address {} to {}",
331                     peerId, peerAddress, actor.path());
332
333                 actor
334                     .tell(new PeerAddressResolved(peerId, peerAddress),
335                         getSelf());
336
337             }
338         }
339     }
340 }
341
342
343