Merge topic 'cleanup/composite-node'
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardManager.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.datastore;
10
11 import akka.actor.ActorPath;
12 import akka.actor.ActorRef;
13 import akka.actor.Address;
14 import akka.actor.OneForOneStrategy;
15 import akka.actor.Props;
16 import akka.actor.SupervisorStrategy;
17 import akka.cluster.ClusterEvent;
18 import akka.japi.Creator;
19 import akka.japi.Function;
20 import akka.japi.Procedure;
21 import akka.persistence.RecoveryCompleted;
22 import akka.persistence.RecoveryFailure;
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Preconditions;
25 import com.google.common.base.Strings;
26 import com.google.common.base.Supplier;
27 import com.google.common.collect.ImmutableSet;
28 import com.google.common.collect.Lists;
29 import java.io.Serializable;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.HashSet;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.concurrent.CountDownLatch;
39 import org.opendaylight.controller.cluster.DataPersistenceProvider;
40 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
41 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
42 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
43 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
44 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
45 import org.opendaylight.controller.cluster.datastore.messages.ActorNotInitialized;
46 import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
47 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
48 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
49 import org.opendaylight.controller.cluster.datastore.messages.LocalShardNotFound;
50 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
51 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
52 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
53 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
54 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
55 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
56 import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
57 import org.opendaylight.controller.cluster.raft.RaftState;
58 import org.opendaylight.yangtools.yang.model.api.ModuleIdentifier;
59 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62 import scala.concurrent.duration.Duration;
63
64 /**
65  * The ShardManager has the following jobs,
66  * <ul>
67  * <li> Create all the local shard replicas that belong on this cluster member
68  * <li> Find the address of the local shard
69  * <li> Find the primary replica for any given shard
70  * <li> Monitor the cluster members and store their addresses
71  * <ul>
72  */
73 public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
74
75     private final Logger LOG = LoggerFactory.getLogger(getClass());
76
77     // Stores a mapping between a member name and the address of the member
78     // Member names look like "member-1", "member-2" etc and are as specified
79     // in configuration
80     private final Map<String, Address> memberNameToAddress = new HashMap<>();
81
82     // Stores a mapping between a shard name and it's corresponding information
83     // Shard names look like inventory, topology etc and are as specified in
84     // configuration
85     private final Map<String, ShardInformation> localShards = new HashMap<>();
86
87     // The type of a ShardManager reflects the type of the datastore itself
88     // A data store could be of type config/operational
89     private final String type;
90
91     private final ClusterWrapper cluster;
92
93     private final Configuration configuration;
94
95     private final String shardDispatcherPath;
96
97     private ShardManagerInfo mBean;
98
99     private DatastoreContext datastoreContext;
100
101     private Collection<String> knownModules = Collections.emptySet();
102
103     private final DataPersistenceProvider dataPersistenceProvider;
104
105     private final CountDownLatch waitTillReadyCountdownLatch;
106
107     /**
108      */
109     protected ShardManager(ClusterWrapper cluster, Configuration configuration,
110             DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
111
112         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
113         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
114         this.datastoreContext = datastoreContext;
115         this.dataPersistenceProvider = createDataPersistenceProvider(datastoreContext.isPersistent());
116         this.type = datastoreContext.getDataStoreType();
117         this.shardDispatcherPath =
118                 new Dispatchers(context().system().dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
119         this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
120
121         // Subscribe this actor to cluster member events
122         cluster.subscribeToMemberEvents(getSelf());
123
124         createLocalShards();
125     }
126
127     protected DataPersistenceProvider createDataPersistenceProvider(boolean persistent) {
128         return (persistent) ? new PersistentDataProvider() : new NonPersistentDataProvider();
129     }
130
131     public static Props props(
132         final ClusterWrapper cluster,
133         final Configuration configuration,
134         final DatastoreContext datastoreContext,
135         final CountDownLatch waitTillReadyCountdownLatch) {
136
137         Preconditions.checkNotNull(cluster, "cluster should not be null");
138         Preconditions.checkNotNull(configuration, "configuration should not be null");
139         Preconditions.checkNotNull(waitTillReadyCountdownLatch, "waitTillReadyCountdownLatch should not be null");
140
141         return Props.create(new ShardManagerCreator(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch));
142     }
143
144     @Override
145     public void postStop() {
146         LOG.info("Stopping ShardManager");
147
148         mBean.unregisterMBean();
149     }
150
151     @Override
152     public void handleCommand(Object message) throws Exception {
153         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
154             findPrimary(FindPrimary.fromSerializable(message));
155         } else if(message instanceof FindLocalShard){
156             findLocalShard((FindLocalShard) message);
157         } else if (message instanceof UpdateSchemaContext) {
158             updateSchemaContext(message);
159         } else if(message instanceof ActorInitialized) {
160             onActorInitialized(message);
161         } else if (message instanceof ClusterEvent.MemberUp){
162             memberUp((ClusterEvent.MemberUp) message);
163         } else if(message instanceof ClusterEvent.MemberRemoved) {
164             memberRemoved((ClusterEvent.MemberRemoved) message);
165         } else if(message instanceof ClusterEvent.UnreachableMember) {
166             ignoreMessage(message);
167         } else if(message instanceof DatastoreContext) {
168             onDatastoreContext((DatastoreContext)message);
169         } else if(message instanceof RoleChangeNotification){
170             onRoleChangeNotification((RoleChangeNotification) message);
171         } else{
172             unknownMessage(message);
173         }
174
175     }
176
177     private void onRoleChangeNotification(RoleChangeNotification message) {
178         RoleChangeNotification roleChanged = message;
179         LOG.info("Received role changed for {} from {} to {}", roleChanged.getMemberId(),
180                 roleChanged.getOldRole(), roleChanged.getNewRole());
181
182         ShardInformation shardInformation = findShardInformation(roleChanged.getMemberId());
183         if(shardInformation != null) {
184             shardInformation.setRole(roleChanged.getNewRole());
185
186             if (isReady()) {
187                 LOG.info("All Shards are ready - data store {} is ready, available count is {}", type,
188                         waitTillReadyCountdownLatch.getCount());
189
190                 waitTillReadyCountdownLatch.countDown();
191             }
192         }
193     }
194
195
196     private ShardInformation findShardInformation(String memberId) {
197         for(ShardInformation info : localShards.values()){
198             if(info.getShardId().toString().equals(memberId)){
199                 return info;
200             }
201         }
202
203         return null;
204     }
205
206     private boolean isReady() {
207         boolean isReady = true;
208         for (ShardInformation info : localShards.values()) {
209             if(RaftState.Candidate.name().equals(info.getRole()) || Strings.isNullOrEmpty(info.getRole())){
210                 isReady = false;
211                 break;
212             }
213         }
214         return isReady;
215     }
216
217     private void onActorInitialized(Object message) {
218         final ActorRef sender = getSender();
219
220         if (sender == null) {
221             return; //why is a non-actor sending this message? Just ignore.
222         }
223
224         String actorName = sender.path().name();
225         //find shard name from actor name; actor name is stringified shardId
226         ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(actorName).build();
227
228         if (shardId.getShardName() == null) {
229             return;
230         }
231         markShardAsInitialized(shardId.getShardName());
232     }
233
234     private void markShardAsInitialized(String shardName) {
235         LOG.debug("Initializing shard [{}]", shardName);
236         ShardInformation shardInformation = localShards.get(shardName);
237         if (shardInformation != null) {
238             shardInformation.setActorInitialized();
239         }
240     }
241
242     @Override
243     protected void handleRecover(Object message) throws Exception {
244         if(dataPersistenceProvider.isRecoveryApplicable()) {
245             if (message instanceof SchemaContextModules) {
246                 SchemaContextModules msg = (SchemaContextModules) message;
247                 knownModules = ImmutableSet.copyOf(msg.getModules());
248             } else if (message instanceof RecoveryFailure) {
249                 RecoveryFailure failure = (RecoveryFailure) message;
250                 LOG.error("Recovery failed", failure.cause());
251             } else if (message instanceof RecoveryCompleted) {
252                 LOG.info("Recovery complete : {}", persistenceId());
253
254                 // Delete all the messages from the akka journal except the last one
255                 deleteMessages(lastSequenceNr() - 1);
256             }
257         } else {
258             if (message instanceof RecoveryCompleted) {
259                 LOG.info("Recovery complete : {}", persistenceId());
260
261                 // Delete all the messages from the akka journal
262                 deleteMessages(lastSequenceNr());
263             }
264         }
265     }
266
267     private void findLocalShard(FindLocalShard message) {
268         final ShardInformation shardInformation = localShards.get(message.getShardName());
269
270         if(shardInformation == null){
271             getSender().tell(new LocalShardNotFound(message.getShardName()), getSelf());
272             return;
273         }
274
275         sendResponse(shardInformation, message.isWaitUntilInitialized(), new Supplier<Object>() {
276             @Override
277             public Object get() {
278                 return new LocalShardFound(shardInformation.getActor());
279             }
280         });
281     }
282
283     private void sendResponse(ShardInformation shardInformation, boolean waitUntilInitialized,
284             final Supplier<Object> messageSupplier) {
285         if (!shardInformation.isShardInitialized()) {
286             if(waitUntilInitialized) {
287                 final ActorRef sender = getSender();
288                 final ActorRef self = self();
289                 shardInformation.addRunnableOnInitialized(new Runnable() {
290                     @Override
291                     public void run() {
292                         sender.tell(messageSupplier.get(), self);
293                     }
294                 });
295             } else {
296                 getSender().tell(new ActorNotInitialized(), getSelf());
297             }
298
299             return;
300         }
301
302         getSender().tell(messageSupplier.get(), getSelf());
303     }
304
305     private void memberRemoved(ClusterEvent.MemberRemoved message) {
306         memberNameToAddress.remove(message.member().roles().head());
307     }
308
309     private void memberUp(ClusterEvent.MemberUp message) {
310         String memberName = message.member().roles().head();
311
312         memberNameToAddress.put(memberName, message.member().address());
313
314         for(ShardInformation info : localShards.values()){
315             String shardName = info.getShardName();
316             info.updatePeerAddress(getShardIdentifier(memberName, shardName),
317                 getShardActorPath(shardName, memberName));
318         }
319     }
320
321     private void onDatastoreContext(DatastoreContext context) {
322         datastoreContext = context;
323         for (ShardInformation info : localShards.values()) {
324             if (info.getActor() != null) {
325                 info.getActor().tell(datastoreContext, getSelf());
326             }
327         }
328     }
329
330     /**
331      * Notifies all the local shards of a change in the schema context
332      *
333      * @param message
334      */
335     private void updateSchemaContext(final Object message) {
336         final SchemaContext schemaContext = ((UpdateSchemaContext) message).getSchemaContext();
337
338         Set<ModuleIdentifier> allModuleIdentifiers = schemaContext.getAllModuleIdentifiers();
339         Set<String> newModules = new HashSet<>(128);
340
341         for(ModuleIdentifier moduleIdentifier : allModuleIdentifiers){
342             String s = moduleIdentifier.getNamespace().toString();
343             newModules.add(s);
344         }
345
346         if(newModules.containsAll(knownModules)) {
347
348             LOG.debug("New SchemaContext has a super set of current knownModules - persisting info");
349
350             knownModules = ImmutableSet.copyOf(newModules);
351
352             dataPersistenceProvider.persist(new SchemaContextModules(newModules), new Procedure<SchemaContextModules>() {
353
354                 @Override
355                 public void apply(SchemaContextModules param) throws Exception {
356                     LOG.debug("Sending new SchemaContext to Shards");
357                     for (ShardInformation info : localShards.values()) {
358                         if (info.getActor() == null) {
359                             info.setActor(getContext().actorOf(Shard.props(info.getShardId(),
360                                     info.getPeerAddresses(), datastoreContext, schemaContext)
361                                             .withDispatcher(shardDispatcherPath), info.getShardId().toString()));
362                         } else {
363                             info.getActor().tell(message, getSelf());
364                         }
365                         info.getActor().tell(new RegisterRoleChangeListener(), self());
366                     }
367                 }
368
369             });
370         } else {
371             LOG.debug("Rejecting schema context update - not a super set of previously known modules:\nUPDATE: {}\nKNOWN: {}",
372                     newModules, knownModules);
373         }
374
375     }
376
377     private void findPrimary(FindPrimary message) {
378         String shardName = message.getShardName();
379
380         // First see if the there is a local replica for the shard
381         final ShardInformation info = localShards.get(shardName);
382         if (info != null) {
383             sendResponse(info, message.isWaitUntilInitialized(), new Supplier<Object>() {
384                 @Override
385                 public Object get() {
386                     return new PrimaryFound(info.getActorPath().toString()).toSerializable();
387                 }
388             });
389
390             return;
391         }
392
393         List<String> members = configuration.getMembersFromShardName(shardName);
394
395         if(cluster.getCurrentMemberName() != null) {
396             members.remove(cluster.getCurrentMemberName());
397         }
398
399         /**
400          * FIXME: Instead of sending remote shard actor path back to sender,
401          * forward FindPrimary message to remote shard manager
402          */
403         // There is no way for us to figure out the primary (for now) so assume
404         // that one of the remote nodes is a primary
405         for(String memberName : members) {
406             Address address = memberNameToAddress.get(memberName);
407             if(address != null){
408                 String path =
409                     getShardActorPath(shardName, memberName);
410                 getSender().tell(new PrimaryFound(path).toSerializable(), getSelf());
411                 return;
412             }
413         }
414         getSender().tell(new PrimaryNotFound(shardName).toSerializable(), getSelf());
415     }
416
417     private String getShardActorPath(String shardName, String memberName) {
418         Address address = memberNameToAddress.get(memberName);
419         if(address != null) {
420             StringBuilder builder = new StringBuilder();
421             builder.append(address.toString())
422                 .append("/user/")
423                 .append(ShardManagerIdentifier.builder().type(type).build().toString())
424                 .append("/")
425                 .append(getShardIdentifier(memberName, shardName));
426             return builder.toString();
427         }
428         return null;
429     }
430
431     /**
432      * Construct the name of the shard actor given the name of the member on
433      * which the shard resides and the name of the shard
434      *
435      * @param memberName
436      * @param shardName
437      * @return
438      */
439     private ShardIdentifier getShardIdentifier(String memberName, String shardName){
440         return ShardIdentifier.builder().memberName(memberName).shardName(shardName).type(type).build();
441     }
442
443     /**
444      * Create shards that are local to the member on which the ShardManager
445      * runs
446      *
447      */
448     private void createLocalShards() {
449         String memberName = this.cluster.getCurrentMemberName();
450         List<String> memberShardNames =
451             this.configuration.getMemberShardNames(memberName);
452
453         List<String> localShardActorNames = new ArrayList<>();
454         for(String shardName : memberShardNames){
455             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
456             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
457             localShardActorNames.add(shardId.toString());
458             localShards.put(shardName, new ShardInformation(shardName, shardId, peerAddresses));
459         }
460
461         mBean = ShardManagerInfo.createShardManagerMBean("shard-manager-" + this.type,
462                     datastoreContext.getDataStoreMXBeanType(), localShardActorNames);
463     }
464
465     /**
466      * Given the name of the shard find the addresses of all it's peers
467      *
468      * @param shardName
469      * @return
470      */
471     private Map<ShardIdentifier, String> getPeerAddresses(String shardName){
472
473         Map<ShardIdentifier, String> peerAddresses = new HashMap<>();
474
475         List<String> members =
476             this.configuration.getMembersFromShardName(shardName);
477
478         String currentMemberName = this.cluster.getCurrentMemberName();
479
480         for(String memberName : members){
481             if(!currentMemberName.equals(memberName)){
482                 ShardIdentifier shardId = getShardIdentifier(memberName,
483                     shardName);
484                 String path =
485                     getShardActorPath(shardName, currentMemberName);
486                 peerAddresses.put(shardId, path);
487             }
488         }
489         return peerAddresses;
490     }
491
492     @Override
493     public SupervisorStrategy supervisorStrategy() {
494
495         return new OneForOneStrategy(10, Duration.create("1 minute"),
496             new Function<Throwable, SupervisorStrategy.Directive>() {
497                 @Override
498                 public SupervisorStrategy.Directive apply(Throwable t) {
499                     LOG.warn("Supervisor Strategy caught unexpected exception - resuming", t);
500                     return SupervisorStrategy.resume();
501                 }
502             }
503         );
504
505     }
506
507     @Override
508     public String persistenceId() {
509         return "shard-manager-" + type;
510     }
511
512     @VisibleForTesting
513     Collection<String> getKnownModules() {
514         return knownModules;
515     }
516
517     @VisibleForTesting
518     DataPersistenceProvider getDataPersistenceProvider() {
519         return dataPersistenceProvider;
520     }
521
522     private class ShardInformation {
523         private final ShardIdentifier shardId;
524         private final String shardName;
525         private ActorRef actor;
526         private ActorPath actorPath;
527         private final Map<ShardIdentifier, String> peerAddresses;
528
529         // flag that determines if the actor is ready for business
530         private boolean actorInitialized = false;
531
532         private final List<Runnable> runnablesOnInitialized = Lists.newArrayList();
533         private String role ;
534
535         private ShardInformation(String shardName, ShardIdentifier shardId,
536                 Map<ShardIdentifier, String> peerAddresses) {
537             this.shardName = shardName;
538             this.shardId = shardId;
539             this.peerAddresses = peerAddresses;
540         }
541
542         String getShardName() {
543             return shardName;
544         }
545
546         ActorRef getActor(){
547             return actor;
548         }
549
550         ActorPath getActorPath() {
551             return actorPath;
552         }
553
554         void setActor(ActorRef actor) {
555             this.actor = actor;
556             this.actorPath = actor.path();
557         }
558
559         ShardIdentifier getShardId() {
560             return shardId;
561         }
562
563         Map<ShardIdentifier, String> getPeerAddresses() {
564             return peerAddresses;
565         }
566
567         void updatePeerAddress(ShardIdentifier peerId, String peerAddress){
568             LOG.info("updatePeerAddress for peer {} with address {}", peerId,
569                 peerAddress);
570             if(peerAddresses.containsKey(peerId)){
571                 peerAddresses.put(peerId, peerAddress);
572
573                 if(actor != null) {
574                     if(LOG.isDebugEnabled()) {
575                         LOG.debug("Sending PeerAddressResolved for peer {} with address {} to {}",
576                                 peerId, peerAddress, actor.path());
577                     }
578
579                     actor.tell(new PeerAddressResolved(peerId, peerAddress), getSelf());
580                 }
581             }
582         }
583
584         boolean isShardInitialized() {
585             return getActor() != null && actorInitialized;
586         }
587
588         void setActorInitialized() {
589             this.actorInitialized = true;
590
591             for(Runnable runnable: runnablesOnInitialized) {
592                 runnable.run();
593             }
594
595             runnablesOnInitialized.clear();
596         }
597
598         void addRunnableOnInitialized(Runnable runnable) {
599             runnablesOnInitialized.add(runnable);
600         }
601
602         public void setRole(String newRole) {
603             this.role = newRole;
604         }
605
606         public String getRole(){
607             return this.role;
608         }
609
610     }
611
612     private static class ShardManagerCreator implements Creator<ShardManager> {
613         private static final long serialVersionUID = 1L;
614
615         final ClusterWrapper cluster;
616         final Configuration configuration;
617         final DatastoreContext datastoreContext;
618         private final CountDownLatch waitTillReadyCountdownLatch;
619
620         ShardManagerCreator(ClusterWrapper cluster,
621                             Configuration configuration, DatastoreContext datastoreContext, CountDownLatch waitTillReadyCountdownLatch) {
622             this.cluster = cluster;
623             this.configuration = configuration;
624             this.datastoreContext = datastoreContext;
625             this.waitTillReadyCountdownLatch = waitTillReadyCountdownLatch;
626         }
627
628         @Override
629         public ShardManager create() throws Exception {
630             return new ShardManager(cluster, configuration, datastoreContext, waitTillReadyCountdownLatch);
631         }
632     }
633
634     static class SchemaContextModules implements Serializable {
635         private static final long serialVersionUID = -8884620101025936590L;
636
637         private final Set<String> modules;
638
639         SchemaContextModules(Set<String> modules){
640             this.modules = modules;
641         }
642
643         public Set<String> getModules() {
644             return modules;
645         }
646     }
647 }
648
649
650