2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.cluster.Cluster;
18 import akka.cluster.Member;
19 import akka.dispatch.Mapper;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Throwables;
24 import com.google.common.collect.Collections2;
25 import com.google.common.collect.ForwardingObject;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.AbstractMap.SimpleEntry;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.EnumMap;
32 import java.util.Map.Entry;
34 import java.util.concurrent.CompletionStage;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.TimeUnit;
37 import javax.annotation.Nonnull;
38 import org.opendaylight.controller.cluster.ActorSystemProvider;
39 import org.opendaylight.controller.cluster.access.concepts.MemberName;
40 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
41 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
42 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
43 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
44 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
45 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
46 import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
47 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
48 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
49 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
50 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
51 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
56 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
62 import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
63 import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
64 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
65 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
66 import org.opendaylight.yangtools.concepts.ListenerRegistration;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.collection.JavaConverters;
71 import scala.compat.java8.FutureConverters;
72 import scala.concurrent.Future;
73 import scala.concurrent.duration.FiniteDuration;
76 * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
77 * {@link ShardedDataTreeActor}. Also provides QoL method for addition of prefix based clustered shard into the system.
79 public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService,
80 DistributedShardFactory {
// Class-wide SLF4J logger.
82 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);
// Retry policy used by createShardedDataTreeActor(): the maximum number of actorOf() attempts and the
// pause (delay + unit) slept after every failed attempt.
84 private static final int MAX_ACTOR_CREATION_RETRIES = 100;
85 private static final int ACTOR_RETRY_DELAY = 100;
86 private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
// Duration, and the Timeout derived from it, applied to the CreatePrefixShard / RemovePrefixShard asks
// sent to the sharded data tree actor.
// NOTE(review): LOOKUP_TASK_MAX_RETRIES is multiplied by itself; confirm that a retry-interval constant
// was not intended as the second factor. The time unit argument is not visible in this view.
87 static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = new FiniteDuration(
88 ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * 3,
90 static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION);
// Presumably the name under which the sharded data tree actor is created - TODO confirm against the
// constructor's createShardedDataTreeActor() call.
92 static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
// Local sharded tree that all listener/producer/shard registrations are delegated to.
94 private final ShardedDOMDataTree shardedDOMDataTree;
// Actor system obtained from the ActorSystemProvider; hosts the sharded data tree actor and client actors.
95 private final ActorSystem actorSystem;
// Backing datastores; createShardFrontend() picks one based on the datastore type of the shard prefix.
96 private final DistributedDataStore distributedOperDatastore;
97 private final DistributedDataStore distributedConfigDatastore;
// Actor that receives the producer/shard creation and removal messages sent by this class.
99 private final ActorRef shardedDataTreeActor;
// Name of the current member, read from the config datastore's ActorContext; used in logs and client props.
100 private final MemberName memberName;
// Local bookkeeping of the shard frontend registrations created by this instance, keyed by prefix.
102 private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
103 DOMDataTreePrefixTable.create();
// Registrations of the default (root prefix) shards, filled in by the constructor per datastore type.
105 private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
106 new EnumMap<>(LogicalDatastoreType.class);
108 public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
109 final DistributedDataStore distributedOperDatastore,
110 final DistributedDataStore distributedConfigDatastore) {
111 this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
112 this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
113 this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
114 shardedDOMDataTree = new ShardedDOMDataTree();
116 shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
117 new ShardedDataTreeActorCreator()
118 .setShardingService(this)
119 .setActorSystem(actorSystem)
120 .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
121 .setDistributedConfigDatastore(distributedConfigDatastore)
122 .setDistributedOperDatastore(distributedOperDatastore),
125 this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();
127 //create shard registration for DEFAULT_SHARD
129 defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
130 initDefaultShard(LogicalDatastoreType.CONFIGURATION));
131 } catch (final InterruptedException | ExecutionException e) {
132 LOG.error("Unable to create default shard frontend for config shard", e);
136 defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
137 initDefaultShard(LogicalDatastoreType.OPERATIONAL));
138 } catch (final InterruptedException | ExecutionException e) {
139 LOG.error("Unable to create default shard frontend for operational shard", e);
145 public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
146 final T listener, final Collection<DOMDataTreeIdentifier> subtrees,
147 final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers)
148 throws DOMDataTreeLoopException {
149 return shardedDOMDataTree.registerListener(listener, subtrees, allowRxMerges, producers);
154 public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
155 LOG.debug("{} - Creating producer for {}",
156 distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
157 final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);
159 final Object response = distributedConfigDatastore.getActorContext()
160 .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
161 if (response == null) {
162 LOG.debug("{} - Received success from remote nodes, creating producer:{}",
163 distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
164 return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
165 distributedConfigDatastore.getActorContext());
166 } else if (response instanceof Exception) {
167 closeProducer(producer);
168 throw Throwables.propagate((Exception) response);
170 closeProducer(producer);
171 throw new RuntimeException("Unexpected response to create producer received." + response);
176 public CompletionStage<DistributedShardRegistration> createDistributedShard(
177 final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
178 throws DOMDataTreeShardingConflictException {
179 final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
180 shards.lookup(prefix);
181 if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
182 throw new DOMDataTreeShardingConflictException(
183 "Prefix " + prefix + " is already occupied by another shard.");
186 final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
188 final Future<Object> ask =
189 Patterns.ask(shardedDataTreeActor, new CreatePrefixShard(config), SHARD_FUTURE_TIMEOUT);
191 final Future<DistributedShardRegistration> shardRegistrationFuture = ask.transform(
192 new Mapper<Object, DistributedShardRegistration>() {
194 public DistributedShardRegistration apply(final Object parameter) {
195 return new DistributedShardRegistrationImpl(
196 prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
199 new Mapper<Throwable, Throwable>() {
201 public Throwable apply(final Throwable throwable) {
202 return new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable);
204 }, actorSystem.dispatcher());
206 return FutureConverters.toJava(shardRegistrationFuture);
209 void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
210 LOG.debug("Member {}: Resolving additions : {}", memberName, additions);
211 final ArrayList<DOMDataTreeIdentifier> list = new ArrayList<>(additions);
212 // we need to register the shards from top to bottom, so we need to atleast make sure the ordering reflects that
213 Collections.sort(list, (o1, o2) -> {
214 if (o1.getRootIdentifier().getPathArguments().size() < o2.getRootIdentifier().getPathArguments().size()) {
216 } else if (o1.getRootIdentifier().getPathArguments().size()
217 == o2.getRootIdentifier().getPathArguments().size()) {
223 list.forEach(this::createShardFrontend);
226 void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
227 LOG.debug("Member {}: Resolving removals : {}", memberName, removals);
229 // do we need to go from bottom to top?
230 removals.forEach(this::despawnShardFrontend);
233 private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
234 LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
235 final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
236 final DistributedDataStore distributedDataStore =
237 prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
238 ? distributedConfigDatastore : distributedOperDatastore;
240 try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
241 final Entry<DataStoreClient, ActorRef> entry =
242 createDatastoreClient(shardName, distributedDataStore.getActorContext());
244 final DistributedShardFrontend shard =
245 new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);
247 @SuppressWarnings("unchecked")
248 final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
249 (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
250 shards.store(prefix, reg);
251 } catch (final DOMDataTreeShardingConflictException e) {
252 LOG.error("Prefix {} is already occupied by another shard", prefix, e);
253 } catch (DOMDataTreeProducerException e) {
254 LOG.error("Unable to close producer", e);
255 } catch (DOMDataTreeShardCreationFailedException e) {
256 LOG.error("Unable to create datastore client for shard {}", prefix, e);
260 private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
261 LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
262 final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
263 shards.lookup(prefix);
265 if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
266 LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
271 lookup.getValue().close();
272 // need to remove from our local table thats used for tracking
273 shards.remove(prefix);
276 DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
277 final DOMDataTreeIdentifier prefix) {
278 return shards.lookup(prefix);
282 DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
283 return shardedDOMDataTree.createProducer(prefix);
288 public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
289 @Nonnull final DOMDataTreeIdentifier prefix,
290 @Nonnull final T shard,
291 @Nonnull final DOMDataTreeProducer producer)
292 throws DOMDataTreeShardingConflictException {
294 LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix);
296 return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
299 @SuppressWarnings("checkstyle:IllegalCatch")
300 private Entry<DataStoreClient, ActorRef> createDatastoreClient(
301 final String shardName, final ActorContext actorContext)
302 throws DOMDataTreeShardCreationFailedException {
304 LOG.debug("Creating distributed datastore client for shard {}", shardName);
305 final Props distributedDataStoreClientProps =
306 SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName);
308 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
310 return new SimpleEntry<>(SimpleDataStoreClientActor
311 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
312 } catch (final Exception e) {
313 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
314 clientActor.tell(PoisonPill.getInstance(), noSender());
315 throw new DOMDataTreeShardCreationFailedException(
316 "Unable to create datastore client for shard{" + shardName + "}", e);
320 private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
321 throws ExecutionException, InterruptedException {
322 final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
323 Cluster.get(actorSystem).state().members()).asJavaCollection();
324 final Collection<MemberName> names = Collections2.transform(members,
325 m -> MemberName.forName(m.roles().iterator().next()));
328 // we should probably only have one node create the default shards
329 return createDistributedShard(
330 new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)
331 .toCompletableFuture().get();
332 } catch (DOMDataTreeShardingConflictException e) {
333 LOG.debug("Default shard already registered, possibly due to other node doing it faster");
334 return new DistributedShardRegistrationImpl(
335 new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
336 shardedDataTreeActor, this);
340 private static void closeProducer(final DOMDataTreeProducer producer) {
343 } catch (final DOMDataTreeProducerException e) {
344 LOG.error("Unable to close producer", e);
348 @SuppressWarnings("checkstyle:IllegalCatch")
349 private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
350 final ShardedDataTreeActorCreator creator,
351 final String shardDataTreeActorId) {
352 Exception lastException = null;
354 for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) {
356 return actorSystem.actorOf(creator.props(), shardDataTreeActorId);
357 } catch (final Exception e) {
359 Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT);
360 LOG.debug("Could not create actor {} because of {} -"
361 + " waiting for sometime before retrying (retry count = {})",
362 shardDataTreeActorId, e.getMessage(), i);
366 throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
369 private class DistributedShardRegistrationImpl implements DistributedShardRegistration {
371 private final DOMDataTreeIdentifier prefix;
372 private final ActorRef shardedDataTreeActor;
373 private final DistributedShardedDOMDataTree distributedShardedDOMDataTree;
375 DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix,
376 final ActorRef shardedDataTreeActor,
377 final DistributedShardedDOMDataTree distributedShardedDOMDataTree) {
378 this.prefix = prefix;
379 this.shardedDataTreeActor = shardedDataTreeActor;
380 this.distributedShardedDOMDataTree = distributedShardedDOMDataTree;
384 public CompletionStage<Void> close() {
385 // first despawn on the local node
386 distributedShardedDOMDataTree.despawnShardFrontend(prefix);
387 // update the config so the remote nodes are updated
388 final Future<Object> ask =
389 Patterns.ask(shardedDataTreeActor, new RemovePrefixShard(prefix), SHARD_FUTURE_TIMEOUT);
391 final Future<Void> closeFuture = ask.transform(
392 new Mapper<Object, Void>() {
394 public Void apply(Object parameter) {
398 new Mapper<Throwable, Throwable>() {
400 public Throwable apply(Throwable throwable) {
403 }, actorSystem.dispatcher());
405 return FutureConverters.toJava(closeFuture);
409 private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {
411 private final DOMDataTreeProducer delegate;
412 private final Collection<DOMDataTreeIdentifier> subtrees;
413 private final ActorRef shardDataTreeActor;
414 private final ActorContext actorContext;
/**
 * Creates a proxy around a local producer.
 *
 * @param delegate local producer all operations are forwarded to, must not be null
 * @param subtrees subtrees the producer is bound to, must not be null
 * @param shardDataTreeActor actor notified when the producer is closed, must not be null
 * @param actorContext context used to send the notification, must not be null
 */
ProxyProducer(final DOMDataTreeProducer delegate,
              final Collection<DOMDataTreeIdentifier> subtrees,
              final ActorRef shardDataTreeActor,
              final ActorContext actorContext) {
    // arguments are validated in declaration order
    this.delegate = Preconditions.checkNotNull(delegate);
    this.subtrees = Preconditions.checkNotNull(subtrees);
    this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
    this.actorContext = Preconditions.checkNotNull(actorContext);
}
428 public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
429 return delegate.createTransaction(isolated);
434 public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
435 // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
436 // open we surely have the rights to all the subtrees.
437 return delegate.createProducer(subtrees);
441 public void close() throws DOMDataTreeProducerException {
444 final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
445 if (o instanceof DOMDataTreeProducerException) {
446 throw ((DOMDataTreeProducerException) o);
447 } else if (o instanceof Throwable) {
448 throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
453 protected DOMDataTreeProducer delegate() {