BUG-2138: Fix shard registration with ProxyProducers.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / DistributedShardedDOMDataTree.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import static akka.actor.ActorRef.noSender;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import akka.cluster.Cluster;
18 import akka.cluster.Member;
19 import akka.dispatch.Mapper;
20 import akka.pattern.Patterns;
21 import akka.util.Timeout;
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Throwables;
24 import com.google.common.collect.Collections2;
25 import com.google.common.collect.ForwardingObject;
26 import com.google.common.util.concurrent.Uninterruptibles;
27 import java.util.AbstractMap.SimpleEntry;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Collections;
31 import java.util.EnumMap;
32 import java.util.Map.Entry;
33 import java.util.Set;
34 import java.util.concurrent.CompletionStage;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.TimeUnit;
37 import javax.annotation.Nonnull;
38 import org.opendaylight.controller.cluster.ActorSystemProvider;
39 import org.opendaylight.controller.cluster.access.concepts.MemberName;
40 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
41 import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
42 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
43 import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
44 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
45 import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
46 import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
47 import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
48 import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
49 import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
50 import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
51 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
52 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
53 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
54 import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
55 import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
56 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
57 import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
58 import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
59 import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
60 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
61 import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
62 import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
63 import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
64 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
65 import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
66 import org.opendaylight.yangtools.concepts.ListenerRegistration;
67 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
68 import org.slf4j.Logger;
69 import org.slf4j.LoggerFactory;
70 import scala.collection.JavaConverters;
71 import scala.compat.java8.FutureConverters;
72 import scala.concurrent.Future;
73 import scala.concurrent.duration.FiniteDuration;
74
75 /**
76  * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
77  * {@link ShardedDataTreeActor}. Also provides QoL method for addition of prefix based clustered shard into the system.
78  */
79 public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService,
80         DistributedShardFactory {
81
82     private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);
83
84     private static final int MAX_ACTOR_CREATION_RETRIES = 100;
85     private static final int ACTOR_RETRY_DELAY = 100;
86     private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
87     static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION = new FiniteDuration(
88                     ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * ShardedDataTreeActor.LOOKUP_TASK_MAX_RETRIES * 3,
89                     TimeUnit.SECONDS);
90     static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION);
91
92     static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
93
94     private final ShardedDOMDataTree shardedDOMDataTree;
95     private final ActorSystem actorSystem;
96     private final DistributedDataStore distributedOperDatastore;
97     private final DistributedDataStore distributedConfigDatastore;
98
99     private final ActorRef shardedDataTreeActor;
100     private final MemberName memberName;
101
102     private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
103             DOMDataTreePrefixTable.create();
104
105     private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
106             new EnumMap<>(LogicalDatastoreType.class);
107
108     public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
109                                          final DistributedDataStore distributedOperDatastore,
110                                          final DistributedDataStore distributedConfigDatastore) {
111         this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
112         this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
113         this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
114         shardedDOMDataTree = new ShardedDOMDataTree();
115
116         shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
117                 new ShardedDataTreeActorCreator()
118                         .setShardingService(this)
119                         .setActorSystem(actorSystem)
120                         .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
121                         .setDistributedConfigDatastore(distributedConfigDatastore)
122                         .setDistributedOperDatastore(distributedOperDatastore),
123                 ACTOR_ID);
124
125         this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();
126
127         //create shard registration for DEFAULT_SHARD
128         try {
129             defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
130                     initDefaultShard(LogicalDatastoreType.CONFIGURATION));
131         } catch (final InterruptedException | ExecutionException e) {
132             LOG.error("Unable to create default shard frontend for config shard", e);
133         }
134
135         try {
136             defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
137                     initDefaultShard(LogicalDatastoreType.OPERATIONAL));
138         } catch (final InterruptedException | ExecutionException e) {
139             LOG.error("Unable to create default shard frontend for operational shard", e);
140         }
141     }
142
143     @Nonnull
144     @Override
145     public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
146             final T listener, final Collection<DOMDataTreeIdentifier> subtrees,
147             final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers)
148             throws DOMDataTreeLoopException {
149         return shardedDOMDataTree.registerListener(listener, subtrees, allowRxMerges, producers);
150     }
151
152     @Nonnull
153     @Override
154     public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
155         LOG.debug("{} - Creating producer for {}",
156                 distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
157         final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);
158
159         final Object response = distributedConfigDatastore.getActorContext()
160                 .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
161         if (response == null) {
162             LOG.debug("{} - Received success from remote nodes, creating producer:{}",
163                     distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
164             return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
165                     distributedConfigDatastore.getActorContext());
166         } else if (response instanceof Exception) {
167             closeProducer(producer);
168             throw Throwables.propagate((Exception) response);
169         } else {
170             closeProducer(producer);
171             throw new RuntimeException("Unexpected response to create producer received." + response);
172         }
173     }
174
175     @Override
176     public CompletionStage<DistributedShardRegistration> createDistributedShard(
177             final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
178             throws DOMDataTreeShardingConflictException {
179         final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
180                 shards.lookup(prefix);
181         if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
182             throw new DOMDataTreeShardingConflictException(
183                     "Prefix " + prefix + " is already occupied by another shard.");
184         }
185
186         final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
187
188         final Future<Object> ask =
189                 Patterns.ask(shardedDataTreeActor, new CreatePrefixShard(config), SHARD_FUTURE_TIMEOUT);
190
191         final Future<DistributedShardRegistration> shardRegistrationFuture = ask.transform(
192                 new Mapper<Object, DistributedShardRegistration>() {
193                     @Override
194                     public DistributedShardRegistration apply(final Object parameter) {
195                         return new DistributedShardRegistrationImpl(
196                                 prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
197                     }
198                 },
199                 new Mapper<Throwable, Throwable>() {
200                     @Override
201                     public Throwable apply(final Throwable throwable) {
202                         return new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable);
203                     }
204                 }, actorSystem.dispatcher());
205
206         return FutureConverters.toJava(shardRegistrationFuture);
207     }
208
209     void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
210         LOG.debug("Member {}: Resolving additions : {}", memberName, additions);
211         final ArrayList<DOMDataTreeIdentifier> list = new ArrayList<>(additions);
212         // we need to register the shards from top to bottom, so we need to atleast make sure the ordering reflects that
213         Collections.sort(list, (o1, o2) -> {
214             if (o1.getRootIdentifier().getPathArguments().size() < o2.getRootIdentifier().getPathArguments().size()) {
215                 return -1;
216             } else if (o1.getRootIdentifier().getPathArguments().size()
217                     == o2.getRootIdentifier().getPathArguments().size()) {
218                 return 0;
219             } else {
220                 return 1;
221             }
222         });
223         list.forEach(this::createShardFrontend);
224     }
225
226     void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
227         LOG.debug("Member {}: Resolving removals : {}", memberName, removals);
228
229         // do we need to go from bottom to top?
230         removals.forEach(this::despawnShardFrontend);
231     }
232
233     private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
234         LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
235         final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
236         final DistributedDataStore distributedDataStore =
237                 prefix.getDatastoreType().equals(org.opendaylight.mdsal.common.api.LogicalDatastoreType.CONFIGURATION)
238                         ? distributedConfigDatastore : distributedOperDatastore;
239
240         try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
241             final Entry<DataStoreClient, ActorRef> entry =
242                     createDatastoreClient(shardName, distributedDataStore.getActorContext());
243
244             final DistributedShardFrontend shard =
245                     new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);
246
247             @SuppressWarnings("unchecked")
248             final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
249                     (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
250             shards.store(prefix, reg);
251         } catch (final DOMDataTreeShardingConflictException e) {
252             LOG.error("Prefix {} is already occupied by another shard", prefix, e);
253         } catch (DOMDataTreeProducerException e) {
254             LOG.error("Unable to close producer", e);
255         } catch (DOMDataTreeShardCreationFailedException e) {
256             LOG.error("Unable to create datastore client for shard {}", prefix, e);
257         }
258     }
259
260     private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
261         LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
262         final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
263                 shards.lookup(prefix);
264
265         if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
266             LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
267                     memberName, prefix);
268             return;
269         }
270
271         lookup.getValue().close();
272         // need to remove from our local table thats used for tracking
273         shards.remove(prefix);
274     }
275
276     DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
277             final DOMDataTreeIdentifier prefix) {
278         return shards.lookup(prefix);
279
280     }
281
282     DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
283         return shardedDOMDataTree.createProducer(prefix);
284     }
285
286     @Nonnull
287     @Override
288     public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
289             @Nonnull final DOMDataTreeIdentifier prefix,
290             @Nonnull final T shard,
291             @Nonnull final DOMDataTreeProducer producer)
292             throws DOMDataTreeShardingConflictException {
293
294         LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix);
295
296         if (producer instanceof ProxyProducer) {
297             return shardedDOMDataTree.registerDataTreeShard(prefix, shard, ((ProxyProducer) producer).delegate());
298         }
299
300         return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
301     }
302
303     @SuppressWarnings("checkstyle:IllegalCatch")
304     private Entry<DataStoreClient, ActorRef> createDatastoreClient(
305             final String shardName, final ActorContext actorContext)
306             throws DOMDataTreeShardCreationFailedException {
307
308         LOG.debug("Creating distributed datastore client for shard {}", shardName);
309         final Props distributedDataStoreClientProps =
310                 SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName);
311
312         final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
313         try {
314             return new SimpleEntry<>(SimpleDataStoreClientActor
315                     .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
316         } catch (final Exception e) {
317             LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
318             clientActor.tell(PoisonPill.getInstance(), noSender());
319             throw new DOMDataTreeShardCreationFailedException(
320                     "Unable to create datastore client for shard{" + shardName + "}", e);
321         }
322     }
323
324     private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
325             throws ExecutionException, InterruptedException {
326         final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
327                 Cluster.get(actorSystem).state().members()).asJavaCollection();
328         final Collection<MemberName> names = Collections2.transform(members,
329             m -> MemberName.forName(m.roles().iterator().next()));
330
331         try {
332             // we should probably only have one node create the default shards
333             return createDistributedShard(
334                     new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names)
335                     .toCompletableFuture().get();
336         } catch (DOMDataTreeShardingConflictException e) {
337             LOG.debug("Default shard already registered, possibly due to other node doing it faster");
338             return new DistributedShardRegistrationImpl(
339                     new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY),
340                     shardedDataTreeActor, this);
341         }
342     }
343
344     private static void closeProducer(final DOMDataTreeProducer producer) {
345         try {
346             producer.close();
347         } catch (final DOMDataTreeProducerException e) {
348             LOG.error("Unable to close producer", e);
349         }
350     }
351
352     @SuppressWarnings("checkstyle:IllegalCatch")
353     private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
354                                                        final ShardedDataTreeActorCreator creator,
355                                                        final String shardDataTreeActorId) {
356         Exception lastException = null;
357
358         for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) {
359             try {
360                 return actorSystem.actorOf(creator.props(), shardDataTreeActorId);
361             } catch (final Exception e) {
362                 lastException = e;
363                 Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT);
364                 LOG.debug("Could not create actor {} because of {} -"
365                                 + " waiting for sometime before retrying (retry count = {})",
366                         shardDataTreeActorId, e.getMessage(), i);
367             }
368         }
369
370         throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
371     }
372
373     private class DistributedShardRegistrationImpl implements DistributedShardRegistration {
374
375         private final DOMDataTreeIdentifier prefix;
376         private final ActorRef shardedDataTreeActor;
377         private final DistributedShardedDOMDataTree distributedShardedDOMDataTree;
378
379         DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix,
380                                          final ActorRef shardedDataTreeActor,
381                                          final DistributedShardedDOMDataTree distributedShardedDOMDataTree) {
382             this.prefix = prefix;
383             this.shardedDataTreeActor = shardedDataTreeActor;
384             this.distributedShardedDOMDataTree = distributedShardedDOMDataTree;
385         }
386
387         @Override
388         public CompletionStage<Void> close() {
389             // first despawn on the local node
390             distributedShardedDOMDataTree.despawnShardFrontend(prefix);
391             // update the config so the remote nodes are updated
392             final Future<Object> ask =
393                     Patterns.ask(shardedDataTreeActor, new RemovePrefixShard(prefix), SHARD_FUTURE_TIMEOUT);
394
395             final Future<Void> closeFuture = ask.transform(
396                     new Mapper<Object, Void>() {
397                         @Override
398                         public Void apply(Object parameter) {
399                             return null;
400                         }
401                     },
402                     new Mapper<Throwable, Throwable>() {
403                         @Override
404                         public Throwable apply(Throwable throwable) {
405                             return throwable;
406                         }
407                     }, actorSystem.dispatcher());
408
409             return FutureConverters.toJava(closeFuture);
410         }
411     }
412
413     private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {
414
415         private final DOMDataTreeProducer delegate;
416         private final Collection<DOMDataTreeIdentifier> subtrees;
417         private final ActorRef shardDataTreeActor;
418         private final ActorContext actorContext;
419
420         ProxyProducer(final DOMDataTreeProducer delegate,
421                       final Collection<DOMDataTreeIdentifier> subtrees,
422                       final ActorRef shardDataTreeActor,
423                       final ActorContext actorContext) {
424             this.delegate = Preconditions.checkNotNull(delegate);
425             this.subtrees = Preconditions.checkNotNull(subtrees);
426             this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
427             this.actorContext = Preconditions.checkNotNull(actorContext);
428         }
429
430         @Nonnull
431         @Override
432         public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
433             return delegate.createTransaction(isolated);
434         }
435
436         @Nonnull
437         @Override
438         public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
439             // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
440             // open we surely have the rights to all the subtrees.
441             return delegate.createProducer(subtrees);
442         }
443
444         @Override
445         public void close() throws DOMDataTreeProducerException {
446             delegate.close();
447
448             final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
449             if (o instanceof DOMDataTreeProducerException) {
450                 throw ((DOMDataTreeProducerException) o);
451             } else if (o instanceof Throwable) {
452                 throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
453             }
454         }
455
456         @Override
457         protected DOMDataTreeProducer delegate() {
458             return delegate;
459         }
460     }
461 }