2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.cluster.Cluster;
20 import akka.cluster.ClusterEvent;
21 import akka.cluster.ClusterEvent.MemberExited;
22 import akka.cluster.ClusterEvent.MemberRemoved;
23 import akka.cluster.ClusterEvent.MemberUp;
24 import akka.cluster.ClusterEvent.MemberWeaklyUp;
25 import akka.cluster.ClusterEvent.ReachableMember;
26 import akka.cluster.ClusterEvent.UnreachableMember;
27 import akka.cluster.Member;
28 import akka.cluster.ddata.DistributedData;
29 import akka.cluster.ddata.ORMap;
30 import akka.cluster.ddata.Replicator;
31 import akka.cluster.ddata.Replicator.Changed;
32 import akka.cluster.ddata.Replicator.Subscribe;
33 import akka.cluster.ddata.Replicator.Update;
34 import akka.util.Timeout;
35 import com.google.common.base.Preconditions;
36 import com.google.common.collect.Sets;
37 import com.google.common.collect.Sets.SetView;
38 import java.util.ArrayList;
39 import java.util.Collection;
40 import java.util.HashMap;
41 import java.util.List;
43 import java.util.concurrent.CompletableFuture;
44 import java.util.concurrent.TimeUnit;
45 import java.util.function.Function;
46 import java.util.stream.Collectors;
47 import org.opendaylight.controller.cluster.access.concepts.MemberName;
48 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
49 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
50 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
51 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
52 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
53 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
54 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
55 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
56 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
57 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
58 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
59 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
60 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
61 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
63 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
64 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
65 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
66 import org.opendaylight.yangtools.concepts.ListenerRegistration;
67 import scala.compat.java8.FutureConverters;
/**
 * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
 * nodes of newly open producers/shards on the local node.
 */
public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {

    // Stable persistence id for this actor (one such actor runs per node).
    private static final String PERSISTENCE_ID = "sharding-service-actor";
    // Ask timeout used when notifying peer sharding-service actors.
    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);

    private final DistributedShardedDOMDataTree shardingService;
    private final ActorSystem actorSystem;
    private final ClusterWrapper clusterWrapper;
    // helper actorContext used only for static calls to executeAsync etc
    // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
    private final ActorContext actorContext;
    // Resolves member names to the addresses of their sharding-service actors.
    private final ShardingServiceAddressResolver resolver;
    private final DistributedDataStore distributedConfigDatastore;
    private final DistributedDataStore distributedOperDatastore;

    // Producers created on behalf of remote nodes, keyed by every subtree they cover.
    private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
    // Shard frontends registered on this node, keyed by shard prefix.
    private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();

    private final Cluster cluster;
    // Akka Distributed Data replicator through which shard configuration is shared cluster-wide.
    private final ActorRef replicator;

    // Last ORMap value observed from the replicator; used as the base value for Update messages.
    private ORMap<PrefixShardConfiguration> currentData = ORMap.create();
    // Local view of the replicated shard configuration, keyed by shard prefix.
    private Map<DOMDataTreeIdentifier, PrefixShardConfiguration> currentConfiguration = new HashMap<>();
    ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
        LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());

        shardingService = builder.getShardingService();
        actorSystem = builder.getActorSystem();
        clusterWrapper = builder.getClusterWrapper();
        distributedConfigDatastore = builder.getDistributedConfigDatastore();
        distributedOperDatastore = builder.getDistributedOperDatastore();
        // the shared helper context is taken from the config datastore (see field comment);
        // must be assigned after distributedConfigDatastore
        actorContext = distributedConfigDatastore.getActorContext();
        resolver = new ShardingServiceAddressResolver(
                DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());

        // start receiving MemberUp/MemberRemoved/... events for peer-address bookkeeping
        clusterWrapper.subscribeToMemberEvents(self());
        cluster = Cluster.get(actorSystem);

        replicator = DistributedData.get(context().system()).replicator();
    }
    /**
     * Subscribes to the replicated shard-configuration key; subsequent changes are delivered
     * to this actor as {@link Changed} messages handled in {@code handleCommand}.
     */
    @Override
    public void preStart() {
        final Subscribe<ORMap<PrefixShardConfiguration>> subscribe =
                new Subscribe<>(ClusterUtils.CONFIGURATION_KEY, self());
        replicator.tell(subscribe, noSender());
    }
    /**
     * This actor does not replay any persisted state; recovery messages are only logged.
     */
    @Override
    protected void handleRecover(final Object message) throws Exception {
        LOG.debug("Received a recover message {}", message);
    }
    /**
     * Dispatches incoming messages: cluster membership events (peer address bookkeeping),
     * distributed-data configuration changes, and the producer/shard notification protocol.
     * Messages matching none of the branches are silently dropped — NOTE(review): confirm
     * that is intended (no visible fall-through handling).
     */
    @Override
    protected void handleCommand(final Object message) throws Exception {
        LOG.debug("Received {}", message);
        if (message instanceof ClusterEvent.MemberUp) {
            memberUp((ClusterEvent.MemberUp) message);
        } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
            memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
        } else if (message instanceof ClusterEvent.MemberExited) {
            memberExited((ClusterEvent.MemberExited) message);
        } else if (message instanceof ClusterEvent.MemberRemoved) {
            memberRemoved((ClusterEvent.MemberRemoved) message);
        } else if (message instanceof ClusterEvent.UnreachableMember) {
            memberUnreachable((ClusterEvent.UnreachableMember) message);
        } else if (message instanceof ClusterEvent.ReachableMember) {
            memberReachable((ClusterEvent.ReachableMember) message);
        } else if (message instanceof Changed) {
            onConfigChanged((Changed) message);
        } else if (message instanceof ProducerCreated) {
            onProducerCreated((ProducerCreated) message);
        } else if (message instanceof NotifyProducerCreated) {
            onNotifyProducerCreated((NotifyProducerCreated) message);
        } else if (message instanceof ProducerRemoved) {
            onProducerRemoved((ProducerRemoved) message);
        } else if (message instanceof NotifyProducerRemoved) {
            onNotifyProducerRemoved((NotifyProducerRemoved) message);
        } else if (message instanceof PrefixShardCreated) {
            onPrefixShardCreated((PrefixShardCreated) message);
        } else if (message instanceof CreatePrefixShard) {
            onCreatePrefixShard((CreatePrefixShard) message);
        } else if (message instanceof RemovePrefixShard) {
            onRemovePrefixShard((RemovePrefixShard) message);
        } else if (message instanceof PrefixShardRemoved) {
            onPrefixShardRemoved((PrefixShardRemoved) message);
        }
    }
    /**
     * Applies a replicated shard-configuration change: caches the new ORMap value as the base
     * for future updates and rebuilds the prefix-indexed view before reconciling shards.
     */
    private void onConfigChanged(final Changed<ORMap<PrefixShardConfiguration>> change) {
        LOG.debug("member : {}, Received configuration changed: {}", clusterWrapper.getCurrentMemberName(), change);

        currentData = change.dataValue();
        final Map<String, PrefixShardConfiguration> changedConfig = change.dataValue().getEntries();

        LOG.debug("Changed set {}", changedConfig);

        try {
            final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig =
                    changedConfig.values().stream().collect(
                            Collectors.toMap(PrefixShardConfiguration::getPrefix, Function.identity()));
            resolveConfig(newConfig);
        } catch (final IllegalStateException e) {
            // Collectors.toMap throws IllegalStateException on duplicate prefixes —
            // presumably the case this guards against; TODO confirm and improve the message
            LOG.error("Failed, ", e);
        }
    }
