Merge "Dynamically update DatastoreContext properties"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import akka.japi.Procedure;
21 import akka.persistence.RecoveryCompleted;
22 import akka.persistence.RecoveryFailure;
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Supplier;
26 import com.google.common.collect.ImmutableSet;
27 import com.google.common.collect.Lists;
28 import java.io.Serializable;
29 import java.util.ArrayList;
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.HashSet;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.Set;
37 import org.opendaylight.controller.cluster.DataPersistenceProvider;
38 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
39 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
40 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
41 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
42 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
43 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
44 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
45 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
46 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
47 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
48 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
49 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
50 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
51 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
52 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
53 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57 import scala.concurrent.duration.Duration;
58
59 /**
60  * The ShardManager has the following jobs,
61  * <ul>
62  * <li> Create all the local shard replicas that belong on this cluster member
63  * <li> Find the address of the local shard
64  * <li> Find the primary replica for any given shard
65  * <li> Monitor the cluster members and store their addresses
66  * <ul>
67  */
68 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
69
70     private final Logger LOG = LoggerFactory.getLogger(getClass());
71
72     // Stores a mapping between a member name and the address of the member
73     // Member names look like "member-1", "member-2" etc and are as specified
74     // in configuration
75     private final Map<String, Address> memberNameToAddress = new HashMap<>();
76
77     // Stores a mapping between a shard name and it's corresponding information
78     // Shard names look like inventory, topology etc and are as specified in
79     // configuration
80     private final Map<String, ShardInformation> localShards = new HashMap<>();
81
82     // The type of a ShardManager reflects the type of the datastore itself
83     // A data store could be of type config/operational
84     private final String type;
85
86     private final ClusterWrapper cluster;
87
88     private final Configuration configuration;
89
90     private final String shardDispatcherPath;
91
92     private ShardManagerInfo mBean;
93
94     private DatastoreContext datastoreContext;
95
96     private Collection<String> knownModules = Collections.emptySet();
97
98     private final DataPersistenceProvider dataPersistenceProvider;
99
100     /**
101      */
102     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
103             DatastoreContext datastoreContext) {
104
105         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
106         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
107         this.datastoreContext = datastoreContext;
108         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
109         this.type = datastoreContext.getDataStoreType();
110         this.shardDispatcherPath =
111                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
112
113         // Subscribe this actor to cluster member events
114         cluster.subscribeToMemberEvents(getSelf());
115
116         createLocalShards();
117     }
118
119     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
120         return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
121     }
122
123     public static Props props(
124         final ClusterWrapper cluster,
125         final Configuration configuration,
126         final DatastoreContext datastoreContext) {
127
128         Preconditions.checkNotNull(cluster, "cluster should not be null");
129         Preconditions.checkNotNull(configuration, "configuration should not be null");
130
131         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext));
132     }
133
134     @Override
135     public void postStop() {
136         LOG.info("Stopping ShardManager");
137
138         mBean.unregisterMBean();
139     }
140
141     @Override
142     public void handleCommand(Object message) throws Exception {
143         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
144             findPrimary(FindPrimary.fromSerializable(message));
145         } else if(message instanceof FindLocalShard){
146             findLocalShard((FindLocalShard) message);
147         } else if (message instanceof UpdateSchemaContext) {
148             updateSchemaContext(message);
149         } else if(message instanceof ActorInitialized) {
150             onActorInitialized(message);
151         } else if (message instanceof ClusterEvent.MemberUp){
152             memberUp((ClusterEvent.MemberUp) message);
153         } else if(message instanceof ClusterEvent.MemberRemoved) {
154             memberRemoved((ClusterEvent.MemberRemoved) message);
155         } else if(message instanceof ClusterEvent.UnreachableMember) {
156             ignoreMessage(message);
157         } else if(message instanceof DatastoreContext) {
158             onDatastoreContext((DatastoreContext)message);
159         } else{
160             unknownMessage(message);
161         }
162
163     }
164
165     private void onActorInitialized(Object message) {
166         final ActorRef sender = getSender();
167
168         if (sender == null) {
169             return; //why is a non-actor sending this message? Just ignore.
170         }
171
172         String actorName = sender.path().name();
173         //find shard name from actor name; actor name is stringified shardId
174         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
175
176         if (shardId.getShardName() == null) {
177             return;
178         }
179         markShardAsInitialized(shardId.getShardName());
180     }
181
182     private void markShardAsInitialized(String shardName) {
183         LOG.debug("Initializing shard [{}]", shardName);
184         ShardInformation shardInformation = localShards.get(shardName);
185         if (shardInformation != null) {
186             shardInformation.setActorInitialized();
187         }
188     }
189
190     @Override
191     protected void handleRecover(Object message) throws Exception {
192         if(dataPersistenceProvider.isRecoveryApplicable()) {
193             if (message instanceof SchemaContextModules) {
194                 SchemaContextModules msg = (SchemaContextModules) message;
195                 knownModules = ImmutableSet.copyOf(msg.getModules());
196             } else if (message instanceof RecoveryFailure) {
197                 RecoveryFailure failure = (RecoveryFailure) message;
198                 LOG.error("Recovery failed", failure.cause());
199             } else if (message instanceof RecoveryCompleted) {
200                 LOG.info("Recovery complete : {}", persistenceId());
201
202                 // Delete all the messages from the akka journal except the last one
203                 deleteMessages(lastSequenceNr() - 1);
204             }
205         } else {
206             if (message instanceof RecoveryCompleted) {
207                 LOG.info("Recovery complete : {}", persistenceId());
208
209                 // Delete all the messages from the akka journal
210                 deleteMessages(lastSequenceNr());
211             }
212         }
213     }
214
215     private void findLocalShard(FindLocalShard message) {
216         final ShardInformation shardInformation = localShards.get(message.getShardName());
217
218         if(shardInformation == null){
219             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
220             return;
221         }
222
223         sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
224             @Override
225             public Object get() {
226                 return new LocalShardFound(shardInformation.getActor());
227             }
228         });
229     }
230
231     private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
232             final Supplier<Object> messageSupplier) {
233         if (!shardInformation.isShardInitialized()) {
234             if(waitUntilInitialized) {
235                 final ActorRef sender = getSender();
236                 final ActorRef self = self();
237                 shardInformation.addRunnableOnInitialized(new Runnable() {
238                     @Override
239                     public void run() {
240                         sender.tell(messageSupplier.get(), self);
241                     }
242                 });
243             } else {
244                 getSender().tell(new ActorNotInitialized(), getSelf());
245             }
246
247             return;
248         }
249
250         getSender().tell(messageSupplier.get(), getSelf());
251     }
252
253     private void memberRemoved(ClusterEvent.MemberRemoved message) {
254         memberNameToAddress.remove(message.member().roles().head());
255     }
256
257     private void memberUp(ClusterEvent.MemberUp message) {
258         String memberName = message.member().roles().head();
259
260         memberNameToAddress.put(memberName, message.member().address());
261
262         for(ShardInformation info : localShards.values()){
263             String shardName = info.getShardName();
264             info.updatePeerAddress(getShardIdentifier(memberName, shardName),
265                 getShardActorPath(shardName, memberName));
266         }
267     }
268
269     private void onDatastoreContext(DatastoreContext context) {
270         datastoreContext = context;
271         for (ShardInformation info : localShards.values()) {
272             if (info.getActor() != null) {
273                 info.getActor().tell(datastoreContext, getSelf());
274             }
275         }
276     }
277
278     /**
279      * Notifies all the local shards of a change in the schema context
280      *
281      * @param message
282      */
283     private void updateSchemaContext(final Object message) {
284         final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
285
286         Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
287         Set<String> newModules = new HashSet<>(128);
288
289         for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
290             String s = moduleIdentifier.getNamespace().toString();
291             newModules.add(s);
292         }
293
294         if(newModules.containsAll(knownModules)) {
295
296             LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
297
298             knownModules = ImmutableSet.copyOf(newModules);
299
300             dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
301
302                 @Override
303                 public void apply(SchemaContextModules param) throws Exception {
304                     LOG.debug("Sending new SchemaContext to Shards");
305                     for (ShardInformation info : localShards.values()) {
306                         if (info.getActor() == null) {
307                             info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
308                                             info.getPeerAddresses(), datastoreContext, schemaContext)
309                                             .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
310                         } else {
311                             info.getActor().tell(message, getSelf());
312                         }
313                     }
314                 }
315
316             });
317         } else {
318             LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
319                     newModules, knownModules);
320         }
321
322     }
323
324     private void findPrimary(FindPrimary message) {
325         String shardName = message.getShardName();
326
327         // First see if the there is a local replica for the shard
328         final ShardInformation info = localShards.get(shardName);
329         if (info != null) {
330             sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
331                 @Override
332                 public Object get() {
333                     return new PrimaryFound(info.getActorPath().toString()).toSerializable();
334                 }
335             });
336
337             return;
338         }
339
340         List<String> members = configuration.getMembersFromShardName(shardName);
341
342         if(cluster.getCurrentMemberName() != null) {
343             members.remove(cluster.getCurrentMemberName());
344         }
345
346         /**
347          * FIXME: Instead of sending remote shard actor path back to sender,
348          * forward FindPrimary message to remote shard manager
349          */
350         // There is no way for us to figure out the primary (for now) so assume
351         // that one of the remote nodes is a primary
352         for(String memberName : members) {
353             Address address = memberNameToAddress.get(memberName);
354             if(address != null){
355                 String path =
356                     getShardActorPath(shardName, memberName);
357                 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
358                 return;
359             }
360         }
361         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
362     }
363
364     private String getShardActorPath(String shardName, String memberName) {
365         Address address = memberNameToAddress.get(memberName);
366         if(address != null) {
367             StringBuilder builder = new StringBuilder();
368             builder.append(address.toString())
369                 .append("/user/")
370                 .append(ShardManagerIdentifier.builder().type(type).build().toString())
371                 .append("/")
372                 .append(getShardIdentifier(memberName, shardName));
373             return builder.toString();
374         }
375         return null;
376     }
377
378     /**
379      * Construct the name of the shard actor given the name of the member on
380      * which the shard resides and the name of the shard
381      *
382      * @param memberName
383      * @param shardName
384      * @return
385      */
386     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
387         return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
388     }
389
390     /**
391      * Create shards that are local to the member on which the ShardManager
392      * runs
393      *
394      */
395     private void createLocalShards() {
396         String memberName = this.cluster.getCurrentMemberName();
397         List<String> memberShardNames =
398             this.configuration.getMemberShardNames(memberName);
399
400         List<String> localShardActorNames = new ArrayList<>();
401         for(String shardName : memberShardNames){
402             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
403             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
404             localShardActorNames.add(shardId.toString());
405             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
406         }
407
408         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
409                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
410     }
411
412     /**
413      * Given the name of the shard find the addresses of all it's peers
414      *
415      * @param shardName
416      * @return
417      */
418     private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
419
420         Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
421
422         List<String> members =
423             this.configuration.getMembersFromShardName(shardName);
424
425         String currentMemberName = this.cluster.getCurrentMemberName();
426
427         for(String memberName : members){
428             if(!currentMemberName.equals(memberName)){
429                 ShardIdentifier shardId = getShardIdentifier(memberName,
430                     shardName);
431                 String path =
432                     getShardActorPath(shardName, currentMemberName);
433                 peerAddresses.put(shardId, path);
434             }
435         }
436         return peerAddresses;
437     }
438
439     @Override
440     public SupervisorStrategy supervisorStrategy() {
441
442         return new OneForOneStrategy(10, Duration.create("1 minute"),
443             new Function<Throwable, SupervisorStrategy.Directive>() {
444                 @Override
445                 public SupervisorStrategy.Directive apply(Throwable t) {
446                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
447                     return SupervisorStrategy.resume();
448                 }
449             }
450         );
451
452     }
453
454     @Override
455     public String persistenceId() {
456         return "shard-manager-" + type;
457     }
458
459     @VisibleForTesting
460     Collection<String> getKnownModules() {
461         return knownModules;
462     }
463
464     @VisibleForTesting
465     DataPersistenceProvider getDataPersistenceProvider() {
466         return dataPersistenceProvider;
467     }
468
469     private class ShardInformation {
470         private final ShardIdentifier shardId;
471         private final String shardName;
472         private ActorRef actor;
473         private ActorPath actorPath;
474         private final Map<ShardIdentifier, String> peerAddresses;
475
476         // flag that determines if the actor is ready for business
477         private boolean actorInitialized = false;
478
479         private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
480
481         private ShardInformation(String shardName, ShardIdentifier shardId,
482                 Map<ShardIdentifier, String> peerAddresses) {
483             this.shardName = shardName;
484             this.shardId = shardId;
485             this.peerAddresses = peerAddresses;
486         }
487
488         String getShardName() {
489             return shardName;
490         }
491
492         ActorRef getActor(){
493             return actor;
494         }
495
496         ActorPath getActorPath() {
497             return actorPath;
498         }
499
500         void setActor(ActorRef actor) {
501             this.actor = actor;
502             this.actorPath = actor.path();
503         }
504
505         ShardIdentifier getShardId() {
506             return shardId;
507         }
508
509         Map<ShardIdentifier, String> getPeerAddresses() {
510             return peerAddresses;
511         }
512
513         void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
514             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
515                 peerAddress);
516             if(peerAddresses.containsKey(peerId)){
517                 peerAddresses.put(peerId, peerAddress);
518
519                 if(actor != null) {
520                     if(LOG.isDebugEnabled()) {
521                         LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
522                                 peerId, peerAddress, actor.path());
523                     }
524
525                     actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
526                 }
527             }
528         }
529
530         boolean isShardInitialized() {
531             return getActor() != null && actorInitialized;
532         }
533
534         void setActorInitialized() {
535             this.actorInitialized = true;
536
537             for(Runnable runnable: runnablesOnInitialized) {
538                 runnable.run();
539             }
540
541             runnablesOnInitialized.clear();
542         }
543
544         void addRunnableOnInitialized(Runnable runnable) {
545             runnablesOnInitialized.add(runnable);
546         }
547     }
548
549     private static class ShardManagerCreator implements Creator<ShardManager> {
550         private static final long serialVersionUID = 1L;
551
552         final ClusterWrapper cluster;
553         final Configuration configuration;
554         final DatastoreContext datastoreContext;
555
556         ShardManagerCreator(ClusterWrapper cluster,
557                 Configuration configuration, DatastoreContext datastoreContext) {
558             this.cluster = cluster;
559             this.configuration = configuration;
560             this.datastoreContext = datastoreContext;
561         }
562
563         @Override
564         public ShardManager create() throws Exception {
565             return new ShardManager(cluster, configuration, datastoreContext);
566         }
567     }
568
569     static class SchemaContextModules implements Serializable {
570         private static final long serialVersionUID = -8884620101025936590L;
571
572         private final Set<String> modules;
573
574         SchemaContextModules(Set<String> modules){
575             this.modules = modules;
576         }
577
578         public Set<String> getModules() {
579             return modules;
580         }
581     }
582 }
583
584
585