Merge "Custom mailbox that is bounded and instrumented."
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import com.google.common.base.Preconditions;
21 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
22 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
23 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
24 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
25 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
26 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
27 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
28 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
29 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
30 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
31 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
32 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
33 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
34
35 import scala.concurrent.duration.Duration;
36
37 import java.util.ArrayList;
38 import java.util.HashMap;
39 import java.util.List;
40 import java.util.Map;
41
42 /**
43  * The ShardManager has the following jobs,
44  * <ul>
45  * <li> Create all the local shard replicas that belong on this cluster member
46  * <li> Find the address of the local shard
47  * <li> Find the primary replica for any given shard
48  * <li> Monitor the cluster members and store their addresses
49  * <ul>
50  */
51 public class ShardManager extends AbstractUntypedActor {
52
53     // Stores a mapping between a member name and the address of the member
54     // Member names look like "member-1", "member-2" etc and are as specified
55     // in configuration
56     private final Map<String, Address> memberNameToAddress = new HashMap<>();
57
58     // Stores a mapping between a shard name and it's corresponding information
59     // Shard names look like inventory, topology etc and are as specified in
60     // configuration
61     private final Map<String, ShardInformation> localShards = new HashMap<>();
62
63     // The type of a ShardManager reflects the type of the datastore itself
64     // A data store could be of type config/operational
65     private final String type;
66
67     private final ClusterWrapper cluster;
68
69     private final Configuration configuration;
70
71     private ShardManagerInfoMBean mBean;
72
73     private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
74
75     /**
76      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
77      *             configuration or operational
78      */
79     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
80             InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
81
82         this.type = Preconditions.checkNotNull(type, "type should not be null");
83         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
84         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
85         this.dataStoreProperties = dataStoreProperties;
86
87         // Subscribe this actor to cluster member events
88         cluster.subscribeToMemberEvents(getSelf());
89
90         // Create all the local Shards and make them a child of the ShardManager
91         // TODO: This may need to be initiated when we first get the schema context
92         createLocalShards();
93     }
94
95     public static Props props(final String type,
96         final ClusterWrapper cluster,
97         final Configuration configuration,
98         final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
99
100         Preconditions.checkNotNull(type, "type should not be null");
101         Preconditions.checkNotNull(cluster, "cluster should not be null");
102         Preconditions.checkNotNull(configuration, "configuration should not be null");
103
104         return Props.create(new Creator<ShardManager>() {
105
106             @Override
107             public ShardManager create() throws Exception {
108                 return new ShardManager(type, cluster, configuration, dataStoreProperties);
109             }
110         });
111     }
112
113
114     @Override
115     public void handleReceive(Object message) throws Exception {
116         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
117             findPrimary(
118                 FindPrimary.fromSerializable(message));
119         } else if(message instanceof FindLocalShard){
120             findLocalShard((FindLocalShard) message);
121         } else if (message instanceof UpdateSchemaContext) {
122             updateSchemaContext(message);
123         } else if (message instanceof ClusterEvent.MemberUp){
124             memberUp((ClusterEvent.MemberUp) message);
125         } else if(message instanceof ClusterEvent.MemberRemoved) {
126             memberRemoved((ClusterEvent.MemberRemoved) message);
127         } else if(message instanceof ClusterEvent.UnreachableMember) {
128             ignoreMessage(message);
129         } else{
130             unknownMessage(message);
131         }
132
133     }
134
135     private void findLocalShard(FindLocalShard message) {
136         ShardInformation shardInformation =
137             localShards.get(message.getShardName());
138
139         if(shardInformation != null){
140             getSender().tell(new LocalShardFound(shardInformation.getActor()), getSelf());
141             return;
142         }
143
144         getSender().tell(new LocalShardNotFound(message.getShardName()),
145             getSelf());
146     }
147
148     private void memberRemoved(ClusterEvent.MemberRemoved message) {
149         memberNameToAddress.remove(message.member().roles().head());
150     }
151
152     private void memberUp(ClusterEvent.MemberUp message) {
153         String memberName = message.member().roles().head();
154
155         memberNameToAddress.put(memberName , message.member().address());
156
157         for(ShardInformation info : localShards.values()){
158             String shardName = info.getShardName();
159             info.updatePeerAddress(getShardIdentifier(memberName, shardName),
160                 getShardActorPath(shardName, memberName));
161         }
162     }
163
164     /**
165      * Notifies all the local shards of a change in the schema context
166      *
167      * @param message
168      */
169     private void updateSchemaContext(Object message) {
170         for(ShardInformation info : localShards.values()){
171             info.getActor().tell(message,getSelf());
172         }
173     }
174
175     private void findPrimary(FindPrimary message) {
176         String shardName = message.getShardName();
177
178         // First see if the there is a local replica for the shard
179         ShardInformation info = localShards.get(shardName);
180         if(info != null) {
181             ActorPath shardPath = info.getActorPath();
182             if (shardPath != null) {
183                 getSender()
184                     .tell(
185                         new PrimaryFound(shardPath.toString()).toSerializable(),
186                         getSelf());
187                 return;
188             }
189         }
190
191         List<String> members =
192             configuration.getMembersFromShardName(shardName);
193
194         if(cluster.getCurrentMemberName() != null) {
195             members.remove(cluster.getCurrentMemberName());
196         }
197
198         // There is no way for us to figure out the primary (for now) so assume
199         // that one of the remote nodes is a primary
200         for(String memberName : members) {
201             Address address = memberNameToAddress.get(memberName);
202             if(address != null){
203                 String path =
204                     getShardActorPath(shardName, memberName);
205                 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
206                 return;
207             }
208         }
209         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
210     }
211
212     private String getShardActorPath(String shardName, String memberName) {
213         Address address = memberNameToAddress.get(memberName);
214         if(address != null) {
215             StringBuilder builder = new StringBuilder();
216             builder.append(address.toString())
217                 .append("/user/")
218                 .append(ShardManagerIdentifier.builder().type(type).build().toString())
219                 .append("/")
220                 .append(getShardIdentifier(memberName, shardName));
221             return builder.toString();
222         }
223         return null;
224     }
225
226     /**
227      * Construct the name of the shard actor given the name of the member on
228      * which the shard resides and the name of the shard
229      *
230      * @param memberName
231      * @param shardName
232      * @return
233      */
234     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
235         return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
236     }
237
238     /**
239      * Create shards that are local to the member on which the ShardManager
240      * runs
241      *
242      */
243     private void createLocalShards() {
244         String memberName = this.cluster.getCurrentMemberName();
245         List<String> memberShardNames =
246             this.configuration.getMemberShardNames(memberName);
247
248         List<String> localShardActorNames = new ArrayList<>();
249         for(String shardName : memberShardNames){
250             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
251             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
252             ActorRef actor = getContext()
253                 .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties),
254                     shardId.toString());
255             localShardActorNames.add(shardId.toString());
256             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
257         }
258
259         mBean = ShardManagerInfo
260             .createShardManagerMBean("shard-manager-" + this.type, localShardActorNames);
261
262     }
263
264     /**
265      * Given the name of the shard find the addresses of all it's peers
266      *
267      * @param shardName
268      * @return
269      */
270     private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
271
272         Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
273
274         List<String> members =
275             this.configuration.getMembersFromShardName(shardName);
276
277         String currentMemberName = this.cluster.getCurrentMemberName();
278
279         for(String memberName : members){
280             if(!currentMemberName.equals(memberName)){
281                 ShardIdentifier shardId = getShardIdentifier(memberName,
282                     shardName);
283                 String path =
284                     getShardActorPath(shardName, currentMemberName);
285                 peerAddresses.put(shardId, path);
286             }
287         }
288         return peerAddresses;
289     }
290
291     @Override
292     public SupervisorStrategy supervisorStrategy() {
293
294         return new OneForOneStrategy(10, Duration.create("1 minute"),
295             new Function<Throwable, SupervisorStrategy.Directive>() {
296                 @Override
297                 public SupervisorStrategy.Directive apply(Throwable t) {
298                     LOG.warning("Supervisor Strategy of resume applied {}",t);
299                     return SupervisorStrategy.resume();
300                 }
301             }
302         );
303
304     }
305
306     private class ShardInformation {
307         private final String shardName;
308         private final ActorRef actor;
309         private final ActorPath actorPath;
310         private final Map<ShardIdentifier, String> peerAddresses;
311
312         private ShardInformation(String shardName, ActorRef actor,
313             Map<ShardIdentifier, String> peerAddresses) {
314             this.shardName = shardName;
315             this.actor = actor;
316             this.actorPath = actor.path();
317             this.peerAddresses = peerAddresses;
318         }
319
320         public String getShardName() {
321             return shardName;
322         }
323
324         public ActorRef getActor(){
325             return actor;
326         }
327
328         public ActorPath getActorPath() {
329             return actorPath;
330         }
331
332         public void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
333             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
334                 peerAddress);
335             if(peerAddresses.containsKey(peerId)){
336                 peerAddresses.put(peerId, peerAddress);
337
338                 LOG.debug(
339                     "Sending PeerAddressResolved for peer {} with address {} to {}",
340                     peerId, peerAddress, actor.path());
341
342                 actor
343                     .tell(new PeerAddressResolved(peerId, peerAddress),
344                         getSelf());
345
346             }
347         }
348     }
349 }
350
351
352