Cache MapJoiner
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / ShardedDataTreeActor.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.cluster.Cluster;
20 import akka.cluster.ClusterEvent;
21 import akka.cluster.ClusterEvent.MemberExited;
22 import akka.cluster.ClusterEvent.MemberRemoved;
23 import akka.cluster.ClusterEvent.MemberUp;
24 import akka.cluster.ClusterEvent.MemberWeaklyUp;
25 import akka.cluster.ClusterEvent.ReachableMember;
26 import akka.cluster.ClusterEvent.UnreachableMember;
27 import akka.cluster.Member;
28 import akka.cluster.ddata.DistributedData;
29 import akka.cluster.ddata.ORMap;
30 import akka.cluster.ddata.Replicator;
31 import akka.cluster.ddata.Replicator.Changed;
32 import akka.cluster.ddata.Replicator.Subscribe;
33 import akka.cluster.ddata.Replicator.Update;
34 import akka.util.Timeout;
35 import com.google.common.base.Preconditions;
36 import com.google.common.collect.Sets;
37 import com.google.common.collect.Sets.SetView;
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.HashMap;
41 import java.util.List;
42 import java.util.Map;
43 import java.util.concurrent.CompletableFuture;
44 import java.util.concurrent.TimeUnit;
45 import java.util.function.Function;
46 import java.util.stream.Collectors;
47 import org.opendaylight.controller.cluster.access.concepts.MemberName;
48 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
49 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
50 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
51 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
52 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
53 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
54 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
55 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
56 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
57 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
58 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
59 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
60 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
61 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
65 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
66 import org.opendaylight.yangtools.concepts.ListenerRegistration;
67 import scala.compat.java8.FutureConverters;
68
69 /**
70  * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
71  * nodes of newly open producers/shards on the local node.
72  */
73 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
74
75     private static final String PERSISTENCE_ID = "sharding-service-actor";
76     private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
77
78     private final DistributedShardedDOMDataTree shardingService;
79     private final ActorSystem actorSystem;
80     private final ClusterWrapper clusterWrapper;
81     // helper actorContext used only for static calls to executeAsync etc
82     // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
83     private final ActorContext actorContext;
84     private final ShardingServiceAddressResolver resolver;
85     private final DistributedDataStore distributedConfigDatastore;
86     private final DistributedDataStore distributedOperDatastore;
87
88     private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
89     private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
90
91     private final Cluster cluster;
92     private final ActorRef replicator;
93
94     private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
95     private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
96
97     ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
98         LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
99
100         shardingService = builder.getShardingService();
101         actorSystem = builder.getActorSystem();
102         clusterWrapper = builder.getClusterWrapper();
103         distributedConfigDatastore = builder.getDistributedConfigDatastore();
104         distributedOperDatastore = builder.getDistributedOperDatastore();
105         actorContext = distributedConfigDatastore.getActorContext();
106         resolver = new ShardingServiceAddressResolver(
107                 DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
108
109         clusterWrapper.subscribeToMemberEvents(self());
110         cluster = Cluster.get(actorSystem);
111
112         replicator = DistributedData.get(context().system()).replicator();
113     }
114
115     @Override
116     public void preStart() {
117         final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
118                 new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
119         replicator.tell(subscribe, noSender());
120     }
121
122     @Override
123     protected void handleRecover(final Object message) throws Exception {
124         LOG.debug("Received a recover message {}", message);
125     }
126
127     @Override
128     protected void handleCommand(final Object message) throws Exception {
129         LOG.debug("Received {}", message);
130         if (message instanceof ClusterEvent.MemberUp) {
131             memberUp((ClusterEvent.MemberUp) message);
132         } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
133             memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
134         } else if (message instanceof ClusterEvent.MemberExited) {
135             memberExited((ClusterEvent.MemberExited) message);
136         } else if (message instanceof ClusterEvent.MemberRemoved) {
137             memberRemoved((ClusterEvent.MemberRemoved) message);
138         } else if (message instanceof ClusterEvent.UnreachableMember) {
139             memberUnreachable((ClusterEvent.UnreachableMember) message);
140         } else if (message instanceof ClusterEvent.ReachableMember) {
141             memberReachable((ClusterEvent.ReachableMember) message);
142         } else if (message instanceof Changed) {
143             onConfigChanged((Changed) message);
144         } else if (message instanceof ProducerCreated) {
145             onProducerCreated((ProducerCreated) message);
146         } else if (message instanceof NotifyProducerCreated) {
147             onNotifyProducerCreated((NotifyProducerCreated) message);
148         } else if (message instanceof ProducerRemoved) {
149             onProducerRemoved((ProducerRemoved) message);
150         } else if (message instanceof NotifyProducerRemoved) {
151             onNotifyProducerRemoved((NotifyProducerRemoved) message);
152         } else if (message instanceof PrefixShardCreated) {
153             onPrefixShardCreated((PrefixShardCreated) message);
154         } else if (message instanceof CreatePrefixShard) {
155             onCreatePrefixShard((CreatePrefixShard) message);
156         } else if (message instanceof RemovePrefixShard) {
157             onRemovePrefixShard((RemovePrefixShard) message);
158         } else if (message instanceof PrefixShardRemoved) {
159             onPrefixShardRemoved((PrefixShardRemoved) message);
160         }
161     }
162
163     private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
164         LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);
165
166         currentData = change.dataValue();
167         final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();
168
169         LOG.debug("Changed set {}", changedConfig);
170
171         try {
172             final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
173                     changedConfig.values().stream().collect(
174                             Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
175             resolveConfig(newConfig);
176         } catch (final IllegalStateException e) {
177             LOG.error("Failed, ", e);
178         }
179
180     }
181
182     private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
183
184         // get the removed configurations
185         final SetView<DOMDataTreeIdentifier> deleted =
186                 Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
187         shardingService.resolveShardRemovals(deleted);
188
189         // get the added configurations
190         final SetView<DOMDataTreeIdentifier> additions =
191                 Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
192         shardingService.resolveShardAdditions(additions);
193         // we can ignore those that existed previously since the potential changes in replicas will be handled by
194         // shard manager.
195
196         currentConfiguration = new HashMap<>(newConfig);
197     }
198
199     @Override
200     public String persistenceId() {
201         return PERSISTENCE_ID;
202     }
203
204     private void memberUp(final MemberUp message) {
205         final MemberName memberName = memberToName(message.member());
206
207         LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
208                 message.member().address());
209
210         resolver.addPeerAddress(memberName, message.member().address());
211     }
212
213     private void memberWeaklyUp(final MemberWeaklyUp message) {
214         final MemberName memberName = memberToName(message.member());
215
216         LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
217                 message.member().address());
218
219         resolver.addPeerAddress(memberName, message.member().address());
220     }
221
222     private void memberExited(final MemberExited message) {
223         final MemberName memberName = memberToName(message.member());
224
225         LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
226                 message.member().address());
227
228         resolver.removePeerAddress(memberName);
229     }
230
231     private void memberRemoved(final MemberRemoved message) {
232         final MemberName memberName = memberToName(message.member());
233
234         LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
235                 message.member().address());
236
237         resolver.removePeerAddress(memberName);
238     }
239
240     private void memberUnreachable(final UnreachableMember message) {
241         final MemberName memberName = memberToName(message.member());
242         LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
243
244         resolver.removePeerAddress(memberName);
245     }
246
247     private void memberReachable(final ReachableMember message) {
248         final MemberName memberName = memberToName(message.member());
249         LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
250
251         resolver.addPeerAddress(memberName, message.member().address());
252     }
253
254     private void onProducerCreated(final ProducerCreated message) {
255         LOG.debug("Received ProducerCreated: {}", message);
256
257         // fastpath if no replication is needed, since there is only one node
258         if (resolver.getShardingServicePeerActorAddresses().size() == 1) {
259             getSender().tell(new Status.Success(null), noSender());
260         }
261
262         final ActorRef sender = getSender();
263         final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
264
265         final List<CompletableFuture<Object>> futures = new ArrayList<>();
266
267         for (final String address : resolver.getShardingServicePeerActorAddresses()) {
268             final ActorSelection actorSelection = actorSystem.actorSelection(address);
269             futures.add(
270                     FutureConverters.toJava(
271                             actorContext.executeOperationAsync(
272                                     actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
273                     .toCompletableFuture());
274         }
275
276         final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
277                 futures.toArray(new CompletableFuture[futures.size()]));
278
279         combinedFuture.thenRun(() -> {
280             sender.tell(new Status.Success(null), noSender());
281         }).exceptionally(throwable -> {
282             sender.tell(new Status.Failure(throwable), self());
283             return null;
284         });
285     }
286
287     private void onNotifyProducerCreated(final NotifyProducerCreated message) {
288         LOG.debug("Received NotifyProducerCreated: {}", message);
289
290         final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
291
292         try {
293             final ActorProducerRegistration registration =
294                     new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
295             subtrees.forEach(id -> idToProducer.put(id, registration));
296             sender().tell(new Status.Success(null), self());
297         } catch (final IllegalArgumentException e) {
298             sender().tell(new Status.Failure(e), getSelf());
299         }
300     }
301
302     private void onProducerRemoved(final ProducerRemoved message) {
303         LOG.debug("Received ProducerRemoved: {}", message);
304
305         final List<CompletableFuture<Object>> futures = new ArrayList<>();
306
307         for (final String address : resolver.getShardingServicePeerActorAddresses()) {
308             final ActorSelection selection = actorSystem.actorSelection(address);
309
310             futures.add(FutureConverters.toJava(
311                     actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
312                     .toCompletableFuture());
313         }
314
315         final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
316                 futures.toArray(new CompletableFuture[futures.size()]));
317
318         final ActorRef respondTo = getSender();
319
320         combinedFuture
321                 .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
322                 .exceptionally(e -> {
323                     respondTo.tell(new Status.Failure(null), self());
324                     return null;
325                 });
326
327     }
328
329     private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
330         LOG.debug("Received NotifyProducerRemoved: {}", message);
331
332         final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
333         if (registration == null) {
334             LOG.warn("The notification contained a path on which no producer is registered, throwing away");
335             getSender().tell(new Status.Success(null), noSender());
336             return;
337         }
338
339         try {
340             registration.close();
341             getSender().tell(new Status.Success(null), noSender());
342         } catch (final DOMDataTreeProducerException e) {
343             LOG.error("Unable to close producer", e);
344             getSender().tell(new Status.Failure(e), noSender());
345         }
346     }
347
348     @SuppressWarnings("checkstyle:IllegalCatch")
349     private void onCreatePrefixShard(final CreatePrefixShard message) {
350         LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
351
352         final PrefixShardConfiguration configuration = message.getConfiguration();
353
354         final Update<ORMap<PrefixShardConfiguration>> update =
355                 new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
356                     map -> map.put(cluster, configuration.toDataMapKey(), configuration));
357
358         replicator.tell(update, self());
359     }
360
361     private void onPrefixShardCreated(final PrefixShardCreated message) {
362         LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
363
364         final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
365         final ActorRef sender = getSender();
366
367         final List<CompletableFuture<Object>> futures = new ArrayList<>();
368
369         for (final String address : addresses) {
370             final ActorSelection actorSelection = actorSystem.actorSelection(address);
371             futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
372                     new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
373         }
374
375         final CompletableFuture<Void> combinedFuture =
376                 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
377
378         combinedFuture.thenRun(() -> {
379             sender.tell(new Status.Success(null), self());
380         }).exceptionally(throwable -> {
381             sender.tell(new Status.Failure(throwable), self());
382             return null;
383         });
384     }
385
386     private void onRemovePrefixShard(final RemovePrefixShard message) {
387         LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
388
389         //TODO the removal message should have the configuration or some other way to get to the key
390         final Update<ORMap<PrefixShardConfiguration>> removal =
391                 new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
392                     map -> map.remove(cluster, "prefix=" + message.getPrefix()));
393         replicator.tell(removal, self());
394     }
395
396     private void onPrefixShardRemoved(final PrefixShardRemoved message) {
397         LOG.debug("Received PrefixShardRemoved: {}", message);
398
399         final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
400
401         if (registration == null) {
402             LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
403                     message.getPrefix(), idToShardRegistration);
404             return;
405         }
406
407         registration.close();
408     }
409
410     private static MemberName memberToName(final Member member) {
411         return MemberName.forName(member.roles().iterator().next());
412     }
413
414     private class ActorProducerRegistration {
415
416         private final DOMDataTreeProducer producer;
417         private final Collection<DOMDataTreeIdentifier> subtrees;
418
419         ActorProducerRegistration(final DOMDataTreeProducer producer,
420                                   final Collection<DOMDataTreeIdentifier> subtrees) {
421             this.producer = producer;
422             this.subtrees = subtrees;
423         }
424
425         void close() throws DOMDataTreeProducerException {
426             producer.close();
427             subtrees.forEach(idToProducer::remove);
428         }
429     }
430
431     private static class ShardFrontendRegistration extends
432             AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {
433
434         private final ActorRef clientActor;
435         private final ListenerRegistration<DistributedShardFrontend> shardRegistration;
436
437         ShardFrontendRegistration(final ActorRef clientActor,
438                                   final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
439             super(shardRegistration);
440             this.clientActor = clientActor;
441             this.shardRegistration = shardRegistration;
442         }
443
444         @Override
445         protected void removeRegistration() {
446             shardRegistration.close();
447             clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
448         }
449     }
450
451     public static class ShardedDataTreeActorCreator {
452
453         private DistributedShardedDOMDataTree shardingService;
454         private DistributedDataStore distributedConfigDatastore;
455         private DistributedDataStore distributedOperDatastore;
456         private ActorSystem actorSystem;
457         private ClusterWrapper cluster;
458
459         public DistributedShardedDOMDataTree getShardingService() {
460             return shardingService;
461         }
462
463         public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
464             this.shardingService = shardingService;
465             return this;
466         }
467
468         public ActorSystem getActorSystem() {
469             return actorSystem;
470         }
471
472         public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
473             this.actorSystem = actorSystem;
474             return this;
475         }
476
477         public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
478             this.cluster = cluster;
479             return this;
480         }
481
482         public ClusterWrapper getClusterWrapper() {
483             return cluster;
484         }
485
486         public DistributedDataStore getDistributedConfigDatastore() {
487             return distributedConfigDatastore;
488         }
489
490         public ShardedDataTreeActorCreator setDistributedConfigDatastore(
491                 final DistributedDataStore distributedConfigDatastore) {
492             this.distributedConfigDatastore = distributedConfigDatastore;
493             return this;
494         }
495
496         public DistributedDataStore getDistributedOperDatastore() {
497             return distributedOperDatastore;
498         }
499
500         public ShardedDataTreeActorCreator setDistributedOperDatastore(
501                 final DistributedDataStore distributedOperDatastore) {
502             this.distributedOperDatastore = distributedOperDatastore;
503             return this;
504         }
505
506         private void verify() {
507             Preconditions.checkNotNull(shardingService);
508             Preconditions.checkNotNull(actorSystem);
509             Preconditions.checkNotNull(cluster);
510             Preconditions.checkNotNull(distributedConfigDatastore);
511             Preconditions.checkNotNull(distributedOperDatastore);
512         }
513
514         public Props props() {
515             verify();
516             return Props.create(ShardedDataTreeActor.class, this);
517         }
518
519     }
520 }