c1a099b97c49a1d61af6091929b19c0d4deefc84
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / ShardedDataTreeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.cluster.ClusterEvent;
20 import akka.cluster.ClusterEvent.MemberExited;
21 import akka.cluster.ClusterEvent.MemberRemoved;
22 import akka.cluster.ClusterEvent.MemberUp;
23 import akka.cluster.ClusterEvent.MemberWeaklyUp;
24 import akka.cluster.ClusterEvent.ReachableMember;
25 import akka.cluster.ClusterEvent.UnreachableMember;
26 import akka.cluster.Member;
27 import akka.util.Timeout;
28 import com.google.common.base.Preconditions;
29 import com.google.common.base.Throwables;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.CompletableFuture;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import org.opendaylight.controller.cluster.access.concepts.MemberName;
40 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
41 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
42 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
45 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
46 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
47 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
48 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
49 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
50 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
51 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
52 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
53 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
54 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
55 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
56 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
63 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import scala.compat.java8.FutureConverters;
66
67 /**
 * Actor that tracks currently open producers/shards on remote nodes and notifies remote nodes about
 * producers/shards newly opened on the local node.
70  */
71 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
72
73     private static final String PERSISTENCE_ID = "sharding-service-actor";
74     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
75
76     private final DOMDataTreeService dataTreeService;
77     private final DOMDataTreeShardingService shardingService;
78     private final ActorSystem actorSystem;
79     private final ClusterWrapper cluster;
80     // helper actorContext used only for static calls to executeAsync etc
81     // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
82     private final ActorContext actorContext;
83     private final ShardingServiceAddressResolver resolver;
84     private final DistributedDataStore distributedConfigDatastore;
85     private final DistributedDataStore distributedOperDatastore;
86
87     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
88     private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
89
90     ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
91         LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
92
93         dataTreeService = builder.getDataTreeService();
94         shardingService = builder.getShardingService();
95         actorSystem = builder.getActorSystem();
96         cluster = builder.getClusterWrapper();
97         distributedConfigDatastore = builder.getDistributedConfigDatastore();
98         distributedOperDatastore = builder.getDistributedOperDatastore();
99         actorContext = distributedConfigDatastore.getActorContext();
100         resolver = new ShardingServiceAddressResolver(
101                 DistributedShardedDOMDataTree.ACTOR_ID, cluster.getCurrentMemberName());
102
103         cluster.subscribeToMemberEvents(self());
104     }
105
106     @Override
107     protected void handleRecover(final Object message) throws Exception {
108         LOG.debug("Received a recover message {}", message);
109     }
110
111     @Override
112     protected void handleCommand(final Object message) throws Exception {
113         if (message instanceof ClusterEvent.MemberUp) {
114             memberUp((ClusterEvent.MemberUp) message);
115         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
116             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
117         } else if (message instanceof ClusterEvent.MemberExited) {
118             memberExited((ClusterEvent.MemberExited) message);
119         } else if (message instanceof ClusterEvent.MemberRemoved) {
120             memberRemoved((ClusterEvent.MemberRemoved) message);
121         } else if (message instanceof ClusterEvent.UnreachableMember) {
122             memberUnreachable((ClusterEvent.UnreachableMember) message);
123         } else if (message instanceof ClusterEvent.ReachableMember) {
124             memberReachable((ClusterEvent.ReachableMember) message);
125         } else if (message instanceof ProducerCreated) {
126             onProducerCreated((ProducerCreated) message);
127         } else if (message instanceof NotifyProducerCreated) {
128             onNotifyProducerCreated((NotifyProducerCreated) message);
129         } else if (message instanceof ProducerRemoved) {
130             onProducerRemoved((ProducerRemoved) message);
131         } else if (message instanceof NotifyProducerRemoved) {
132             onNotifyProducerRemoved((NotifyProducerRemoved) message);
133         } else if (message instanceof PrefixShardCreated) {
134             onPrefixShardCreated((PrefixShardCreated) message);
135         } else if (message instanceof CreatePrefixShard) {
136             onCreatePrefixShard((CreatePrefixShard) message);
137         } else if (message instanceof RemovePrefixShard) {
138             onRemovePrefixShard((RemovePrefixShard) message);
139         } else if (message instanceof PrefixShardRemoved) {
140             onPrefixShardRemoved((PrefixShardRemoved) message);
141         }
142     }
143
144     @Override
145     public String persistenceId() {
146         return PERSISTENCE_ID;
147     }
148
149     private void memberUp(final MemberUp message) {
150         final MemberName memberName = memberToName(message.member());
151
152         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
153                 message.member().address());
154
155         resolver.addPeerAddress(memberName, message.member().address());
156     }
157
158     private void memberWeaklyUp(final MemberWeaklyUp message) {
159         final MemberName memberName = memberToName(message.member());
160
161         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
162                 message.member().address());
163
164         resolver.addPeerAddress(memberName, message.member().address());
165     }
166
167     private void memberExited(final MemberExited message) {
168         final MemberName memberName = memberToName(message.member());
169
170         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
171                 message.member().address());
172
173         resolver.removePeerAddress(memberName);
174     }
175
176     private void memberRemoved(final MemberRemoved message) {
177         final MemberName memberName = memberToName(message.member());
178
179         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
180                 message.member().address());
181
182         resolver.removePeerAddress(memberName);
183     }
184
185     private void memberUnreachable(final UnreachableMember message) {
186         final MemberName memberName = memberToName(message.member());
187         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
188
189         resolver.removePeerAddress(memberName);
190     }
191
192     private void memberReachable(final ReachableMember message) {
193         final MemberName memberName = memberToName(message.member());
194         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
195
196         resolver.addPeerAddress(memberName, message.member().address());
197     }
198
199     private void onProducerCreated(final ProducerCreated message) {
200         LOG.debug("Received ProducerCreated: {}", message);
201         final ActorRef sender = getSender();
202         final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
203
204         final List<CompletableFuture<Object>> futures = new ArrayList<>();
205
206         for (final String address : resolver.getShardingServicePeerActorAddresses()) {
207             final ActorSelection actorSelection = actorSystem.actorSelection(address);
208             futures.add(
209                     FutureConverters.toJava(
210                             actorContext.executeOperationAsync(
211                                     actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
212                     .toCompletableFuture());
213         }
214
215         final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
216                 futures.toArray(new CompletableFuture[futures.size()]));
217
218         combinedFuture.thenRun(() -> {
219             for (final CompletableFuture<Object> future : futures) {
220                 try {
221                     final Object result = future.get();
222                     if (result instanceof Status.Failure) {
223                         sender.tell(result, self());
224                         return;
225                     }
226                 } catch (InterruptedException | ExecutionException e) {
227                     sender.tell(new Status.Failure(e), self());
228                     return;
229                 }
230             }
231             sender.tell(new Status.Success(null), noSender());
232         }).exceptionally(throwable -> {
233             sender.tell(new Status.Failure(throwable), self());
234             return null;
235         });
236     }
237
238     private void onNotifyProducerCreated(final NotifyProducerCreated message) {
239         LOG.debug("Received NotifyProducerCreated: {}", message);
240
241         final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
242
243         try {
244             final ActorProducerRegistration registration =
245                     new ActorProducerRegistration(dataTreeService.createProducer(subtrees), subtrees);
246             subtrees.forEach(id -> idToProducer.put(id, registration));
247             sender().tell(new Status.Success(null), self());
248         } catch (final IllegalArgumentException e) {
249             sender().tell(new Status.Failure(e), getSelf());
250         }
251     }
252
253     private void onProducerRemoved(final ProducerRemoved message) {
254         LOG.debug("Received ProducerRemoved: {}", message);
255
256         final List<CompletableFuture<Object>> futures = new ArrayList<>();
257
258         for (final String address : resolver.getShardingServicePeerActorAddresses()) {
259             final ActorSelection selection = actorSystem.actorSelection(address);
260
261             futures.add(FutureConverters.toJava(
262                     actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
263                     .toCompletableFuture());
264         }
265
266         final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
267                 futures.toArray(new CompletableFuture[futures.size()]));
268
269         final ActorRef respondTo = getSender();
270
271         combinedFuture
272                 .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
273                 .exceptionally(e -> {
274                     respondTo.tell(new Status.Failure(null), self());
275                     return null;
276                 });
277
278     }
279
280     private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
281         LOG.debug("Received NotifyProducerRemoved: {}", message);
282
283         final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
284         if (registration == null) {
285             LOG.warn("The notification contained a path on which no producer is registered, throwing away");
286             getSender().tell(new Status.Success(null), noSender());
287             return;
288         }
289
290         try {
291             registration.close();
292             getSender().tell(new Status.Success(null), noSender());
293         } catch (final DOMDataTreeProducerException e) {
294             LOG.error("Unable to close producer", e);
295             getSender().tell(new Status.Failure(e), noSender());
296         }
297     }
298
299     @SuppressWarnings("checkstyle:IllegalCatch")
300     private void onCreatePrefixShard(final CreatePrefixShard message) {
301         LOG.debug("Received CreatePrefixShard: {}", message);
302
303         final PrefixShardConfiguration configuration = message.getConfiguration();
304
305         final DOMDataTreeProducer producer =
306                 dataTreeService.createProducer(Collections.singleton(configuration.getPrefix()));
307
308         final DistributedDataStore distributedDataStore =
309                 configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
310                         ? distributedConfigDatastore : distributedOperDatastore;
311         final String shardName = ClusterUtils.getCleanShardName(configuration.getPrefix().getRootIdentifier());
312         LOG.debug("Creating distributed datastore client for shard {}", shardName);
313         final Props distributedDataStoreClientProps =
314                 SimpleDataStoreClientActor.props(cluster.getCurrentMemberName(),
315                         "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
316
317         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
318         final DataStoreClient client;
319         try {
320             client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
321         } catch (final Exception e) {
322             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
323             clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
324             throw Throwables.propagate(e);
325         }
326
327         try {
328             final ListenerRegistration<ShardFrontend> shardFrontendRegistration =
329                     shardingService.registerDataTreeShard(configuration.getPrefix(),
330                             new ShardFrontend(
331                                     client,
332                                     configuration.getPrefix()
333                             ),
334                             producer);
335             idToShardRegistration.put(configuration.getPrefix(),
336                     new ShardFrontendRegistration(clientActor, shardFrontendRegistration));
337
338             sender().tell(new Status.Success(null), self());
339         } catch (final DOMDataTreeShardingConflictException e) {
340             LOG.error("Unable to create shard", e);
341             clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
342             sender().tell(new Status.Failure(e), self());
343         } finally {
344             try {
345                 producer.close();
346             } catch (final DOMDataTreeProducerException e) {
347                 LOG.error("Unable to close producer that was used for shard registration {}", producer, e);
348             }
349         }
350     }
351
352     private void onPrefixShardCreated(final PrefixShardCreated message) {
353         LOG.debug("Received PrefixShardCreated: {}", message);
354
355         final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
356         final ActorRef sender = getSender();
357
358         final List<CompletableFuture<Object>> futures = new ArrayList<>();
359
360         for (final String address : addresses) {
361             final ActorSelection actorSelection = actorSystem.actorSelection(address);
362             futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
363                     new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
364         }
365
366         final CompletableFuture<Void> combinedFuture =
367                 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
368
369         combinedFuture.thenRun(() -> {
370             for (final CompletableFuture<Object> future : futures) {
371                 try {
372                     final Object result = future.get();
373                     if (result instanceof Status.Failure) {
374                         sender.tell(result, self());
375                         return;
376                     }
377                 } catch (InterruptedException | ExecutionException e) {
378                     sender.tell(new Status.Failure(e), self());
379                     return;
380                 }
381             }
382             sender.tell(new Status.Success(null), self());
383         }).exceptionally(throwable -> {
384             sender.tell(new Status.Failure(throwable), self());
385             return null;
386         });
387     }
388
389     private void onRemovePrefixShard(final RemovePrefixShard message) {
390         LOG.debug("Received RemovePrefixShard: {}", message);
391
392         for (final String address : resolver.getShardingServicePeerActorAddresses()) {
393             final ActorSelection selection = actorContext.actorSelection(address);
394             selection.tell(new PrefixShardRemoved(message.getPrefix()), getSelf());
395         }
396     }
397
398     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
399         LOG.debug("Received PrefixShardRemoved: {}", message);
400
401         final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
402
403         if (registration == null) {
404             LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
405                     message.getPrefix(), idToShardRegistration);
406             return;
407         }
408
409         registration.close();
410     }
411
412     private static MemberName memberToName(final Member member) {
413         return MemberName.forName(member.roles().iterator().next());
414     }
415
416     private class ActorProducerRegistration {
417
418         private final DOMDataTreeProducer producer;
419         private final Collection<DOMDataTreeIdentifier> subtrees;
420
421         ActorProducerRegistration(final DOMDataTreeProducer producer,
422                                   final Collection<DOMDataTreeIdentifier> subtrees) {
423             this.producer = producer;
424             this.subtrees = subtrees;
425         }
426
427         void close() throws DOMDataTreeProducerException {
428             producer.close();
429             subtrees.forEach(idToProducer::remove);
430         }
431     }
432
433     private static class ShardFrontendRegistration extends
434             AbstractObjectRegistration<ListenerRegistration<ShardFrontend>> {
435
436         private final ActorRef clientActor;
437         private final ListenerRegistration<ShardFrontend> shardRegistration;
438
439         ShardFrontendRegistration(final ActorRef clientActor,
440                                          final ListenerRegistration<ShardFrontend> shardRegistration) {
441             super(shardRegistration);
442             this.clientActor = clientActor;
443             this.shardRegistration = shardRegistration;
444         }
445
446         @Override
447         protected void removeRegistration() {
448             shardRegistration.close();
449             clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
450         }
451     }
452
453     public static class ShardedDataTreeActorCreator {
454
455         private DOMDataTreeService dataTreeService;
456         private DOMDataTreeShardingService shardingService;
457         private DistributedDataStore distributedConfigDatastore;
458         private DistributedDataStore distributedOperDatastore;
459         private ActorSystem actorSystem;
460         private ClusterWrapper cluster;
461
462         public DOMDataTreeService getDataTreeService() {
463             return dataTreeService;
464         }
465
466         public ShardedDataTreeActorCreator setDataTreeService(final DOMDataTreeService dataTreeService) {
467             this.dataTreeService = dataTreeService;
468             return this;
469         }
470
471         public DOMDataTreeShardingService getShardingService() {
472             return shardingService;
473         }
474
475         public ShardedDataTreeActorCreator setShardingService(final DOMDataTreeShardingService shardingService) {
476             this.shardingService = shardingService;
477             return this;
478         }
479
480         public ActorSystem getActorSystem() {
481             return actorSystem;
482         }
483
484         public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
485             this.actorSystem = actorSystem;
486             return this;
487         }
488
489         public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
490             this.cluster = cluster;
491             return this;
492         }
493
494         public ClusterWrapper getClusterWrapper() {
495             return cluster;
496         }
497
498         public DistributedDataStore getDistributedConfigDatastore() {
499             return distributedConfigDatastore;
500         }
501
502         public ShardedDataTreeActorCreator setDistributedConfigDatastore(
503                 final DistributedDataStore distributedConfigDatastore) {
504             this.distributedConfigDatastore = distributedConfigDatastore;
505             return this;
506         }
507
508         public DistributedDataStore getDistributedOperDatastore() {
509             return distributedOperDatastore;
510         }
511
512         public ShardedDataTreeActorCreator setDistributedOperDatastore(
513                 final DistributedDataStore distributedOperDatastore) {
514             this.distributedOperDatastore = distributedOperDatastore;
515             return this;
516         }
517
518         private void verify() {
519             Preconditions.checkNotNull(dataTreeService);
520             Preconditions.checkNotNull(shardingService);
521             Preconditions.checkNotNull(actorSystem);
522             Preconditions.checkNotNull(cluster);
523             Preconditions.checkNotNull(distributedConfigDatastore);
524             Preconditions.checkNotNull(distributedOperDatastore);
525         }
526
527         public Props props() {
528             verify();
529             return Props.create(ShardedDataTreeActor.class, this);
530         }
531
532     }
533 }