BUG-8792: allow transactions to not time out after reconnect
controller.git / opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
/*
 * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import static akka.actor.ActorRef.noSender;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ForwardingObject;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.prefix.shard.configuration.rev170110.PrefixShards;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;

/**
 * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
 * {@link ShardedDataTreeActor}. Also provides a convenience method for adding a prefix-based clustered shard into
 * the system.
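 *
 * <p>
 * A rough usage sketch (identifiers such as {@code actorSystemProvider}, {@code operDatastore},
 * {@code configDatastore}, {@code topologyPath} and {@code memberNames} are illustrative only):
 * <pre>{@code
 *     DistributedShardedDOMDataTree dataTree =
 *             new DistributedShardedDOMDataTree(actorSystemProvider, operDatastore, configDatastore);
 *     dataTree.init();
 *     CompletionStage<DistributedShardRegistration> registration = dataTree.createDistributedShard(
 *             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, topologyPath), memberNames);
 * }</pre>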
 */
public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService,
        DistributedShardFactory {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);

    private static final int MAX_ACTOR_CREATION_RETRIES = 100;
    private static final int ACTOR_RETRY_DELAY = 100;
    private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
    private static final int LOOKUP_TASK_MAX_RETRIES = 100;
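    // Worst-case wait for a single shard lookup: LOOKUP_TASK_MAX_RETRIES^2 * 3 seconds,
    // i.e. 30 000 seconds with the defaults above.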
    static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION =
            new FiniteDuration(LOOKUP_TASK_MAX_RETRIES * LOOKUP_TASK_MAX_RETRIES * 3, TimeUnit.SECONDS);
    static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION);

    static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";

    private final ShardedDOMDataTree shardedDOMDataTree;
    private final ActorSystem actorSystem;
    private final AbstractDataStore distributedOperDatastore;
    private final AbstractDataStore distributedConfigDatastore;

    private final ActorRef shardedDataTreeActor;
    private final MemberName memberName;

    @GuardedBy("shards")
    private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
            DOMDataTreePrefixTable.create();

    private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
            new EnumMap<>(LogicalDatastoreType.class);

    private final EnumMap<LogicalDatastoreType, Entry<DataStoreClient, ActorRef>> configurationShardMap =
            new EnumMap<>(LogicalDatastoreType.class);

    private final EnumMap<LogicalDatastoreType, PrefixedShardConfigWriter> writerMap =
            new EnumMap<>(LogicalDatastoreType.class);

    private final PrefixedShardConfigUpdateHandler updateHandler;

    public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
                                         final AbstractDataStore distributedOperDatastore,
                                         final AbstractDataStore distributedConfigDatastore) {
        this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
        this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
        this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
        shardedDOMDataTree = new ShardedDOMDataTree();

        shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
                new ShardedDataTreeActorCreator()
                        .setShardingService(this)
                        .setActorSystem(actorSystem)
                        .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
                        .setDistributedConfigDatastore(distributedConfigDatastore)
                        .setDistributedOperDatastore(distributedOperDatastore)
                        .setLookupTaskMaxRetries(LOOKUP_TASK_MAX_RETRIES),
                ACTOR_ID);

        this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();

        updateHandler = new PrefixedShardConfigUpdateHandler(shardedDataTreeActor,
                distributedConfigDatastore.getActorContext().getCurrentMemberName());

        LOG.debug("{} - Starting prefix configuration shards", memberName);
        createPrefixConfigShard(distributedConfigDatastore);
        createPrefixConfigShard(distributedOperDatastore);
    }

    private static void createPrefixConfigShard(final AbstractDataStore dataStore) {
        Configuration configuration = dataStore.getActorContext().getConfiguration();
        Collection<MemberName> memberNames = configuration.getUniqueMemberNamesForAllShards();
        CreateShard createShardMessage =
                new CreateShard(new ModuleShardConfiguration(PrefixShards.QNAME.getNamespace(),
                        "prefix-shard-configuration", ClusterUtils.PREFIX_CONFIG_SHARD_ID, ModuleShardStrategy.NAME,
                        memberNames),
                        Shard.builder(), dataStore.getActorContext().getDatastoreContext());

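        // Fire-and-forget: the shard manager creates the prefix configuration shard asynchronously;
        // init() below blocks until it has been looked up successfully.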
        dataStore.getActorContext().getShardManager().tell(createShardMessage, noSender());
    }

    /**
     * This will try to initialize prefix configuration shards upon their
     * successful start. We need to create writers to these shards, so we can
     * satisfy future {@link #createDistributedShard} and
     * {@link #resolveShardAdditions} requests and update prefix configuration
     * shards accordingly.
     *
     * <p>
     * We also need to initialize listeners on these shards, so we can react
     * to changes made to them by other cluster members or even by ourselves.
     *
     * <p>
     * Finally, we need to be sure that the default shards for both the operational
     * and configuration data stores are up and running and that we have the
     * distributed shard frontends created for them.
     *
     * <p>
     * This is intended to be invoked by blueprint as the initialization method.
     */
    public void init() {
        // create our writers to the configuration
        try {
            LOG.debug("{} - starting config shard lookup.",
                    distributedConfigDatastore.getActorContext().getCurrentMemberName());

            // We have to wait for prefix config shards to be up and running
            // so we can create datastore clients for them
            handleConfigShardLookup().get(SHARD_FUTURE_TIMEOUT_DURATION.length(),
                    SHARD_FUTURE_TIMEOUT_DURATION.unit());

            LOG.debug("Prefix configuration shards ready - creating clients");

        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Prefix config shards not found", e);
        }

        try {
            LOG.debug("Prefix configuration shards ready - creating clients");
            configurationShardMap.put(LogicalDatastoreType.CONFIGURATION,
                    createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
                            distributedConfigDatastore.getActorContext()));
        } catch (final DOMDataTreeShardCreationFailedException e) {
            throw new IllegalStateException(
                    "Unable to create datastoreClient for config DS prefix configuration shard.", e);
        }

        try {
            configurationShardMap.put(LogicalDatastoreType.OPERATIONAL,
                    createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
                            distributedOperDatastore.getActorContext()));

        } catch (final DOMDataTreeShardCreationFailedException e) {
            throw new IllegalStateException(
                        "Unable to create datastoreClient for oper DS prefix configuration shard.", e);
        }

        writerMap.put(LogicalDatastoreType.CONFIGURATION, new PrefixedShardConfigWriter(
                configurationShardMap.get(LogicalDatastoreType.CONFIGURATION).getKey()));

        writerMap.put(LogicalDatastoreType.OPERATIONAL, new PrefixedShardConfigWriter(
                configurationShardMap.get(LogicalDatastoreType.OPERATIONAL).getKey()));

        updateHandler.initListener(distributedConfigDatastore, LogicalDatastoreType.CONFIGURATION);
        updateHandler.initListener(distributedOperDatastore, LogicalDatastoreType.OPERATIONAL);

        distributedConfigDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender());
        distributedOperDatastore.getActorContext().getShardManager().tell(InitConfigListener.INSTANCE, noSender());

        // create shard registration for DEFAULT_SHARD
        try {
            defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
                    initDefaultShard(LogicalDatastoreType.CONFIGURATION));
        } catch (final InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Unable to create default shard frontend for config shard", e);
        }

        try {
            defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
                    initDefaultShard(LogicalDatastoreType.OPERATIONAL));
        } catch (final InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Unable to create default shard frontend for operational shard", e);
        }
    }

    private ListenableFuture<List<Void>> handleConfigShardLookup() {

        final ListenableFuture<Void> configFuture = lookupConfigShard(LogicalDatastoreType.CONFIGURATION);
        final ListenableFuture<Void> operFuture = lookupConfigShard(LogicalDatastoreType.OPERATIONAL);

        return Futures.allAsList(configFuture, operFuture);
    }

    private ListenableFuture<Void> lookupConfigShard(final LogicalDatastoreType type) {
        final SettableFuture<Void> future = SettableFuture.create();

        final Future<Object> ask =
                Patterns.ask(shardedDataTreeActor, new StartConfigShardLookup(type), SHARD_FUTURE_TIMEOUT);

        ask.onComplete(new OnComplete<Object>() {
            @Override
            public void onComplete(final Throwable throwable, final Object result) throws Throwable {
                if (throwable != null) {
                    future.setException(throwable);
                } else {
                    future.set(null);
                }
            }
        }, actorSystem.dispatcher());

        return future;
    }

    @Nonnull
    @Override
    public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
            final T listener, final Collection<DOMDataTreeIdentifier> subtrees,
            final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers)
            throws DOMDataTreeLoopException {
        return shardedDOMDataTree.registerListener(listener, subtrees, allowRxMerges, producers);
    }

    @Nonnull
    @Override
    public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
        LOG.debug("{} - Creating producer for {}",
                distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
        final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);

        final Object response = distributedConfigDatastore.getActorContext()
                .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
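        // A null response means all remote nodes acknowledged the new producer; any other response is
        // treated as a failure and the locally created producer is closed again.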
        if (response == null) {
            LOG.debug("{} - Received success from remote nodes, creating producer:{}",
                    distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
            return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
                    distributedConfigDatastore.getActorContext(), shards);
        }

        closeProducer(producer);

        if (response instanceof Throwable) {
            Throwables.throwIfUnchecked((Throwable) response);
            throw new RuntimeException((Throwable) response);
        }
320         throw new RuntimeException("Unexpected response to create producer received." + response);
321     }

    @Override
    public CompletionStage<DistributedShardRegistration> createDistributedShard(
            final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
            throws DOMDataTreeShardingConflictException {

        synchronized (shards) {
            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
                    shards.lookup(prefix);
            if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
                throw new DOMDataTreeShardingConflictException(
                        "Prefix " + prefix + " is already occupied by another shard.");
            }
        }

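        // Shard creation is a two-step process: the prefix is first written into the prefix configuration
        // shard and, once that write succeeds, the actual shard is looked up on the cluster members.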
        final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType());

        final ListenableFuture<Void> writeFuture =
                writer.writeConfig(prefix.getRootIdentifier(), replicaMembers);

        final Promise<DistributedShardRegistration> shardRegistrationPromise = akka.dispatch.Futures.promise();
        Futures.addCallback(writeFuture, new FutureCallback<Void>() {
            @Override
            public void onSuccess(@Nullable final Void result) {

                final Future<Object> ask =
                        Patterns.ask(shardedDataTreeActor, new LookupPrefixShard(prefix), SHARD_FUTURE_TIMEOUT);

                shardRegistrationPromise.completeWith(ask.transform(
                        new Mapper<Object, DistributedShardRegistration>() {
                            @Override
                            public DistributedShardRegistration apply(final Object parameter) {
                                return new DistributedShardRegistrationImpl(
                                        prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
                            }
                        },
                        new Mapper<Throwable, Throwable>() {
                            @Override
                            public Throwable apply(final Throwable throwable) {
                                return new DOMDataTreeShardCreationFailedException(
                                        "Unable to create a cds shard.", throwable);
                            }
                        }, actorSystem.dispatcher()));
            }

            @Override
            public void onFailure(final Throwable throwable) {
                shardRegistrationPromise.failure(
                        new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable));
            }
        }, MoreExecutors.directExecutor());

        return FutureConverters.toJava(shardRegistrationPromise.future());
    }

    void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
        LOG.debug("Member {}: Resolving additions : {}", memberName, additions);
        final ArrayList<DOMDataTreeIdentifier> list = new ArrayList<>(additions);
        // we need to register the shards from top to bottom, so we need to at least make sure the ordering
        // reflects that
        list.sort((o1, o2) -> Integer.compare(o1.getRootIdentifier().getPathArguments().size(),
                o2.getRootIdentifier().getPathArguments().size()));
        list.forEach(this::createShardFrontend);
    }

    void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
        LOG.debug("Member {}: Resolving removals : {}", memberName, removals);

        // do we need to go from bottom to top?
        removals.forEach(this::despawnShardFrontend);
    }

    private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
        LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
        final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
        final AbstractDataStore distributedDataStore =
                prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore : distributedOperDatastore;

        try (DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
            final Entry<DataStoreClient, ActorRef> entry =
                    createDatastoreClient(shardName, distributedDataStore.getActorContext());

            final DistributedShardFrontend shard =
                    new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);

            final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
                    shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);

            synchronized (shards) {
                shards.store(prefix, reg);
            }

        } catch (final DOMDataTreeShardingConflictException e) {
            LOG.error("{}: Prefix {} is already occupied by another shard",
                    distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), prefix, e);
        } catch (DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
        } catch (DOMDataTreeShardCreationFailedException e) {
            LOG.error("Unable to create datastore client for shard {}", prefix, e);
        }
    }

    private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
        LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup;
        synchronized (shards) {
            lookup = shards.lookup(prefix);
        }

        if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
            LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
                    memberName, prefix);
            return;
        }

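        // Close the local registration first, then remove the prefix from the configuration shard so that
        // the remaining cluster members tear down their frontends as well.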
        lookup.getValue().close();
        // need to remove from our local table that is used for tracking
        synchronized (shards) {
            shards.remove(prefix);
        }

        final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType());
        final ListenableFuture<Void> future = writer.removeConfig(prefix.getRootIdentifier());

        Futures.addCallback(future, new FutureCallback<Void>() {
            @Override
            public void onSuccess(@Nullable final Void result) {
                LOG.debug("{} - Successfully removed shard for {}", memberName, prefix);
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOG.error("Removal of shard {} from configuration failed.", prefix, throwable);
            }
        }, MoreExecutors.directExecutor());
    }

    DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
            final DOMDataTreeIdentifier prefix) {
        synchronized (shards) {
            return shards.lookup(prefix);
        }
    }

    DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
        return shardedDOMDataTree.createProducer(prefix);
    }

    @Nonnull
    @Override
    public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
            @Nonnull final DOMDataTreeIdentifier prefix,
            @Nonnull final T shard,
            @Nonnull final DOMDataTreeProducer producer)
            throws DOMDataTreeShardingConflictException {

        LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix);

        if (producer instanceof ProxyProducer) {
            return shardedDOMDataTree.registerDataTreeShard(prefix, shard, ((ProxyProducer) producer).delegate());
        }

        return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
    }

    @SuppressWarnings("checkstyle:IllegalCatch")
    private Entry<DataStoreClient, ActorRef> createDatastoreClient(
            final String shardName, final ActorContext actorContext)
            throws DOMDataTreeShardCreationFailedException {

        LOG.debug("Creating distributed datastore client for shard {}", shardName);
        final Props distributedDataStoreClientProps =
                SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName);

        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
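        // Block for up to 30 seconds while the client connects to its backend shard; on failure the client
        // actor is stopped so it does not leak.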
        try {
            return new SimpleEntry<>(SimpleDataStoreClientActor
                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
        } catch (final Exception e) {
            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
            clientActor.tell(PoisonPill.getInstance(), noSender());
            throw new DOMDataTreeShardCreationFailedException(
                    "Unable to create datastore client for shard{" + shardName + "}", e);
        }
    }

    @SuppressWarnings("checkstyle:IllegalCatch")
    private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
            throws ExecutionException, InterruptedException {
        final Collection<MemberName> names =
                distributedConfigDatastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards();

        final PrefixedShardConfigWriter writer = writerMap.get(logicalDatastoreType);

        if (writer.checkDefaultIsPresent()) {
            LOG.debug("Default shard for {} is already present in the config. Possibly saved in snapshot.",
                    logicalDatastoreType);
            return new DistributedShardRegistrationImpl(
                    new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
                    shardedDataTreeActor, this);
        } else {
            try {
                // There can be a situation where the default shard is already started because it is present
                // in modules.conf. In that case we only have to create the frontend for the default shard,
                // not the shard itself.
                // TODO we don't have to do this for the config and operational default shards separately;
                // doing it for just one of them should be enough.
                final ActorContext actorContext = logicalDatastoreType == LogicalDatastoreType.CONFIGURATION
                        ? distributedConfigDatastore.getActorContext() : distributedOperDatastore.getActorContext();

                final Optional<ActorRef> defaultLocalShardOptional =
                        actorContext.findLocalShard(ClusterUtils.getCleanShardName(YangInstanceIdentifier.EMPTY));

                if (defaultLocalShardOptional.isPresent()) {
                    LOG.debug("{} Default shard is already started, creating just frontend", logicalDatastoreType);
                    createShardFrontend(new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY));
                    return new DistributedShardRegistrationImpl(
                            new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
                            shardedDataTreeActor, this);
                }

                // we should probably only have one node create the default shards
                return Await.result(FutureConverters.toScala(createDistributedShard(
                        new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)),
                        SHARD_FUTURE_TIMEOUT_DURATION);
            } catch (DOMDataTreeShardingConflictException e) {
                LOG.debug("Default shard already registered, possibly due to other node doing it faster");
                return new DistributedShardRegistrationImpl(
                        new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
                        shardedDataTreeActor, this);
            } catch (Exception e) {
                LOG.error("{} default shard initialization failed", logicalDatastoreType, e);
                throw new RuntimeException(e);
            }
        }
    }

    private static void closeProducer(final DOMDataTreeProducer producer) {
        try {
            producer.close();
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
        }
    }

    @SuppressWarnings("checkstyle:IllegalCatch")
    private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
                                                       final ShardedDataTreeActorCreator creator,
                                                       final String shardDataTreeActorId) {
        Exception lastException = null;

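        // Actor creation can fail transiently, for example when a previous incarnation of an actor with the
        // same name has not finished terminating yet, hence the bounded retry loop below.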
        for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) {
            try {
                return actorSystem.actorOf(creator.props(), shardDataTreeActorId);
            } catch (final Exception e) {
                lastException = e;
                Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT);
587                 LOG.debug("Could not create actor {} because of {} -"
588                                 + " waiting for sometime before retrying (retry count = {})",
589                         shardDataTreeActorId, e.getMessage(), i);
            }
        }

        throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
    }

    private class DistributedShardRegistrationImpl implements DistributedShardRegistration {

        private final DOMDataTreeIdentifier prefix;
        private final ActorRef shardedDataTreeActor;
        private final DistributedShardedDOMDataTree distributedShardedDOMDataTree;

        DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix,
                                         final ActorRef shardedDataTreeActor,
                                         final DistributedShardedDOMDataTree distributedShardedDOMDataTree) {
            this.prefix = prefix;
            this.shardedDataTreeActor = shardedDataTreeActor;
            this.distributedShardedDOMDataTree = distributedShardedDOMDataTree;
        }

        @Override
        public CompletionStage<Void> close() {
            // first despawn on the local node
            distributedShardedDOMDataTree.despawnShardFrontend(prefix);
            // update the config so the remote nodes are updated
            final Future<Object> ask =
                    Patterns.ask(shardedDataTreeActor, new PrefixShardRemovalLookup(prefix), SHARD_FUTURE_TIMEOUT);

            final Future<Void> closeFuture = ask.transform(
                    new Mapper<Object, Void>() {
                        @Override
                        public Void apply(final Object parameter) {
                            return null;
                        }
                    },
                    new Mapper<Throwable, Throwable>() {
                        @Override
                        public Throwable apply(final Throwable throwable) {
                            return throwable;
                        }
                    }, actorSystem.dispatcher());

            return FutureConverters.toJava(closeFuture);
        }
    }

    // TODO what about producers created by this producer?
    // They should also be CDSProducers
    private static final class ProxyProducer extends ForwardingObject implements CDSDataTreeProducer {

        private final DOMDataTreeProducer delegate;
        private final Collection<DOMDataTreeIdentifier> subtrees;
        private final ActorRef shardDataTreeActor;
        private final ActorContext actorContext;
        @GuardedBy("shardAccessMap")
        private final Map<DOMDataTreeIdentifier, CDSShardAccessImpl> shardAccessMap = new HashMap<>();

        // We don't have to guard access to shardTable in ProxyProducer.
        // ShardTable's entries relevant to this ProxyProducer shouldn't
        // change during producer's lifetime.
        private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shardTable;

        ProxyProducer(final DOMDataTreeProducer delegate,
                      final Collection<DOMDataTreeIdentifier> subtrees,
                      final ActorRef shardDataTreeActor,
                      final ActorContext actorContext,
                      final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shardLayout) {
            this.delegate = Preconditions.checkNotNull(delegate);
            this.subtrees = Preconditions.checkNotNull(subtrees);
            this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
            this.actorContext = Preconditions.checkNotNull(actorContext);
            this.shardTable = Preconditions.checkNotNull(shardLayout);
        }

        @Nonnull
        @Override
        public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
            return delegate.createTransaction(isolated);
        }

        @Nonnull
        @Override
        public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
            // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
            // open we surely have the rights to all the subtrees.
            return delegate.createProducer(subtrees);
        }

        @Override
        @SuppressWarnings("checkstyle:IllegalCatch")
        public void close() throws DOMDataTreeProducerException {
            delegate.close();

            synchronized (shardAccessMap) {
                shardAccessMap.values().forEach(CDSShardAccessImpl::close);
            }

            final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
            if (o instanceof DOMDataTreeProducerException) {
                throw (DOMDataTreeProducerException) o;
            } else if (o instanceof Throwable) {
                throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
            }
        }

        @Override
        protected DOMDataTreeProducer delegate() {
            return delegate;
        }

        @Nonnull
        @Override
        public CDSShardAccess getShardAccess(@Nonnull final DOMDataTreeIdentifier subtree) {
            Preconditions.checkArgument(
                    subtrees.stream().anyMatch(dataTreeIdentifier -> dataTreeIdentifier.contains(subtree)),
                    "Subtree %s is not controlled by this producer %s", subtree, this);

            final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
                    shardTable.lookup(subtree);
            Preconditions.checkState(lookup != null, "Subtree %s is not contained in any registered shard.", subtree);

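            // The prefix table returns the closest enclosing shard registration, so shard access is keyed
            // by that shard's root prefix rather than by the requested subtree.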
            final DOMDataTreeIdentifier lookupId = lookup.getValue().getPrefix();

            synchronized (shardAccessMap) {
                if (shardAccessMap.get(lookupId) != null) {
                    return shardAccessMap.get(lookupId);
                }

                // TODO Maybe we can have a static factory method and return the same instance for the same
                // subtree. But that may not be needed since there can be only one producer attached to a
                // subtree at a time, and it is also unclear how we would close the ShardAccess then.
                final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(lookupId, actorContext);
                shardAccessMap.put(lookupId, shardAccess);
                return shardAccess;
            }
        }
    }
}