2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import static akka.actor.ActorRef.noSender;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.cluster.Cluster;
18 import akka.cluster.Member;
19 import akka.util.Timeout;
20 import com.google.common.base.Preconditions;
21 import com.google.common.base.Throwables;
22 import com.google.common.collect.Collections2;
23 import com.google.common.collect.ForwardingObject;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import java.util.AbstractMap.SimpleEntry;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Collections;
29 import java.util.EnumMap;
30 import java.util.Map.Entry;
32 import java.util.concurrent.TimeUnit;
33 import javax.annotation.Nonnull;
34 import org.opendaylight.controller.cluster.access.concepts.MemberName;
35 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
36 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
37 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
38 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
39 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
40 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
41 import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
42 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
43 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
44 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
45 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
46 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
47 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
48 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
49 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
50 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
51 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
56 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
57 import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
58 import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
59 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
60 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
61 import org.opendaylight.yangtools.concepts.ListenerRegistration;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65 import scala.collection.JavaConverters;
68 * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
69 * {@link ShardedDataTreeActor}. Also provides QoL method for addition of prefix based clustered shard into the system.
71 public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService,
72 DistributedShardFactory {
74 private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);
76 private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
77 private static final int MAX_ACTOR_CREATION_RETRIES = 100;
78 private static final int ACTOR_RETRY_DELAY = 100;
79 private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
81 static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
83 private final ShardedDOMDataTree shardedDOMDataTree;
84 private final ActorSystem actorSystem;
85 private final DistributedDataStore distributedOperDatastore;
86 private final DistributedDataStore distributedConfigDatastore;
88 private final ActorRef shardedDataTreeActor;
89 private final MemberName memberName;
91 private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
92 DOMDataTreePrefixTable.create();
94 private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
95 new EnumMap<>(LogicalDatastoreType.class);
97 public DistributedShardedDOMDataTree(final ActorSystem actorSystem,
98 final DistributedDataStore distributedOperDatastore,
99 final DistributedDataStore distributedConfigDatastore) {
100 this.actorSystem = Preconditions.checkNotNull(actorSystem);
101 this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
102 this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
103 shardedDOMDataTree = new ShardedDOMDataTree();
105 shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
106 new ShardedDataTreeActorCreator()
107 .setShardingService(this)
108 .setActorSystem(actorSystem)
109 .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
110 .setDistributedConfigDatastore(distributedConfigDatastore)
111 .setDistributedOperDatastore(distributedOperDatastore),
114 this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();
116 //create shard registration for DEFAULT_SHARD
118 defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
119 initDefaultShard(LogicalDatastoreType.CONFIGURATION));
120 } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
121 LOG.error("Unable to create default shard frontend for config shard", e);
125 defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
126 initDefaultShard(LogicalDatastoreType.OPERATIONAL));
127 } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
128 LOG.error("Unable to create default shard frontend for operational shard", e);
134 public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
135 final T listener, final Collection<DOMDataTreeIdentifier> subtrees,
136 final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers)
137 throws DOMDataTreeLoopException {
139 throw new UnsupportedOperationException("Not implemented");
144 public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
145 LOG.debug("{} - Creating producer for {}",
146 distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
147 final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);
149 final Object response = distributedConfigDatastore.getActorContext()
150 .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
151 if (response == null) {
152 LOG.debug("{} - Received success from remote nodes, creating producer:{}",
153 distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
154 return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
155 distributedConfigDatastore.getActorContext());
156 } else if (response instanceof Exception) {
157 closeProducer(producer);
158 throw Throwables.propagate((Exception) response);
160 closeProducer(producer);
161 throw new RuntimeException("Unexpected response to create producer received." + response);
166 @SuppressWarnings("checkstyle:IllegalCatch")
167 //TODO: it would be better to block here until the message is processed by the actor
168 public DistributedShardRegistration createDistributedShard(
169 final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
170 throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException {
171 final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
172 shards.lookup(prefix);
173 if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
174 throw new DOMDataTreeShardingConflictException(
175 "Prefix " + prefix + " is already occupied by another shard.");
178 PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
179 shardedDataTreeActor.tell(new CreatePrefixShard(config), noSender());
181 return new DistributedShardRegistrationImpl(prefix, shardedDataTreeActor, this);
184 void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
185 LOG.debug("Member {}: Resolving additions : {}", memberName, additions);
186 final ArrayList<DOMDataTreeIdentifier> list = new ArrayList<>(additions);
187 // we need to register the shards from top to bottom, so we need to atleast make sure the ordering reflects that
188 Collections.sort(list, (o1, o2) -> {
189 if (o1.getRootIdentifier().getPathArguments().size() < o2.getRootIdentifier().getPathArguments().size()) {
191 } else if (o1.getRootIdentifier().getPathArguments().size()
192 == o2.getRootIdentifier().getPathArguments().size()) {
198 list.forEach(this::createShardFrontend);
201 void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
202 LOG.debug("Member {}: Resolving removals : {}", memberName, removals);
204 // do we need to go from bottom to top?
205 removals.forEach(this::despawnShardFrontend);
208 private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
209 LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
210 final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
211 final DistributedDataStore distributedDataStore =
212 prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
213 ? distributedConfigDatastore : distributedOperDatastore;
215 try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
216 final Entry<DataStoreClient, ActorRef> entry =
217 createDatastoreClient(shardName, distributedDataStore.getActorContext());
219 final DistributedShardFrontend shard =
220 new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);
222 @SuppressWarnings("unchecked")
223 final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
224 (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
225 shards.store(prefix, reg);
226 } catch (final DOMDataTreeShardingConflictException e) {
227 LOG.error("Prefix {} is already occupied by another shard", prefix, e);
228 } catch (DOMDataTreeProducerException e) {
229 LOG.error("Unable to close producer", e);
230 } catch (DOMDataTreeShardCreationFailedException e) {
231 LOG.error("Unable to create datastore client for shard {}", prefix, e);
235 private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
236 LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
237 final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
238 shards.lookup(prefix);
240 if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
241 LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
246 lookup.getValue().close();
247 // need to remove from our local table thats used for tracking
248 shards.remove(prefix);
251 DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
252 return shardedDOMDataTree.createProducer(prefix);
257 public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
258 @Nonnull final DOMDataTreeIdentifier prefix,
259 @Nonnull final T shard,
260 @Nonnull final DOMDataTreeProducer producer)
261 throws DOMDataTreeShardingConflictException {
263 LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix);
265 return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
268 @SuppressWarnings("checkstyle:IllegalCatch")
269 private Entry<DataStoreClient, ActorRef> createDatastoreClient(
270 final String shardName, final ActorContext actorContext)
271 throws DOMDataTreeShardCreationFailedException {
273 LOG.debug("Creating distributed datastore client for shard {}", shardName);
274 final Props distributedDataStoreClientProps =
275 SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName);
277 final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
279 return new SimpleEntry<>(SimpleDataStoreClientActor
280 .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
281 } catch (final Exception e) {
282 LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
283 clientActor.tell(PoisonPill.getInstance(), noSender());
284 throw new DOMDataTreeShardCreationFailedException(
285 "Unable to create datastore client for shard{" + shardName + "}", e);
289 private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
290 throws DOMDataTreeProducerException, DOMDataTreeShardingConflictException {
291 final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
292 Cluster.get(actorSystem).state().members()).asJavaCollection();
293 final Collection<MemberName> names = Collections2.transform(members,
294 m -> MemberName.forName(m.roles().iterator().next()));
296 return createDistributedShard(
297 new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names);
300 private static void closeProducer(final DOMDataTreeProducer producer) {
303 } catch (final DOMDataTreeProducerException e) {
304 LOG.error("Unable to close producer", e);
308 @SuppressWarnings("checkstyle:IllegalCatch")
309 private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
310 final ShardedDataTreeActorCreator creator,
311 final String shardDataTreeActorId) {
312 Exception lastException = null;
314 for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) {
316 return actorSystem.actorOf(creator.props(), shardDataTreeActorId);
317 } catch (final Exception e) {
319 Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT);
320 LOG.debug("Could not create actor {} because of {} -"
321 + " waiting for sometime before retrying (retry count = {})",
322 shardDataTreeActorId, e.getMessage(), i);
326 throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
329 private static class DistributedShardRegistrationImpl implements DistributedShardRegistration {
331 private final DOMDataTreeIdentifier prefix;
332 private final ActorRef shardedDataTreeActor;
333 private final DistributedShardedDOMDataTree distributedShardedDOMDataTree;
335 DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix,
336 final ActorRef shardedDataTreeActor,
337 final DistributedShardedDOMDataTree distributedShardedDOMDataTree) {
338 this.prefix = prefix;
339 this.shardedDataTreeActor = shardedDataTreeActor;
340 this.distributedShardedDOMDataTree = distributedShardedDOMDataTree;
344 public void close() {
345 // first despawn on the local node
346 distributedShardedDOMDataTree.despawnShardFrontend(prefix);
347 // update the config so the remote nodes are updated
348 shardedDataTreeActor.tell(new RemovePrefixShard(prefix), noSender());
352 private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {
354 private final DOMDataTreeProducer delegate;
355 private final Collection<DOMDataTreeIdentifier> subtrees;
356 private final ActorRef shardDataTreeActor;
357 private final ActorContext actorContext;
359 ProxyProducer(final DOMDataTreeProducer delegate,
360 final Collection<DOMDataTreeIdentifier> subtrees,
361 final ActorRef shardDataTreeActor,
362 final ActorContext actorContext) {
363 this.delegate = Preconditions.checkNotNull(delegate);
364 this.subtrees = Preconditions.checkNotNull(subtrees);
365 this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
366 this.actorContext = Preconditions.checkNotNull(actorContext);
371 public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
372 return delegate.createTransaction(isolated);
377 public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
378 // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
379 // open we surely have the rights to all the subtrees.
380 return delegate.createProducer(subtrees);
384 public void close() throws DOMDataTreeProducerException {
387 final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
388 if (o instanceof DOMDataTreeProducerException) {
389 throw ((DOMDataTreeProducerException) o);
390 } else if (o instanceof Throwable) {
391 throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
396 protected DOMDataTreeProducer delegate() {