BUG-2138: Listener support in shard frontend
[controller.git] opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/sharding/DistributedShardedDOMDataTree.java
/*
 * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */

package org.opendaylight.controller.cluster.sharding;

import static akka.actor.ActorRef.noSender;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.Member;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ForwardingObject;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.ActorSystemProvider;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
import org.opendaylight.controller.cluster.sharding.messages.CreatePrefixShard;
import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
import org.opendaylight.controller.cluster.sharding.messages.RemovePrefixShard;
import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;

/**
 * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
 * {@link ShardedDataTreeActor}. Also provides a convenience method for adding prefix-based clustered shards
 * to the system.
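 *
 * <p>A minimal usage sketch; the names of the constructor arguments and of the replica set are illustrative
 * only:
 * <pre>{@code
 *     DistributedShardedDOMDataTree shardingService =
 *             new DistributedShardedDOMDataTree(actorSystemProvider, operDatastore, configDatastore);
 *     DistributedShardRegistration registration = shardingService.createDistributedShard(
 *             new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.EMPTY),
 *             replicaMembers);
 *     // ... work with producers/listeners through the DOMDataTreeService API, then tear the shard down
 *     registration.close();
 * }</pre>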
 */
public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService,
        DistributedShardFactory {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);

    private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
    private static final int MAX_ACTOR_CREATION_RETRIES = 100;
    private static final int ACTOR_RETRY_DELAY = 100;
    private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;

    static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";

    private final ShardedDOMDataTree shardedDOMDataTree;
    private final ActorSystem actorSystem;
    private final DistributedDataStore distributedOperDatastore;
    private final DistributedDataStore distributedConfigDatastore;

    private final ActorRef shardedDataTreeActor;
    private final MemberName memberName;

    private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
            DOMDataTreePrefixTable.create();

    private final EnumMap<LogicalDatastoreType, DistributedShardRegistration> defaultShardRegistrations =
            new EnumMap<>(LogicalDatastoreType.class);

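    /**
     * Creates the sharding frontend. Spawns the backing {@link ShardedDataTreeActor} and attempts to create
     * the default (root) shard frontend for both the configuration and the operational datastore, logging an
     * error if either of them cannot be created.
     *
     * @param actorSystemProvider provider of the actor system in which sharding actors are spawned
     * @param distributedOperDatastore backing distributed datastore for the operational tree
     * @param distributedConfigDatastore backing distributed datastore for the configuration tree
     */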
    public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
                                         final DistributedDataStore distributedOperDatastore,
                                         final DistributedDataStore distributedConfigDatastore) {
        this.actorSystem = Preconditions.checkNotNull(actorSystemProvider).getActorSystem();
        this.distributedOperDatastore = Preconditions.checkNotNull(distributedOperDatastore);
        this.distributedConfigDatastore = Preconditions.checkNotNull(distributedConfigDatastore);
        shardedDOMDataTree = new ShardedDOMDataTree();

        shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
                new ShardedDataTreeActorCreator()
                        .setShardingService(this)
                        .setActorSystem(actorSystem)
                        .setClusterWrapper(distributedConfigDatastore.getActorContext().getClusterWrapper())
                        .setDistributedConfigDatastore(distributedConfigDatastore)
                        .setDistributedOperDatastore(distributedOperDatastore),
                ACTOR_ID);

        this.memberName = distributedConfigDatastore.getActorContext().getCurrentMemberName();

        // create shard registration for DEFAULT_SHARD
        try {
            defaultShardRegistrations.put(LogicalDatastoreType.CONFIGURATION,
                    initDefaultShard(LogicalDatastoreType.CONFIGURATION));
        } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
            LOG.error("Unable to create default shard frontend for config shard", e);
        }

        try {
            defaultShardRegistrations.put(LogicalDatastoreType.OPERATIONAL,
                    initDefaultShard(LogicalDatastoreType.OPERATIONAL));
        } catch (final DOMDataTreeProducerException | DOMDataTreeShardingConflictException e) {
            LOG.error("Unable to create default shard frontend for operational shard", e);
        }
    }

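    /**
     * {@inheritDoc}
     *
     * <p>Delegates to the backing {@link ShardedDOMDataTree}, so the listener is wired up to whichever shards
     * (including the CDS shard frontends registered by this class) cover the requested subtrees.
     */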
    @Nonnull
    @Override
    public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
            final T listener, final Collection<DOMDataTreeIdentifier> subtrees,
            final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers)
            throws DOMDataTreeLoopException {
        return shardedDOMDataTree.registerListener(listener, subtrees, allowRxMerges, producers);
    }

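    /**
     * {@inheritDoc}
     *
     * <p>Creates the producer on the local {@link ShardedDOMDataTree} first and then announces it to the
     * remote nodes via the {@link ShardedDataTreeActor}. If the actor reports a failure, the local producer is
     * closed again and the failure is rethrown.
     */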
    @Nonnull
    @Override
    public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
        LOG.debug("{} - Creating producer for {}",
                distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
        final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);

        final Object response = distributedConfigDatastore.getActorContext()
                .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
        if (response == null) {
            LOG.debug("{} - Received success from remote nodes, creating producer: {}",
                    distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
            return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
                    distributedConfigDatastore.getActorContext());
        } else if (response instanceof Exception) {
            closeProducer(producer);
            throw Throwables.propagate((Exception) response);
        } else {
            closeProducer(producer);
            throw new RuntimeException("Unexpected response to create producer received: " + response);
        }
    }

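    /**
     * {@inheritDoc}
     *
     * <p>Sends a {@link CreatePrefixShard} message to the {@link ShardedDataTreeActor}, which distributes the
     * new prefix-based shard configuration to the requested replica members. Closing the returned registration
     * removes the shard again.
     */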
    @Override
    @SuppressWarnings("checkstyle:IllegalCatch")
    // TODO: it would be better to block here until the message is processed by the actor
    public DistributedShardRegistration createDistributedShard(
            final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
            throws DOMDataTreeShardingConflictException, DOMDataTreeProducerException {
        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
                shards.lookup(prefix);
        if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
            throw new DOMDataTreeShardingConflictException(
                    "Prefix " + prefix + " is already occupied by another shard.");
        }

        final PrefixShardConfiguration config = new PrefixShardConfiguration(prefix, "prefix", replicaMembers);
        shardedDataTreeActor.tell(new CreatePrefixShard(config), noSender());

        return new DistributedShardRegistrationImpl(prefix, shardedDataTreeActor, this);
    }

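    /**
     * Creates local shard frontends for the given prefixes. The prefixes are processed from the shallowest to
     * the deepest path so that parent shards are always registered before their children.
     *
     * @param additions prefixes for which CDS shard frontends should be created
     */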
    void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
        LOG.debug("Member {}: Resolving additions : {}", memberName, additions);
        final ArrayList<DOMDataTreeIdentifier> list = new ArrayList<>(additions);
        // we need to register the shards from top to bottom, so at least make sure the ordering reflects that
        Collections.sort(list, (o1, o2) -> Integer.compare(
                o1.getRootIdentifier().getPathArguments().size(),
                o2.getRootIdentifier().getPathArguments().size()));
        list.forEach(this::createShardFrontend);
    }

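    /**
     * Tears down the local shard frontends for the given prefixes.
     *
     * @param removals prefixes whose CDS shard frontends should be removed
     */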
    void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
        LOG.debug("Member {}: Resolving removals : {}", memberName, removals);

        // do we need to go from bottom to top?
        removals.forEach(this::despawnShardFrontend);
    }

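    /**
     * Creates the frontend for a single CDS shard: opens a local producer for the prefix, spawns a datastore
     * client actor for the backend shard, registers a {@link DistributedShardFrontend} with the local
     * {@link ShardedDOMDataTree} and records the resulting registration in the local prefix table.
     */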
    private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
        LOG.debug("Member {}: Creating CDS shard for prefix: {}", memberName, prefix);
        final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
        final DistributedDataStore distributedDataStore =
                prefix.getDatastoreType().equals(LogicalDatastoreType.CONFIGURATION)
                        ? distributedConfigDatastore : distributedOperDatastore;

        try (final DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
            final Entry<DataStoreClient, ActorRef> entry =
                    createDatastoreClient(shardName, distributedDataStore.getActorContext());

            final DistributedShardFrontend shard =
                    new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);

            @SuppressWarnings("unchecked")
            final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
                    (DOMDataTreeShardRegistration) shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
            shards.store(prefix, reg);
        } catch (final DOMDataTreeShardingConflictException e) {
            LOG.error("Prefix {} is already occupied by another shard", prefix, e);
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
        } catch (final DOMDataTreeShardCreationFailedException e) {
            LOG.error("Unable to create datastore client for shard {}", prefix, e);
        }
    }

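    /**
     * Closes and removes the local shard frontend registration for the given prefix, if one exists.
     */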
    private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
        LOG.debug("Member {}: Removing CDS shard for prefix: {}", memberName, prefix);
        final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
                shards.lookup(prefix);

        if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
            LOG.debug("Member {}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
                    memberName, prefix);
            return;
        }

        lookup.getValue().close();
        // need to remove from our local table that is used for tracking
        shards.remove(prefix);
    }

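    /**
     * Creates a producer on the local {@link ShardedDOMDataTree} only, without announcing it to remote nodes.
     */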
    DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
        return shardedDOMDataTree.createProducer(prefix);
    }

    @Nonnull
    @Override
    public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
            @Nonnull final DOMDataTreeIdentifier prefix,
            @Nonnull final T shard,
            @Nonnull final DOMDataTreeProducer producer)
            throws DOMDataTreeShardingConflictException {

        LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix);

        return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
    }

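    /**
     * Spawns a {@link SimpleDataStoreClientActor} for the given backend shard and waits for its
     * {@link DataStoreClient} to become available. If the client cannot be resolved, the actor is stopped and
     * a {@link DOMDataTreeShardCreationFailedException} is thrown.
     */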
    @SuppressWarnings("checkstyle:IllegalCatch")
    private Entry<DataStoreClient, ActorRef> createDatastoreClient(
            final String shardName, final ActorContext actorContext)
            throws DOMDataTreeShardCreationFailedException {

        LOG.debug("Creating distributed datastore client for shard {}", shardName);
        final Props distributedDataStoreClientProps =
                SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorContext, shardName);

        final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
        try {
            return new SimpleEntry<>(SimpleDataStoreClientActor
                    .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
        } catch (final Exception e) {
            LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
            clientActor.tell(PoisonPill.getInstance(), noSender());
            throw new DOMDataTreeShardCreationFailedException(
                    "Unable to create datastore client for shard{" + shardName + "}", e);
        }
    }

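    /**
     * Creates the default (root) shard for the given datastore type, replicated on all members currently known
     * to the cluster.
     */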
    private DistributedShardRegistration initDefaultShard(final LogicalDatastoreType logicalDatastoreType)
            throws DOMDataTreeProducerException, DOMDataTreeShardingConflictException {
        final Collection<Member> members = JavaConverters.asJavaCollectionConverter(
                Cluster.get(actorSystem).state().members()).asJavaCollection();
        final Collection<MemberName> names = Collections2.transform(members,
            m -> MemberName.forName(m.roles().iterator().next()));

        return createDistributedShard(
                new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.EMPTY), names);
    }

    private static void closeProducer(final DOMDataTreeProducer producer) {
        try {
            producer.close();
        } catch (final DOMDataTreeProducerException e) {
            LOG.error("Unable to close producer", e);
        }
    }

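    /**
     * Creates the {@link ShardedDataTreeActor}, retrying up to {@link #MAX_ACTOR_CREATION_RETRIES} times with
     * a short delay between attempts before giving up.
     */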
    @SuppressWarnings("checkstyle:IllegalCatch")
    private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
                                                       final ShardedDataTreeActorCreator creator,
                                                       final String shardDataTreeActorId) {
        Exception lastException = null;

        for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) {
            try {
                return actorSystem.actorOf(creator.props(), shardDataTreeActorId);
            } catch (final Exception e) {
                lastException = e;
                Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT);
                LOG.debug("Could not create actor {} because of {} -"
                                + " waiting for some time before retrying (retry count = {})",
                        shardDataTreeActorId, e.getMessage(), i);
            }
        }

        throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
    }

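    /**
     * Registration handle returned from {@link DistributedShardedDOMDataTree#createDistributedShard}. Closing
     * it tears down the local shard frontend and asks the {@link ShardedDataTreeActor} to remove the prefix
     * shard from the remote nodes.
     */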
    private static class DistributedShardRegistrationImpl implements DistributedShardRegistration {

        private final DOMDataTreeIdentifier prefix;
        private final ActorRef shardedDataTreeActor;
        private final DistributedShardedDOMDataTree distributedShardedDOMDataTree;

        DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix,
                                         final ActorRef shardedDataTreeActor,
                                         final DistributedShardedDOMDataTree distributedShardedDOMDataTree) {
            this.prefix = prefix;
            this.shardedDataTreeActor = shardedDataTreeActor;
            this.distributedShardedDOMDataTree = distributedShardedDOMDataTree;
        }

        @Override
        public void close() {
            // first despawn on the local node
            distributedShardedDOMDataTree.despawnShardFrontend(prefix);
            // update the config so the remote nodes are updated
            shardedDataTreeActor.tell(new RemovePrefixShard(prefix), noSender());
        }
    }

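    /**
     * Producer handed out to clients. It forwards all operations to the producer created on the local
     * {@link ShardedDOMDataTree} and additionally notifies the {@link ShardedDataTreeActor} when it is closed,
     * so that remote nodes can release the producer's subtrees.
     */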
    private static final class ProxyProducer extends ForwardingObject implements DOMDataTreeProducer {

        private final DOMDataTreeProducer delegate;
        private final Collection<DOMDataTreeIdentifier> subtrees;
        private final ActorRef shardDataTreeActor;
        private final ActorContext actorContext;

        ProxyProducer(final DOMDataTreeProducer delegate,
                      final Collection<DOMDataTreeIdentifier> subtrees,
                      final ActorRef shardDataTreeActor,
                      final ActorContext actorContext) {
            this.delegate = Preconditions.checkNotNull(delegate);
            this.subtrees = Preconditions.checkNotNull(subtrees);
            this.shardDataTreeActor = Preconditions.checkNotNull(shardDataTreeActor);
            this.actorContext = Preconditions.checkNotNull(actorContext);
        }

        @Nonnull
        @Override
        public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
            return delegate.createTransaction(isolated);
        }

        @Nonnull
        @Override
        public DOMDataTreeProducer createProducer(@Nonnull final Collection<DOMDataTreeIdentifier> subtrees) {
            // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
            // open we surely have the rights to all the subtrees.
            return delegate.createProducer(subtrees);
        }

        @Override
        public void close() throws DOMDataTreeProducerException {
            delegate.close();

            final Object o = actorContext.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
            if (o instanceof DOMDataTreeProducerException) {
                throw (DOMDataTreeProducerException) o;
            } else if (o instanceof Throwable) {
                throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
            }
        }

        @Override
        protected DOMDataTreeProducer delegate() {
            return delegate;
        }
    }
}