2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSelection;
15 import akka.actor.ActorSystem;
16 import akka.actor.PoisonPill;
17 import akka.actor.Props;
18 import akka.actor.Status;
19 import akka.cluster.ClusterEvent;
20 import akka.cluster.ClusterEvent.MemberExited;
21 import akka.cluster.ClusterEvent.MemberRemoved;
22 import akka.cluster.ClusterEvent.MemberUp;
23 import akka.cluster.ClusterEvent.MemberWeaklyUp;
24 import akka.cluster.ClusterEvent.ReachableMember;
25 import akka.cluster.ClusterEvent.UnreachableMember;
26 import akka.cluster.Member;
27 import akka.util.Timeout;
28 import com.google.common.base.Preconditions;
29 import com.google.common.base.Throwables;
30 import java.util.ArrayList;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.HashMap;
34 import java.util.List;
36 import java.util.concurrent.CompletableFuture;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import org.opendaylight.controller.cluster.access.concepts.MemberName;
40 import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
41 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
42 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
43 import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
44 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
45 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
46 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
47 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
48 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
49 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
50 import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
51 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
52 import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
53 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
54 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
55 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
56 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
62 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
63 import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
64 import org.opendaylight.yangtools.concepts.ListenerRegistration;
65 import scala.compat.java8.FutureConverters;
68 * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
69 * nodes of newly open producers/shards on the local node.
71 public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
73 private static final String PERSISTENCE_ID = "sharding-service-actor";
74 private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
76 private final DOMDataTreeService dataTreeService;
77 private final DOMDataTreeShardingService shardingService;
78 private final ActorSystem actorSystem;
79 private final ClusterWrapper cluster;
80 // helper actorContext used only for static calls to executeAsync etc
81 // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
82 private final ActorContext actorContext;
83 private final ShardingServiceAddressResolver resolver;
84 private final DistributedDataStore distributedConfigDatastore;
85 private final DistributedDataStore distributedOperDatastore;
87 private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
88 private final Map<DOMDataTreeIdentifier, ShardFrontendRegistration> idToShardRegistration = new HashMap<>();
90 ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
91 LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
93 dataTreeService = builder.getDataTreeService();
94 shardingService = builder.getShardingService();
95 actorSystem = builder.getActorSystem();
96 cluster = builder.getClusterWrapper();
97 distributedConfigDatastore = builder.getDistributedConfigDatastore();
98 distributedOperDatastore = builder.getDistributedOperDatastore();
99 actorContext = distributedConfigDatastore.getActorContext();
100 resolver = new ShardingServiceAddressResolver(
101 DistributedShardedDOMDataTree.ACTOR_ID, cluster.getCurrentMemberName());
103 cluster.subscribeToMemberEvents(self());
107 protected void handleRecover(final Object message) throws Exception {
108 LOG.debug("Received a recover message {}", message);
112 protected void handleCommand(final Object message) throws Exception {
113 if (message instanceof ClusterEvent.MemberUp) {
114 memberUp((ClusterEvent.MemberUp) message);
115 } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
116 memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
117 } else if (message instanceof ClusterEvent.MemberExited) {
118 memberExited((ClusterEvent.MemberExited) message);
119 } else if (message instanceof ClusterEvent.MemberRemoved) {
120 memberRemoved((ClusterEvent.MemberRemoved) message);
121 } else if (message instanceof ClusterEvent.UnreachableMember) {
122 memberUnreachable((ClusterEvent.UnreachableMember) message);
123 } else if (message instanceof ClusterEvent.ReachableMember) {
124 memberReachable((ClusterEvent.ReachableMember) message);
125 } else if (message instanceof ProducerCreated) {
126 onProducerCreated((ProducerCreated) message);
127 } else if (message instanceof NotifyProducerCreated) {
128 onNotifyProducerCreated((NotifyProducerCreated) message);
129 } else if (message instanceof ProducerRemoved) {
130 onProducerRemoved((ProducerRemoved) message);
131 } else if (message instanceof NotifyProducerRemoved) {
132 onNotifyProducerRemoved((NotifyProducerRemoved) message);
133 } else if (message instanceof PrefixShardCreated) {
134 onPrefixShardCreated((PrefixShardCreated) message);
135 } else if (message instanceof CreatePrefixShard) {
136 onCreatePrefixShard((CreatePrefixShard) message);
137 } else if (message instanceof RemovePrefixShard) {
138 onRemovePrefixShard((RemovePrefixShard) message);
139 } else if (message instanceof PrefixShardRemoved) {
140 onPrefixShardRemoved((PrefixShardRemoved) message);
145 public String persistenceId() {
146 return PERSISTENCE_ID;
149 private void memberUp(final MemberUp message) {
150 final MemberName memberName = memberToName(message.member());
152 LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
153 message.member().address());
155 resolver.addPeerAddress(memberName, message.member().address());
158 private void memberWeaklyUp(final MemberWeaklyUp message) {
159 final MemberName memberName = memberToName(message.member());
161 LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
162 message.member().address());
164 resolver.addPeerAddress(memberName, message.member().address());
167 private void memberExited(final MemberExited message) {
168 final MemberName memberName = memberToName(message.member());
170 LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
171 message.member().address());
173 resolver.removePeerAddress(memberName);
176 private void memberRemoved(final MemberRemoved message) {
177 final MemberName memberName = memberToName(message.member());
179 LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
180 message.member().address());
182 resolver.removePeerAddress(memberName);
185 private void memberUnreachable(final UnreachableMember message) {
186 final MemberName memberName = memberToName(message.member());
187 LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
189 resolver.removePeerAddress(memberName);
192 private void memberReachable(final ReachableMember message) {
193 final MemberName memberName = memberToName(message.member());
194 LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
196 resolver.addPeerAddress(memberName, message.member().address());
199 private void onProducerCreated(final ProducerCreated message) {
200 LOG.debug("Received ProducerCreated: {}", message);
201 final ActorRef sender = getSender();
202 final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
204 final List<CompletableFuture<Object>> futures = new ArrayList<>();
206 for (final String address : resolver.getShardingServicePeerActorAddresses()) {
207 final ActorSelection actorSelection = actorSystem.actorSelection(address);
209 FutureConverters.toJava(
210 actorContext.executeOperationAsync(
211 actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
212 .toCompletableFuture());
215 final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
216 futures.toArray(new CompletableFuture[futures.size()]));
218 combinedFuture.thenRun(() -> {
219 for (final CompletableFuture<Object> future : futures) {
221 final Object result = future.get();
222 if (result instanceof Status.Failure) {
223 sender.tell(result, self());
226 } catch (InterruptedException | ExecutionException e) {
227 sender.tell(new Status.Failure(e), self());
231 sender.tell(new Status.Success(null), noSender());
232 }).exceptionally(throwable -> {
233 sender.tell(new Status.Failure(throwable), self());
238 private void onNotifyProducerCreated(final NotifyProducerCreated message) {
239 LOG.debug("Received NotifyProducerCreated: {}", message);
241 final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
244 final ActorProducerRegistration registration =
245 new ActorProducerRegistration(dataTreeService.createProducer(subtrees), subtrees);
246 subtrees.forEach(id -> idToProducer.put(id, registration));
247 sender().tell(new Status.Success(null), self());
248 } catch (final IllegalArgumentException e) {
249 sender().tell(new Status.Failure(e), getSelf());
253 private void onProducerRemoved(final ProducerRemoved message) {
254 LOG.debug("Received ProducerRemoved: {}", message);
256 final List<CompletableFuture<Object>> futures = new ArrayList<>();
258 for (final String address : resolver.getShardingServicePeerActorAddresses()) {
259 final ActorSelection selection = actorSystem.actorSelection(address);
261 futures.add(FutureConverters.toJava(
262 actorContext.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
263 .toCompletableFuture());
266 final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
267 futures.toArray(new CompletableFuture[futures.size()]));
269 final ActorRef respondTo = getSender();
272 .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
273 .exceptionally(e -> {
274 respondTo.tell(new Status.Failure(null), self());
280 private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
281 LOG.debug("Received NotifyProducerRemoved: {}", message);
283 final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
284 if (registration == null) {
285 LOG.warn("The notification contained a path on which no producer is registered, throwing away");
286 getSender().tell(new Status.Success(null), noSender());
291 registration.close();
292 getSender().tell(new Status.Success(null), noSender());
293 } catch (final DOMDataTreeProducerException e) {
294 LOG.error("Unable to close producer", e);
295 getSender().tell(new Status.Failure(e), noSender());
299 @SuppressWarnings("checkstyle:IllegalCatch")
300 private void onCreatePrefixShard(final CreatePrefixShard message) {
301 LOG.debug("Received CreatePrefixShard: {}", message);
303 final PrefixShardConfiguration configuration = message.getConfiguration();
305 final DOMDataTreeProducer producer =
306 dataTreeService.createProducer(Collections.singleton(configuration.getPrefix()));
308 final DistributedDataStore distributedDataStore =
309 configuration.getPrefix().getDatastoreType() == LogicalDatastoreType.CONFIGURATION
310 ? distributedConfigDatastore : distributedOperDatastore;
311 final String shardName = ClusterUtils.getCleanShardName(configuration.getPrefix().getRootIdentifier());
312 LOG.debug("Creating distributed datastore client for shard {}", shardName);
313 final Props distributedDataStoreClientProps =
314 SimpleDataStoreClientActor.props(cluster.getCurrentMemberName(),
315 "Shard-" + shardName, distributedDataStore.getActorContext(), shardName);
317 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
318 final DataStoreClient client;
320 client = SimpleDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
321 } catch (final Exception e) {
322 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
323 clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
324 throw Throwables.propagate(e);
328 final ListenerRegistration<ShardFrontend> shardFrontendRegistration =
329 shardingService.registerDataTreeShard(configuration.getPrefix(),
332 configuration.getPrefix()
335 idToShardRegistration.put(configuration.getPrefix(),
336 new ShardFrontendRegistration(clientActor, shardFrontendRegistration));
338 sender().tell(new Status.Success(null), self());
339 } catch (final DOMDataTreeShardingConflictException e) {
340 LOG.error("Unable to create shard", e);
341 clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
342 sender().tell(new Status.Failure(e), self());
346 } catch (final DOMDataTreeProducerException e) {
347 LOG.error("Unable to close producer that was used for shard registration {}", producer, e);
352 private void onPrefixShardCreated(final PrefixShardCreated message) {
353 LOG.debug("Received PrefixShardCreated: {}", message);
355 final Collection<String> addresses = resolver.getShardingServicePeerActorAddresses();
356 final ActorRef sender = getSender();
358 final List<CompletableFuture<Object>> futures = new ArrayList<>();
360 for (final String address : addresses) {
361 final ActorSelection actorSelection = actorSystem.actorSelection(address);
362 futures.add(FutureConverters.toJava(actorContext.executeOperationAsync(actorSelection,
363 new CreatePrefixShard(message.getConfiguration()))).toCompletableFuture());
366 final CompletableFuture<Void> combinedFuture =
367 CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
369 combinedFuture.thenRun(() -> {
370 for (final CompletableFuture<Object> future : futures) {
372 final Object result = future.get();
373 if (result instanceof Status.Failure) {
374 sender.tell(result, self());
377 } catch (InterruptedException | ExecutionException e) {
378 sender.tell(new Status.Failure(e), self());
382 sender.tell(new Status.Success(null), self());
383 }).exceptionally(throwable -> {
384 sender.tell(new Status.Failure(throwable), self());
389 private void onRemovePrefixShard(final RemovePrefixShard message) {
390 LOG.debug("Received RemovePrefixShard: {}", message);
392 for (final String address : resolver.getShardingServicePeerActorAddresses()) {
393 final ActorSelection selection = actorContext.actorSelection(address);
394 selection.tell(new PrefixShardRemoved(message.getPrefix()), getSelf());
398 private void onPrefixShardRemoved(final PrefixShardRemoved message) {
399 LOG.debug("Received PrefixShardRemoved: {}", message);
401 final ShardFrontendRegistration registration = idToShardRegistration.get(message.getPrefix());
403 if (registration == null) {
404 LOG.warn("Received shard removed for {}, but not shard registered at this prefix all registrations: {}",
405 message.getPrefix(), idToShardRegistration);
409 registration.close();
412 private static MemberName memberToName(final Member member) {
413 return MemberName.forName(member.roles().iterator().next());
416 private class ActorProducerRegistration {
418 private final DOMDataTreeProducer producer;
419 private final Collection<DOMDataTreeIdentifier> subtrees;
421 ActorProducerRegistration(final DOMDataTreeProducer producer,
422 final Collection<DOMDataTreeIdentifier> subtrees) {
423 this.producer = producer;
424 this.subtrees = subtrees;
427 void close() throws DOMDataTreeProducerException {
429 subtrees.forEach(idToProducer::remove);
433 private static class ShardFrontendRegistration extends
434 AbstractObjectRegistration<ListenerRegistration<ShardFrontend>> {
436 private final ActorRef clientActor;
437 private final ListenerRegistration<ShardFrontend> shardRegistration;
439 ShardFrontendRegistration(final ActorRef clientActor,
440 final ListenerRegistration<ShardFrontend> shardRegistration) {
441 super(shardRegistration);
442 this.clientActor = clientActor;
443 this.shardRegistration = shardRegistration;
447 protected void removeRegistration() {
448 shardRegistration.close();
449 clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
453 public static class ShardedDataTreeActorCreator {
455 private DOMDataTreeService dataTreeService;
456 private DOMDataTreeShardingService shardingService;
457 private DistributedDataStore distributedConfigDatastore;
458 private DistributedDataStore distributedOperDatastore;
459 private ActorSystem actorSystem;
460 private ClusterWrapper cluster;
462 public DOMDataTreeService getDataTreeService() {
463 return dataTreeService;
466 public ShardedDataTreeActorCreator setDataTreeService(final DOMDataTreeService dataTreeService) {
467 this.dataTreeService = dataTreeService;
471 public DOMDataTreeShardingService getShardingService() {
472 return shardingService;
475 public ShardedDataTreeActorCreator setShardingService(final DOMDataTreeShardingService shardingService) {
476 this.shardingService = shardingService;
480 public ActorSystem getActorSystem() {
484 public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
485 this.actorSystem = actorSystem;
489 public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper cluster) {
490 this.cluster = cluster;
494 public ClusterWrapper getClusterWrapper() {
498 public DistributedDataStore getDistributedConfigDatastore() {
499 return distributedConfigDatastore;
502 public ShardedDataTreeActorCreator setDistributedConfigDatastore(
503 final DistributedDataStore distributedConfigDatastore) {
504 this.distributedConfigDatastore = distributedConfigDatastore;
508 public DistributedDataStore getDistributedOperDatastore() {
509 return distributedOperDatastore;
512 public ShardedDataTreeActorCreator setDistributedOperDatastore(
513 final DistributedDataStore distributedOperDatastore) {
514 this.distributedOperDatastore = distributedOperDatastore;
518 private void verify() {
519 Preconditions.checkNotNull(dataTreeService);
520 Preconditions.checkNotNull(shardingService);
521 Preconditions.checkNotNull(actorSystem);
522 Preconditions.checkNotNull(cluster);
523 Preconditions.checkNotNull(distributedConfigDatastore);
524 Preconditions.checkNotNull(distributedOperDatastore);
527 public Props props() {
529 return Props.create(ShardedDataTreeActor.class, this);