182 private void resolveConfig(final Map<DOMDataTreeIdentifier, PrefixShardConfiguration> newConfig) {
184 // get the removed configurations
185 final SetView<DOMDataTreeIdentifier> deleted =
186 Sets.difference(currentConfiguration.keySet(), newConfig.keySet());
187 shardingService.resolveShardRemovals(deleted);
189 // get the added configurations
190 final SetView<DOMDataTreeIdentifier> additions =
191 Sets.difference(newConfig.keySet(), currentConfiguration.keySet());
192 shardingService.resolveShardAdditions(additions);
193 // we can ignore those that existed previously since the potential changes in replicas will be handled by
196 currentConfiguration = new HashMap<>(newConfig);
    // NOTE(review): the persistence id is a constant, i.e. identical on every node —
    // confirm that is acceptable given handleRecover does not replay state.
    @Override
    public String persistenceId() {
        return PERSISTENCE_ID;
    }
204 private void memberUp(final MemberUp message) {
205 final MemberName memberName = memberToName(message.member());
207 LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
208 message.member().address());
210 resolver.addPeerAddress(memberName, message.member().address());
213 private void memberWeaklyUp(final MemberWeaklyUp message) {
214 final MemberName memberName = memberToName(message.member());
216 LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
217 message.member().address());
219 resolver.addPeerAddress(memberName, message.member().address());
222 private void memberExited(final MemberExited message) {
223 final MemberName memberName = memberToName(message.member());
225 LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
226 message.member().address());
228 resolver.removePeerAddress(memberName);
231 private void memberRemoved(final MemberRemoved message) {
232 final MemberName memberName = memberToName(message.member());
234 LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
235 message.member().address());
237 resolver.removePeerAddress(memberName);
240 private void memberUnreachable(final UnreachableMember message) {
241 final MemberName memberName = memberToName(message.member());
242 LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
244 resolver.removePeerAddress(memberName);
247 private void memberReachable(final ReachableMember message) {
248 final MemberName memberName = memberToName(message.member());
249 LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
251 resolver.addPeerAddress(memberName, message.member().address());
254 private void onProducerCreated(final ProducerCreated message) {
255 LOG.debug("Received ProducerCreated: {}", message);
257 // fastpath if no replication is needed, since there is only one node
258 if (resolver.getShardingServicePeerActorAddresses().size() == 1) {
259 getSender().tell(new Status.Success(null), noSender());
262 final ActorRef sender = getSender();
263 final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
265 final List<CompletableFuture<Object>> futures = new ArrayList<>();
267 for (final String address : resolver.getShardingServicePeerActorAddresses()) {
268 final ActorSelection actorSelection = actorSystem.actorSelection(address);
270 FutureConverters.toJava(
271 actorContext.executeOperationAsync(
272 actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
273 .toCompletableFuture());
276 final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
277 futures.toArray(new CompletableFuture[futures.size()]));
279 combinedFuture.thenRun(() -> {
280 sender.tell(new Status.Success(null), noSender());
281 }).exceptionally(throwable -> {
282 sender.tell(new Status.Failure(throwable), self());
    /**
     * Creates a local producer for the subtrees advertised by a remote node and records it in
     * {@link #idToProducer} under each subtree. Replies Success, or Failure when the subtrees
     * cannot be claimed (localCreateProducer throws IllegalArgumentException).
     */
    private void onNotifyProducerCreated(final NotifyProducerCreated message) {
        LOG.debug("Received NotifyProducerCreated: {}", message);

        final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();

        try {
            final ActorProducerRegistration registration =
                    new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
            // one registration is shared by all of its subtrees
            subtrees.forEach(id -> idToProducer.put(id, registration));
            sender().tell(new Status.Success(null), self());
        } catch (final IllegalArgumentException e) {
            sender().tell(new Status.Failure(e), getSelf());
        }
    }
302 private void onProducerRemoved(final ProducerRemoved message) {
303 LOG.debug("Received ProducerRemoved: {}", message);
305 final List<CompletableFuture<Object>> futures = new ArrayList<>();
307 for (final String address : resolver.getShardingServicePeerActorAddresses()) {
308 final ActorSelection selection = actorSystem.actorSelection(address);
310 futures.add(FutureConverters.toJava(
311 actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
312 .toCompletableFuture());
315 final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
316 futures.toArray(new CompletableFuture[futures.size()]));
318 final ActorRef respondTo = getSender();
321 .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
322 .exceptionally(e -> {
323 respondTo.tell(new Status.Failure(null), self());
    /**
     * Closes the local registration created for a remote producer. The lookup uses only the
     * first subtree in the message — assumes all subtrees map to the same registration (they are
     * registered together in onNotifyProducerCreated); TODO confirm.
     * Replies Success even when nothing was registered (best-effort removal); Failure only when
     * closing the producer itself fails.
     */
    private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
        LOG.debug("Received NotifyProducerRemoved: {}", message);

        final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
        if (registration == null) {
            LOG.warn("The notification contained a path on which no producer is registered, throwing away");
            getSender().tell(new Status.Success(null), noSender());
            return;
        }

        try {
            registration.close();
            getSender().tell(new Status.Success(null), noSender());
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
            getSender().tell(new Status.Failure(e), noSender());
        }
    }
    /**
     * Records a new prefix-shard configuration in the replicated ORMap using local-write
     * consistency; all nodes (including this one) observe the change via onConfigChanged and
     * create the shard there.
     */
    @SuppressWarnings("checkstyle:IllegalCatch")
    private void onCreatePrefixShard(final CreatePrefixShard message) {
        LOG.debug("Member: {}, Received CreatePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        final PrefixShardConfiguration configuration = message.getConfiguration();

        // base the update on the last ORMap value we observed from the replicator
        final Update<ORMap<PrefixShardConfiguration>> update =
                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
                        map -> map.put(cluster, configuration.toDataMapKey(), configuration));

        replicator.tell(update, self());
    }
    /**
     * Propagates a locally created prefix shard to all peer sharding-service actors via
     * CreatePrefixShard; replies Success once every peer has acknowledged, Failure otherwise.
     */
    private void onPrefixShardCreated(final PrefixShardCreated message) {
        LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);

        final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
        final ActorRef sender = getSender();

        final List<CompletableFuture<Object>> futures = new ArrayList<>();

        // NOTE: this ask uses the datastore's default operation timeout (no explicit
        // DEFAULT_ASK_TIMEOUT as in onProducerCreated) — presumably intentional; confirm.
        for (final String address : addresses) {
            final ActorSelection actorSelection = actorSystem.actorSelection(address);
            futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
                    new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
        }

        final CompletableFuture<Void> combinedFuture =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

        combinedFuture.thenRun(() -> {
            sender.tell(new Status.Success(null), self());
        }).exceptionally(throwable -> {
            sender.tell(new Status.Failure(throwable), self());
            return null;
        });
    }
    /**
     * Removes a prefix-shard configuration from the replicated ORMap with local-write
     * consistency; nodes react to the resulting onConfigChanged by tearing the shard down.
     */
    private void onRemovePrefixShard(final RemovePrefixShard message) {
        LOG.debug("Member: {}, Received RemovePrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);

        //TODO the removal message should have the configuration or some other way to get to the key
        // the "prefix=" literal must stay in sync with PrefixShardConfiguration.toDataMapKey()
        final Update<ORMap<PrefixShardConfiguration>> removal =
                new Update<>(ClusterUtils.CONFIGURATION_KEY, currentData, Replicator.writeLocal(),
                        map -> map.remove(cluster, "prefix=" + message.getPrefix()));
        replicator.tell(removal, self());
    }
396 private void onPrefixShardRemoved(final PrefixShardRemoved message) {
397 LOG.debug("Received PrefixShardRemoved: {}", message);
399 final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
401 if (registration == null) {
402 LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
403 message.getPrefix(), idToShardRegistration);
407 registration.close();
    // Maps a cluster member to its MemberName. Assumes the member's first role carries its
    // name — TODO confirm this convention holds for all deployments (roles is a set).
    private static MemberName memberToName(final Member member) {
        return MemberName.forName(member.roles().iterator().next());
    }
    /**
     * Tracks a producer created on behalf of a remote node together with the subtrees it
     * covers, so the {@link #idToProducer} bookkeeping can be cleaned up on close.
     */
    private class ActorProducerRegistration {

        private final DOMDataTreeProducer producer;
        private final Collection<DOMDataTreeIdentifier> subtrees;

        ActorProducerRegistration(final DOMDataTreeProducer producer,
                                  final Collection<DOMDataTreeIdentifier> subtrees) {
            this.producer = producer;
            this.subtrees = subtrees;
        }

        // Closes the underlying producer and unregisters every subtree it covered.
        void close() throws DOMDataTreeProducerException {
            producer.close();
            subtrees.forEach(idToProducer::remove);
        }
    }
    /**
     * Registration for a shard frontend: on close it closes the underlying shard listener
     * registration and stops the associated client actor.
     */
    private static class ShardFrontendRegistration extends
            AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {

        private final ActorRef clientActor;
        private final ListenerRegistration<DistributedShardFrontend> shardRegistration;

        ShardFrontendRegistration(final ActorRef clientActor,
                                  final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
            super(shardRegistration);
            this.clientActor = clientActor;
            this.shardRegistration = shardRegistration;
        }

        @Override
        protected void removeRegistration() {
            shardRegistration.close();
            // stop the client actor once the shard is no longer registered
            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }
    /**
     * Fluent builder for {@link ShardedDataTreeActor} {@link Props}. All dependencies are
     * mandatory and checked by {@link #verify()} before the Props are created.
     */
    public static class ShardedDataTreeActorCreator {

        private DistributedShardedDOMDataTree shardingService;
        private DistributedDataStore distributedConfigDatastore;
        private DistributedDataStore distributedOperDatastore;
        private ActorSystem actorSystem;
        private ClusterWrapper cluster;

        public DistributedShardedDOMDataTree getShardingService() {
            return shardingService;
        }

        public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
            this.shardingService = shardingService;
            return this;
        }

        public ActorSystem getActorSystem() {
            return actorSystem;
        }

        public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
            this.actorSystem = actorSystem;
            return this;
        }

        public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
            this.cluster = cluster;
            return this;
        }

        public ClusterWrapper getClusterWrapper() {
            return cluster;
        }

        public DistributedDataStore getDistributedConfigDatastore() {
            return distributedConfigDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedConfigDatastore(
                final DistributedDataStore distributedConfigDatastore) {
            this.distributedConfigDatastore = distributedConfigDatastore;
            return this;
        }

        public DistributedDataStore getDistributedOperDatastore() {
            return distributedOperDatastore;
        }

        public ShardedDataTreeActorCreator setDistributedOperDatastore(
                final DistributedDataStore distributedOperDatastore) {
            this.distributedOperDatastore = distributedOperDatastore;
            return this;
        }

        // Fails fast (NullPointerException) if any mandatory dependency is missing.
        private void verify() {
            Preconditions.checkNotNull(shardingService);
            Preconditions.checkNotNull(actorSystem);
            Preconditions.checkNotNull(cluster);
            Preconditions.checkNotNull(distributedConfigDatastore);
            Preconditions.checkNotNull(distributedOperDatastore);
        }

        public Props props() {
            verify();
            return Props.create(ShardedDataTreeActor.class, this);
        }
    }