Correctly forward DOMDataTreeChangeListener.onInitialData()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import com.google.common.annotations.Beta;
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.base.Throwables;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import java.util.Collection;
24 import java.util.Set;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
29 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
30 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
32 import org.opendaylight.controller.cluster.datastore.config.Configuration;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
34 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
35 import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
36 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
37 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
38 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
39 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
40 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
41 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
42 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
43 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
44 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
45 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
46 import org.opendaylight.yangtools.concepts.ListenerRegistration;
47 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
48 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
49 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
50 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextListener;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import scala.concurrent.duration.Duration;
54
55 /**
56  * Base implementation of a distributed DOMStore.
57  */
58 public abstract class AbstractDataStore implements DistributedDataStoreInterface, EffectiveModelContextListener,
59         DatastoreContextPropertiesUpdater.Listener, DOMStoreTreeChangePublisher,
60         DOMDataTreeCommitCohortRegistry, AutoCloseable {
61
62     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
63
64     private final SettableFuture<Void> readinessFuture = SettableFuture.create();
65     private final ClientIdentifier identifier;
66     private final DataStoreClient client;
67     private final ActorUtils actorUtils;
68
69     private AutoCloseable closeable;
70     private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
71     private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
72
73     @SuppressWarnings("checkstyle:IllegalCatch")
74     protected AbstractDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
75             final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
76             final DatastoreSnapshot restoreFromSnapshot) {
77         requireNonNull(actorSystem, "actorSystem should not be null");
78         requireNonNull(cluster, "cluster should not be null");
79         requireNonNull(configuration, "configuration should not be null");
80         requireNonNull(datastoreContextFactory, "datastoreContextFactory should not be null");
81
82         String shardManagerId = ShardManagerIdentifier.builder()
83                 .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
84
85         LOG.info("Creating ShardManager : {}", shardManagerId);
86
87         String shardDispatcher =
88                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
89
90         PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
91
92         AbstractShardManagerCreator<?> creator = getShardManagerCreator().cluster(cluster).configuration(configuration)
93                 .datastoreContextFactory(datastoreContextFactory)
94                 .readinessFuture(readinessFuture)
95                 .primaryShardInfoCache(primaryShardInfoCache)
96                 .restoreFromSnapshot(restoreFromSnapshot)
97                 .distributedDataStore(this);
98
99         actorUtils = new ActorUtils(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
100                 shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
101                 primaryShardInfoCache);
102
103         final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
104             datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorUtils);
105         final ActorRef clientActor = actorSystem.actorOf(clientProps);
106         try {
107             client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
108         } catch (Exception e) {
109             LOG.error("Failed to get actor for {}", clientProps, e);
110             clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
111             Throwables.throwIfUnchecked(e);
112             throw new RuntimeException(e);
113         }
114
115         identifier = client.getIdentifier();
116         LOG.debug("Distributed data store client {} started", identifier);
117
118         datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
119                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
120         datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
121         datastoreConfigMXBean.registerMBean();
122
123         datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext()
124                 .getDataStoreMXBeanType(), actorUtils);
125         datastoreInfoMXBean.registerMBean();
126     }
127
128     @VisibleForTesting
129     protected AbstractDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier) {
130         this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
131         this.client = null;
132         this.identifier = requireNonNull(identifier);
133     }
134
135     @VisibleForTesting
136     protected AbstractDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier,
137                                 final DataStoreClient clientActor) {
138         this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
139         this.client = clientActor;
140         this.identifier = requireNonNull(identifier);
141     }
142
143     protected AbstractShardManagerCreator<?> getShardManagerCreator() {
144         return new ShardManagerCreator();
145     }
146
147     protected final DataStoreClient getClient() {
148         return client;
149     }
150
151     final ClientIdentifier getIdentifier() {
152         return identifier;
153     }
154
155     public void setCloseable(final AutoCloseable closeable) {
156         this.closeable = closeable;
157     }
158
159     @Override
160     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
161             final YangInstanceIdentifier treeId, final L listener) {
162         requireNonNull(treeId, "treeId should not be null");
163         requireNonNull(listener, "listener should not be null");
164
165         /*
166          * We need to potentially deal with multi-shard composition for registration targeting the root of the data
167          * store. If that is the case, we delegate to a more complicated setup invol
168          */
169         if (treeId.isEmpty()) {
170             // User is targeting root of the datastore. If there is more than one shard, we have to register with them
171             // all and perform data composition.
172             final Set<String> shardNames = actorUtils.getConfiguration().getAllShardNames();
173             if (shardNames.size() > 1) {
174                 checkArgument(listener instanceof ClusteredDOMDataTreeChangeListener,
175                     "Cannot listen on root without non-clustered listener %s", listener);
176                 return new RootDataTreeChangeListenerProxy<>(actorUtils, listener, shardNames);
177             }
178         }
179
180         final String shardName = actorUtils.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
181         LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
182
183         final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
184                 new DataTreeChangeListenerProxy<>(actorUtils, listener, treeId);
185         listenerRegistrationProxy.init(shardName);
186
187         return listenerRegistrationProxy;
188     }
189
190     @Override
191     public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
192             final DOMDataTreeIdentifier subtree, final C cohort) {
193         YangInstanceIdentifier treeId = requireNonNull(subtree, "subtree should not be null").getRootIdentifier();
194         requireNonNull(cohort, "listener should not be null");
195
196
197         final String shardName = actorUtils.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
198         LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
199
200         DataTreeCohortRegistrationProxy<C> cohortProxy =
201                 new DataTreeCohortRegistrationProxy<>(actorUtils, subtree, cohort);
202         cohortProxy.init(shardName);
203         return cohortProxy;
204     }
205
206     @Override
207     public void onModelContextUpdated(final EffectiveModelContext newModelContext) {
208         actorUtils.setSchemaContext(newModelContext);
209     }
210
211     @Override
212     public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
213         LOG.info("DatastoreContext updated for data store {}", actorUtils.getDataStoreName());
214
215         actorUtils.setDatastoreContext(contextFactory);
216         datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
217     }
218
219     @Override
220     @SuppressWarnings("checkstyle:IllegalCatch")
221     public void close() {
222         LOG.info("Closing data store {}", identifier);
223
224         if (datastoreConfigMXBean != null) {
225             datastoreConfigMXBean.unregisterMBean();
226         }
227         if (datastoreInfoMXBean != null) {
228             datastoreInfoMXBean.unregisterMBean();
229         }
230
231         if (closeable != null) {
232             try {
233                 closeable.close();
234             } catch (Exception e) {
235                 LOG.debug("Error closing instance", e);
236             }
237         }
238
239         actorUtils.shutdown();
240
241         if (client != null) {
242             client.close();
243         }
244     }
245
246     @Override
247     public ActorUtils getActorUtils() {
248         return actorUtils;
249     }
250
251     // TODO: consider removing this in favor of awaitReadiness()
252     @Deprecated
253     public void waitTillReady() {
254         LOG.info("Beginning to wait for data store to become ready : {}", identifier);
255
256         final Duration toWait = initialSettleTime();
257         try {
258             if (!awaitReadiness(toWait)) {
259                 LOG.error("Shard leaders failed to settle in {}, giving up", toWait);
260                 return;
261             }
262         } catch (InterruptedException e) {
263             LOG.error("Interrupted while waiting for shards to settle", e);
264             return;
265         }
266
267         LOG.debug("Data store {} is now ready", identifier);
268     }
269
270     @Beta
271     @Deprecated
272     public boolean awaitReadiness() throws InterruptedException {
273         return awaitReadiness(initialSettleTime());
274     }
275
276     @Beta
277     @Deprecated
278     public boolean awaitReadiness(final Duration toWait) throws InterruptedException {
279         try {
280             if (toWait.isFinite()) {
281                 try {
282                     readinessFuture.get(toWait.toNanos(), TimeUnit.NANOSECONDS);
283                 } catch (TimeoutException e) {
284                     LOG.debug("Timed out waiting for shards to settle", e);
285                     return false;
286                 }
287             } else {
288                 readinessFuture.get();
289             }
290         } catch (ExecutionException e) {
291             LOG.warn("Unexpected readiness failure, assuming convergence", e);
292         }
293
294         return true;
295     }
296
297     @Beta
298     @Deprecated
299     public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
300         if (!awaitReadiness(Duration.create(timeout, unit))) {
301             throw new TimeoutException("Shard leaders failed to settle");
302         }
303     }
304
305     @SuppressWarnings("checkstyle:IllegalCatch")
306     private static ActorRef createShardManager(final ActorSystem actorSystem,
307             final AbstractShardManagerCreator<?> creator, final String shardDispatcher,
308             final String shardManagerId) {
309         Exception lastException = null;
310
311         for (int i = 0; i < 100; i++) {
312             try {
313                 return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher), shardManagerId);
314             } catch (Exception e) {
315                 lastException = e;
316                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
317                 LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying "
318                         + "(retry count = {})", shardManagerId, e.getMessage(), i);
319             }
320         }
321
322         throw new IllegalStateException("Failed to create Shard Manager", lastException);
323     }
324
325     /**
326      * Future which completes when all shards settle for the first time.
327      *
328      * @return A Listenable future.
329      */
330     public final ListenableFuture<?> initialSettleFuture() {
331         return readinessFuture;
332     }
333
334     @VisibleForTesting
335     SettableFuture<Void> readinessFuture() {
336         return readinessFuture;
337     }
338
339     @Override
340     @SuppressWarnings("unchecked")
341     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
342             final YangInstanceIdentifier shardLookup, final YangInstanceIdentifier insideShard,
343             final DOMDataTreeChangeListener delegate) {
344
345         requireNonNull(shardLookup, "shardLookup should not be null");
346         requireNonNull(insideShard, "insideShard should not be null");
347         requireNonNull(delegate, "delegate should not be null");
348
349         final String shardName = actorUtils.getShardStrategyFactory().getStrategy(shardLookup).findShard(shardLookup);
350         LOG.debug("Registering tree listener: {} for tree: {} shard: {}, path inside shard: {}",
351                 delegate,shardLookup, shardName, insideShard);
352
353         // wrap this in the ClusteredDOMDataTreeChangeLister interface
354         // since we always want clustered registration
355         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> listenerRegistrationProxy =
356                 new DataTreeChangeListenerProxy<>(actorUtils, new ClusteredDOMDataTreeChangeListener() {
357                     @Override
358                     public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
359                         delegate.onDataTreeChanged(changes);
360                     }
361
362                     @Override
363                     public void onInitialData() {
364                         delegate.onInitialData();
365                     }
366                 }, insideShard);
367         listenerRegistrationProxy.init(shardName);
368
369         return (ListenerRegistration<L>) listenerRegistrationProxy;
370     }
371
372     private Duration initialSettleTime() {
373         final DatastoreContext context = actorUtils.getDatastoreContext();
374         final int multiplier = context.getInitialSettleTimeoutMultiplier();
375         return multiplier == 0 ? Duration.Inf() : context.getShardLeaderElectionTimeout().duration().$times(multiplier);
376     }
377 }