9c8f0b24440775a68f6047e3be19210583e66dbb
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import akka.japi.Procedure;
21 import akka.persistence.RecoveryCompleted;
22 import akka.persistence.RecoveryFailure;
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Supplier;
26 import com.google.common.collect.ImmutableSet;
27 import com.google.common.collect.Lists;
28 import java.io.Serializable;
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
39 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
40 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
41 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
42 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfoMBean;
43 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
44 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
45 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
46 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
49 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
50 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
51 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
52 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
53 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import scala.concurrent.duration.Duration;
58
/**
 * The ShardManager has the following jobs:
 * <ul>
 * <li> Create all the local shard replicas that belong on this cluster member
 * <li> Find the address of the local shard
 * <li> Find the primary replica for any given shard
 * <li> Monitor the cluster members and store their addresses
 * </ul>
 */
68 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
69
70     private final Logger LOG = LoggerFactory.getLogger(getClass());
71
72     // Stores a mapping between a member name and the address of the member
73     // Member names look like "member-1", "member-2" etc and are as specified
74     // in configuration
75     private final Map<String, Address> memberNameToAddress = new HashMap<>();
76
77     // Stores a mapping between a shard name and it's corresponding information
78     // Shard names look like inventory, topology etc and are as specified in
79     // configuration
80     private final Map<String, ShardInformation> localShards = new HashMap<>();
81
82     // The type of a ShardManager reflects the type of the datastore itself
83     // A data store could be of type config/operational
84     private final String type;
85
86     private final ClusterWrapper cluster;
87
88     private final Configuration configuration;
89
90     private ShardManagerInfoMBean mBean;
91
92     private final DatastoreContext datastoreContext;
93
94     private Collection<String> knownModules = Collections.emptySet();
95
96     private final DataPersistenceProvider dataPersistenceProvider;
97
98     /**
99      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
100      *             configuration or operational
101      */
102     protected ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
103             DatastoreContext datastoreContext) {
104
105         this.type = Preconditions.checkNotNull(type, "type should not be null");
106         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
107         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
108         this.datastoreContext = datastoreContext;
109         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
110
111         // Subscribe this actor to cluster member events
112         cluster.subscribeToMemberEvents(getSelf());
113
114         createLocalShards();
115     }
116
117     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
118         return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
119     }
120
121     public static Props props(final String type,
122         final ClusterWrapper cluster,
123         final Configuration configuration,
124         final DatastoreContext datastoreContext) {
125
126         Preconditions.checkNotNull(type, "type should not be null");
127         Preconditions.checkNotNull(cluster, "cluster should not be null");
128         Preconditions.checkNotNull(configuration, "configuration should not be null");
129
130         return Props.create(new ShardManagerCreator(type, cluster, configuration, datastoreContext));
131     }
132
133     @Override
134     public void handleCommand(Object message) throws Exception {
135         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
136             findPrimary(FindPrimary.fromSerializable(message));
137         } else if(message instanceof FindLocalShard){
138             findLocalShard((FindLocalShard) message);
139         } else if (message instanceof UpdateSchemaContext) {
140             updateSchemaContext(message);
141         } else if(message instanceof ActorInitialized) {
142             onActorInitialized(message);
143         } else if (message instanceof ClusterEvent.MemberUp){
144             memberUp((ClusterEvent.MemberUp) message);
145         } else if(message instanceof ClusterEvent.MemberRemoved) {
146             memberRemoved((ClusterEvent.MemberRemoved) message);
147         } else if(message instanceof ClusterEvent.UnreachableMember) {
148             ignoreMessage(message);
149         } else{
150             unknownMessage(message);
151         }
152
153     }
154
155     private void onActorInitialized(Object message) {
156         final ActorRef sender = getSender();
157
158         if (sender == null) {
159             return; //why is a non-actor sending this message? Just ignore.
160         }
161
162         String actorName = sender.path().name();
163         //find shard name from actor name; actor name is stringified shardId
164         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
165
166         if (shardId.getShardName() == null) {
167             return;
168         }
169         markShardAsInitialized(shardId.getShardName());
170     }
171
172     private void markShardAsInitialized(String shardName) {
173         LOG.debug("Initializing shard [{}]", shardName);
174         ShardInformation shardInformation = localShards.get(shardName);
175         if (shardInformation != null) {
176             shardInformation.setActorInitialized();
177         }
178     }
179
180     @Override
181     protected void handleRecover(Object message) throws Exception {
182         if(dataPersistenceProvider.isRecoveryApplicable()) {
183             if (message instanceof SchemaContextModules) {
184                 SchemaContextModules msg = (SchemaContextModules) message;
185                 knownModules = ImmutableSet.copyOf(msg.getModules());
186             } else if (message instanceof RecoveryFailure) {
187                 RecoveryFailure failure = (RecoveryFailure) message;
188                 LOG.error("Recovery failed", failure.cause());
189             } else if (message instanceof RecoveryCompleted) {
190                 LOG.info("Recovery complete : {}", persistenceId());
191
192                 // Delete all the messages from the akka journal except the last one
193                 deleteMessages(lastSequenceNr() - 1);
194             }
195         } else {
196             if (message instanceof RecoveryCompleted) {
197                 LOG.info("Recovery complete : {}", persistenceId());
198
199                 // Delete all the messages from the akka journal
200                 deleteMessages(lastSequenceNr());
201             }
202         }
203     }
204
205     private void findLocalShard(FindLocalShard message) {
206         final ShardInformation shardInformation = localShards.get(message.getShardName());
207
208         if(shardInformation == null){
209             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
210             return;
211         }
212
213         sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
214             @Override
215             public Object get() {
216                 return new LocalShardFound(shardInformation.getActor());
217             }
218         });
219     }
220
221     private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
222             final Supplier<Object> messageSupplier) {
223         if (!shardInformation.isShardInitialized()) {
224             if(waitUntilInitialized) {
225                 final ActorRef sender = getSender();
226                 final ActorRef self = self();
227                 shardInformation.addRunnableOnInitialized(new Runnable() {
228                     @Override
229                     public void run() {
230                         sender.tell(messageSupplier.get(), self);
231                     }
232                 });
233             } else {
234                 getSender().tell(new ActorNotInitialized(), getSelf());
235             }
236
237             return;
238         }
239
240         getSender().tell(messageSupplier.get(), getSelf());
241     }
242
243     private void memberRemoved(ClusterEvent.MemberRemoved message) {
244         memberNameToAddress.remove(message.member().roles().head());
245     }
246
247     private void memberUp(ClusterEvent.MemberUp message) {
248         String memberName = message.member().roles().head();
249
250         memberNameToAddress.put(memberName, message.member().address());
251
252         for(ShardInformation info : localShards.values()){
253             String shardName = info.getShardName();
254             info.updatePeerAddress(getShardIdentifier(memberName, shardName),
255                 getShardActorPath(shardName, memberName));
256         }
257     }
258
259     /**
260      * Notifies all the local shards of a change in the schema context
261      *
262      * @param message
263      */
264     private void updateSchemaContext(final Object message) {
265         final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
266
267         Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
268         Set<String> newModules = new HashSet<>(128);
269
270         for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
271             String s = moduleIdentifier.getNamespace().toString();
272             newModules.add(s);
273         }
274
275         if(newModules.containsAll(knownModules)) {
276
277             LOG.info("New SchemaContext has a super set of current knownModules - persisting info");
278
279             knownModules = ImmutableSet.copyOf(newModules);
280
281             dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
282
283                 @Override
284                 public void apply(SchemaContextModules param) throws Exception {
285                     LOG.info("Sending new SchemaContext to Shards");
286                     for (ShardInformation info : localShards.values()) {
287                         if (info.getActor() == null) {
288                             info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
289                                             info.getPeerAddresses(), datastoreContext, schemaContext),
290                                     info.getShardId().toString()));
291                         } else {
292                             info.getActor().tell(message, getSelf());
293                         }
294                     }
295                 }
296
297             });
298         } else {
299             LOG.info("Rejecting schema context update because it is not a super set of previously known modules");
300         }
301
302     }
303
304     private void findPrimary(FindPrimary message) {
305         String shardName = message.getShardName();
306
307         // First see if the there is a local replica for the shard
308         final ShardInformation info = localShards.get(shardName);
309         if (info != null) {
310             sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
311                 @Override
312                 public Object get() {
313                     return new PrimaryFound(info.getActorPath().toString()).toSerializable();
314                 }
315             });
316
317             return;
318         }
319
320         List<String> members = configuration.getMembersFromShardName(shardName);
321
322         if(cluster.getCurrentMemberName() != null) {
323             members.remove(cluster.getCurrentMemberName());
324         }
325
326         /**
327          * FIXME: Instead of sending remote shard actor path back to sender,
328          * forward FindPrimary message to remote shard manager
329          */
330         // There is no way for us to figure out the primary (for now) so assume
331         // that one of the remote nodes is a primary
332         for(String memberName : members) {
333             Address address = memberNameToAddress.get(memberName);
334             if(address != null){
335                 String path =
336                     getShardActorPath(shardName, memberName);
337                 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
338                 return;
339             }
340         }
341         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
342     }
343
344     private String getShardActorPath(String shardName, String memberName) {
345         Address address = memberNameToAddress.get(memberName);
346         if(address != null) {
347             StringBuilder builder = new StringBuilder();
348             builder.append(address.toString())
349                 .append("/user/")
350                 .append(ShardManagerIdentifier.builder().type(type).build().toString())
351                 .append("/")
352                 .append(getShardIdentifier(memberName, shardName));
353             return builder.toString();
354         }
355         return null;
356     }
357
358     /**
359      * Construct the name of the shard actor given the name of the member on
360      * which the shard resides and the name of the shard
361      *
362      * @param memberName
363      * @param shardName
364      * @return
365      */
366     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
367         return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
368     }
369
370     /**
371      * Create shards that are local to the member on which the ShardManager
372      * runs
373      *
374      */
375     private void createLocalShards() {
376         String memberName = this.cluster.getCurrentMemberName();
377         List<String> memberShardNames =
378             this.configuration.getMemberShardNames(memberName);
379
380         List<String> localShardActorNames = new ArrayList<>();
381         for(String shardName : memberShardNames){
382             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
383             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
384             localShardActorNames.add(shardId.toString());
385             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
386         }
387
388         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
389                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
390     }
391
392     /**
393      * Given the name of the shard find the addresses of all it's peers
394      *
395      * @param shardName
396      * @return
397      */
398     private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
399
400         Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
401
402         List<String> members =
403             this.configuration.getMembersFromShardName(shardName);
404
405         String currentMemberName = this.cluster.getCurrentMemberName();
406
407         for(String memberName : members){
408             if(!currentMemberName.equals(memberName)){
409                 ShardIdentifier shardId = getShardIdentifier(memberName,
410                     shardName);
411                 String path =
412                     getShardActorPath(shardName, currentMemberName);
413                 peerAddresses.put(shardId, path);
414             }
415         }
416         return peerAddresses;
417     }
418
419     @Override
420     public SupervisorStrategy supervisorStrategy() {
421
422         return new OneForOneStrategy(10, Duration.create("1 minute"),
423             new Function<Throwable, SupervisorStrategy.Directive>() {
424                 @Override
425                 public SupervisorStrategy.Directive apply(Throwable t) {
426                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
427                     return SupervisorStrategy.resume();
428                 }
429             }
430         );
431
432     }
433
434     @Override
435     public String persistenceId() {
436         return "shard-manager-" + type;
437     }
438
439     @VisibleForTesting
440     Collection<String> getKnownModules() {
441         return knownModules;
442     }
443
444     @VisibleForTesting
445     DataPersistenceProvider getDataPersistenceProvider() {
446         return dataPersistenceProvider;
447     }
448
449     private class ShardInformation {
450         private final ShardIdentifier shardId;
451         private final String shardName;
452         private ActorRef actor;
453         private ActorPath actorPath;
454         private final Map<ShardIdentifier, String> peerAddresses;
455
456         // flag that determines if the actor is ready for business
457         private boolean actorInitialized = false;
458
459         private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
460
461         private ShardInformation(String shardName, ShardIdentifier shardId,
462                 Map<ShardIdentifier, String> peerAddresses) {
463             this.shardName = shardName;
464             this.shardId = shardId;
465             this.peerAddresses = peerAddresses;
466         }
467
468         String getShardName() {
469             return shardName;
470         }
471
472         ActorRef getActor(){
473             return actor;
474         }
475
476         ActorPath getActorPath() {
477             return actorPath;
478         }
479
480         void setActor(ActorRef actor) {
481             this.actor = actor;
482             this.actorPath = actor.path();
483         }
484
485         ShardIdentifier getShardId() {
486             return shardId;
487         }
488
489         Map<ShardIdentifier, String> getPeerAddresses() {
490             return peerAddresses;
491         }
492
493         void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
494             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
495                 peerAddress);
496             if(peerAddresses.containsKey(peerId)){
497                 peerAddresses.put(peerId, peerAddress);
498
499                 if(actor != null) {
500                     if(LOG.isDebugEnabled()) {
501                         LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
502                                 peerId, peerAddress, actor.path());
503                     }
504
505                     actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
506                 }
507             }
508         }
509
510         boolean isShardInitialized() {
511             return getActor() != null && actorInitialized;
512         }
513
514         void setActorInitialized() {
515             this.actorInitialized = true;
516
517             for(Runnable runnable: runnablesOnInitialized) {
518                 runnable.run();
519             }
520
521             runnablesOnInitialized.clear();
522         }
523
524         void addRunnableOnInitialized(Runnable runnable) {
525             runnablesOnInitialized.add(runnable);
526         }
527     }
528
529     private static class ShardManagerCreator implements Creator<ShardManager> {
530         private static final long serialVersionUID = 1L;
531
532         final String type;
533         final ClusterWrapper cluster;
534         final Configuration configuration;
535         final DatastoreContext datastoreContext;
536
537         ShardManagerCreator(String type, ClusterWrapper cluster,
538                 Configuration configuration, DatastoreContext datastoreContext) {
539             this.type = type;
540             this.cluster = cluster;
541             this.configuration = configuration;
542             this.datastoreContext = datastoreContext;
543         }
544
545         @Override
546         public ShardManager create() throws Exception {
547             return new ShardManager(type, cluster, configuration, datastoreContext);
548         }
549     }
550
551     static class SchemaContextModules implements Serializable {
552         private static final long serialVersionUID = -8884620101025936590L;
553
554         private final Set<String> modules;
555
556         SchemaContextModules(Set<String> modules){
557             this.modules = modules;
558         }
559
560         public Set<String> getModules() {
561             return modules;
562         }
563     }
564 }
565
566
567