BUG-2138: Make DistributedShardFactory return Futures.
[controller.git] opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import static akka.actor.ActorRef.noSender;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.Member;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ForwardingObject;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.compat.java8.FutureConverters;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
/**
 * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
 * {@link ShardedDataTreeActor}. Also provides a convenience method for adding a prefix-based clustered
 * shard into the system.
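 *
 * <p>A minimal usage sketch; {@code shardFactory}, {@code somePrefix} and {@code replicaMembers} are
 * illustrative placeholders for caller-supplied values, and conflict handling is elided:
 * <pre>{@code
 *     final CompletionStage<DistributedShardRegistration> future = shardFactory.createDistributedShard(
 *             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, somePrefix), replicaMembers);
 *     future.thenAccept(registration -> LOG.debug("Shard created for prefix {}", somePrefix));
 * }</pre>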
 */
public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService,
        DistributedShardFactory {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);

    private static final int MAX_ACTOR_CREATION_RETRIES = 100;
    private static final int ACTOR_RETRY_DELAY = 100;
    private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
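    // Upper bound on how long shard creation/removal is allowed to take; the squared
    // LOOKUP_TASK_MAX_RETRIES factor is assumed to cover the nested retries performed by
    // ShardedDataTreeActor's lookup tasks.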
    static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = new FiniteDuration(
                    ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * 3,
                    TimeUnit.SECONDS);
    static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION);

    static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";

    private final ShardedDOMDataTree shardedDOMDataTree;
    private final ActorSystem actorSystem;
    private final DistributedDataStore distributedOperDatastore;
    private final DistributedDataStore distributedConfigDatastore;

    private final ActorRef shardedDataTreeActor;
    private final MemberName memberName;

    private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
            DOMDataTreePrefixTable.create();

    private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
            new EnumMap<>(LogicalDatastoreType.class);
    public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
                                         final DistributedDataStore distributedOperDatastore,
                                         final DistributedDataStore distributedConfigDatastore) {
        this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
        this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
        this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
        shardedDOMDataTree = new ShardedDOMDataTree();

        shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
                new ShardedDataTreeActorCreator()
                        .setShardingService(this)
                        .setActorSystem(actorSystem)
                        .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
                        .setDistributedConfigDatastore(distributedConfigDatastore)
                        .setDistributedOperDatastore(distributedOperDatastore),
                ACTOR_ID);

        this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();

        // create shard registration for DEFAULT_SHARD
        try {
            defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
                    initDefaultShard(LogicalDatastoreType.CONFIGURATION));
        } catch (final InterruptedException | ExecutionException e) {
            LOG.error("Unable to create default shard frontend for config shard", e);
        }

        try {
            defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
                    initDefaultShard(LogicalDatastoreType.OPERATIONAL));
        } catch (final InterruptedException | ExecutionException e) {
            LOG.error("Unable to create default shard frontend for operational shard", e);
        }
    }

    @Nonnull
    @Override
    public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
            final T listener, final Collection<DOMDataTreeIdentifier> subtrees,
            final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers)
            throws DOMDataTreeLoopException {
        return shardedDOMDataTree.registerListener(listener, subtrees, allowRxMerges, producers);
    }

    @Nonnull
    @Override
    public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
        LOG.debug("{} - Creating producer for {}",
                distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
        final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);

        final Object response = distributedConfigDatastore.getActorContext()
                .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
        if (response == null) {
            LOG.debug("{} - Received success from remote nodes, creating producer: {}",
                    distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
            return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
                    distributedConfigDatastore.getActorContext());
        } else if (response instanceof Exception) {
            closeProducer(producer);
            throw Throwables.propagate((Exception) response);
        } else {
            closeProducer(producer);
            throw new RuntimeException("Unexpected response to create producer received: " + response);
        }
    }

    @Override
    public CompletionStage<DistributedShardRegistration> createDistributedShard(
            final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
            throws DOMDataTreeShardingConflictException {
        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
                shards.lookup(prefix);
        if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
            throw new DOMDataTreeShardingConflictException(
                    "Prefix " + prefix + " is already occupied by another shard.");
        }

        final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);

        final Future<Object> ask =
                Patterns.ask(shardedDataTreeActor, new CreatePrefixShard(config), SHARD_FUTURE_TIMEOUT);

        final Future<DistributedShardRegistration> shardRegistrationFuture = ask.transform(
                new Mapper<Object, DistributedShardRegistration>() {
                    @Override
                    public DistributedShardRegistration apply(final Object parameter) {
                        return new DistributedShardRegistrationImpl(
                                prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
                    }
                },
                new Mapper<Throwable, Throwable>() {
                    @Override
                    public Throwable apply(final Throwable throwable) {
                        return new DOMDataTreeShardCreationFailedException("Unable to create a CDS shard.", throwable);
                    }
                }, actorSystem.dispatcher());

        return FutureConverters.toJava(shardRegistrationFuture);
    }

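    /**
     * Create local shard frontends for the given prefixes, registering parent prefixes before their
     * children.
     */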
    void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
        LOG.debug("Member {}: Resolving additions : {}", memberName, additions);
        final ArrayList<DOMDataTreeIdentifier> list = new ArrayList<>(additions);
        // we need to register the shards from top to bottom, so at least make sure the ordering reflects that
        Collections.sort(list, (o1, o2) -> Integer.compare(
                o1.getRootIdentifier().getPathArguments().size(),
                o2.getRootIdentifier().getPathArguments().size()));
        list.forEach(this::createShardFrontend);
    }

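    /**
     * Tear down the local shard frontends for the given prefixes.
     */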
    void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
        LOG.debug("Member {}: Resolving removals : {}", memberName, removals);

        // do we need to go from bottom to top?
        removals.forEach(this::despawnShardFrontend);
    }

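    /**
     * Create the local frontend for a single prefix: spawn a datastore client, wrap it in a
     * {@link DistributedShardFrontend} and register it with the local sharding service.
     */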
    private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
        LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
        final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
        final DistributedDataStore distributedDataStore =
                prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
                        ? distributedConfigDatastore : distributedOperDatastore;

        try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
            final Entry<DataStoreClient, ActorRef> entry =
                    createDatastoreClient(shardName, distributedDataStore.getActorContext());

            final DistributedShardFrontend shard =
                    new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);

            @SuppressWarnings("unchecked")
            final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
                    (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
            shards.store(prefix, reg);
        } catch (final DOMDataTreeShardingConflictException e) {
            LOG.error("Prefix {} is already occupied by another shard", prefix, e);
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
        } catch (final DOMDataTreeShardCreationFailedException e) {
            LOG.error("Unable to create datastore client for shard {}", prefix, e);
        }
    }

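    /**
     * Close the local frontend registration for the given prefix, if present, and drop it from the
     * local prefix table.
     */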
    private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
        LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
                shards.lookup(prefix);

        if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
            LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring",
                    memberName, prefix);
            return;
        }

        lookup.getValue().close();
        // need to remove it from our local table that is used for tracking
        shards.remove(prefix);
    }

    DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
            final DOMDataTreeIdentifier prefix) {
        return shards.lookup(prefix);
    }

    DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
        return shardedDOMDataTree.createProducer(prefix);
    }

    @Nonnull
    @Override
    public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
            @Nonnull final DOMDataTreeIdentifier prefix,
            @Nonnull final T shard,
            @Nonnull final DOMDataTreeProducer producer)
            throws DOMDataTreeShardingConflictException {

        LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix);

        return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
    }

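    /**
     * Spawn a SimpleDataStoreClientActor for the given shard and wait (up to 30 seconds) for its
     * {@link DataStoreClient} handle, poisoning the actor if the client cannot be obtained.
     */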
    @SuppressWarnings("checkstyle:IllegalCatch")
    private Entry<DataStoreClient, ActorRef> createDatastoreClient(
            final String shardName, final ActorContext actorContext)
            throws DOMDataTreeShardCreationFailedException {

        LOG.debug("Creating distributed datastore client for shard {}", shardName);
        final Props distributedDataStoreClientProps =
                SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName);

        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
        try {
            return new SimpleEntry<>(SimpleDataStoreClientActor
                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
        } catch (final Exception e) {
            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
            clientActor.tell(PoisonPill.getInstance(), noSender());
            throw new DOMDataTreeShardCreationFailedException(
                    "Unable to create datastore client for shard{" + shardName + "}", e);
        }
    }

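    /**
     * Create the default (whole-datastore) shard for the given datastore type, replicated onto all
     * current cluster members. If another node has already registered the shard, fall back to a plain
     * registration object for the existing shard.
     */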
    private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
            throws ExecutionException, InterruptedException {
        final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
                Cluster.get(actorSystem).state().members()).asJavaCollection();
        final Collection<MemberName> names = Collections2.transform(members,
            m -> MemberName.forName(m.roles().iterator().next()));

        try {
            // we should probably only have one node create the default shards
            return createDistributedShard(
                    new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)
                    .toCompletableFuture().get();
        } catch (final DOMDataTreeShardingConflictException e) {
            LOG.debug("Default shard already registered, possibly because another node created it first");
            return new DistributedShardRegistrationImpl(
                    new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
                    shardedDataTreeActor, this);
        }
    }

    private static void closeProducer(final DOMDataTreeProducer producer) {
        try {
            producer.close();
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
        }
    }

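    /**
     * Create the frontend actor, retrying a bounded number of times on failure (presumably to cover
     * transient conditions such as the actor name still being taken by a previous incarnation).
     */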
    @SuppressWarnings("checkstyle:IllegalCatch")
    private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
                                                       final ShardedDataTreeActorCreator creator,
                                                       final String shardDataTreeActorId) {
        Exception lastException = null;

        for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) {
            try {
                return actorSystem.actorOf(creator.props(), shardDataTreeActorId);
            } catch (final Exception e) {
                lastException = e;
                Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT);
                LOG.debug("Could not create actor {} because of {} -"
                                + " waiting for some time before retrying (retry count = {})",
                        shardDataTreeActorId, e.getMessage(), i);
            }
        }

        throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
    }

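    /**
     * Registration handle for a distributed shard. Closing it despawns the local shard frontend and
     * asks the frontend actor to remove the prefix shard configuration from the remote nodes.
     */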
    private class DistributedShardRegistrationImpl implements DistributedShardRegistration {

        private final DOMDataTreeIdentifier prefix;
        private final ActorRef shardedDataTreeActor;
        private final DistributedShardedDOMDataTree distributedShardedDOMDataTree;

        DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix,
                                         final ActorRef shardedDataTreeActor,
                                         final DistributedShardedDOMDataTree distributedShardedDOMDataTree) {
            this.prefix = prefix;
            this.shardedDataTreeActor = shardedDataTreeActor;
            this.distributedShardedDOMDataTree = distributedShardedDOMDataTree;
        }

        @Override
        public CompletionStage<Void> close() {
            // first despawn on the local node
            distributedShardedDOMDataTree.despawnShardFrontend(prefix);
            // update the config so the remote nodes are updated
            final Future<Object> ask =
                    Patterns.ask(shardedDataTreeActor, new RemovePrefixShard(prefix), SHARD_FUTURE_TIMEOUT);

            final Future<Void> closeFuture = ask.transform(
                    new Mapper<Object, Void>() {
                        @Override
                        public Void apply(final Object parameter) {
                            return null;
                        }
                    },
                    new Mapper<Throwable, Throwable>() {
                        @Override
                        public Throwable apply(final Throwable throwable) {
                            return throwable;
                        }
                    }, actorSystem.dispatcher());

            return FutureConverters.toJava(closeFuture);
        }
    }

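    /**
     * Producer that wraps the local producer and additionally notifies the frontend actor (and through
     * it the remote nodes) when it is closed.
     */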
    private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {

        private final DOMDataTreeProducer delegate;
        private final Collection<DOMDataTreeIdentifier> subtrees;
        private final ActorRef shardDataTreeActor;
        private final ActorContext actorContext;

        ProxyProducer(final DOMDataTreeProducer delegate,
                      final Collection<DOMDataTreeIdentifier> subtrees,
                      final ActorRef shardDataTreeActor,
                      final ActorContext actorContext) {
            this.delegate = Preconditions.checkNotNull(delegate);
            this.subtrees = Preconditions.checkNotNull(subtrees);
            this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
            this.actorContext = Preconditions.checkNotNull(actorContext);
        }

        @Nonnull
        @Override
        public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
            return delegate.createTransaction(isolated);
        }

        @Nonnull
        @Override
        public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
            // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
            // open we surely have the rights to all the subtrees.
            return delegate.createProducer(subtrees);
        }

        @Override
        public void close() throws DOMDataTreeProducerException {
            delegate.close();

            final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
            if (o instanceof DOMDataTreeProducerException) {
                throw (DOMDataTreeProducerException) o;
            } else if (o instanceof Throwable) {
                throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
            }
        }

        @Override
        protected DOMDataTreeProducer delegate() {
            return delegate;
        }
    }
}