Bump upstream SNAPSHOTS
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.PoisonPill;
16 import akka.actor.Props;
17 import com.google.common.annotations.Beta;
18 import com.google.common.annotations.VisibleForTesting;
19 import com.google.common.base.Throwables;
20 import com.google.common.util.concurrent.ListenableFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.ExecutionException;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.TimeoutException;
29 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
30 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
31 import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
32 import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
33 import org.opendaylight.controller.cluster.datastore.config.Configuration;
34 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
35 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
36 import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
37 import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
38 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
39 import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
40 import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
41 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
42 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
43 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
44 import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
45 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
46 import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
47 import org.opendaylight.yangtools.concepts.ListenerRegistration;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
50 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
51 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContextListener;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54 import scala.concurrent.duration.Duration;
55
56 /**
57  * Base implementation of a distributed DOMStore.
58  */
59 public abstract class AbstractDataStore implements DistributedDataStoreInterface, EffectiveModelContextListener,
60         DatastoreContextPropertiesUpdater.Listener, DOMStoreTreeChangePublisher,
61         DOMDataTreeCommitCohortRegistry, AutoCloseable {
62
63     private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
64
65     private final SettableFuture<Void> readinessFuture = SettableFuture.create();
66     private final ClientIdentifier identifier;
67     private final DataStoreClient client;
68     private final ActorUtils actorUtils;
69
70     private AutoCloseable closeable;
71     private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
72     private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
73
74     @SuppressWarnings("checkstyle:IllegalCatch")
75     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "Testing overrides")
76     protected AbstractDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
77             final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
78             final DatastoreSnapshot restoreFromSnapshot) {
79         requireNonNull(actorSystem, "actorSystem should not be null");
80         requireNonNull(cluster, "cluster should not be null");
81         requireNonNull(configuration, "configuration should not be null");
82         requireNonNull(datastoreContextFactory, "datastoreContextFactory should not be null");
83
84         String shardManagerId = ShardManagerIdentifier.builder()
85                 .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
86
87         LOG.info("Creating ShardManager : {}", shardManagerId);
88
89         String shardDispatcher =
90                 new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
91
92         PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
93
94         AbstractShardManagerCreator<?> creator = getShardManagerCreator().cluster(cluster).configuration(configuration)
95                 .datastoreContextFactory(datastoreContextFactory)
96                 .readinessFuture(readinessFuture)
97                 .primaryShardInfoCache(primaryShardInfoCache)
98                 .restoreFromSnapshot(restoreFromSnapshot)
99                 .distributedDataStore(this);
100
101         actorUtils = new ActorUtils(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
102                 shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
103                 primaryShardInfoCache);
104
105         final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
106             datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorUtils);
107         final ActorRef clientActor = actorSystem.actorOf(clientProps);
108         try {
109             client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
110         } catch (Exception e) {
111             LOG.error("Failed to get actor for {}", clientProps, e);
112             clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
113             Throwables.throwIfUnchecked(e);
114             throw new RuntimeException(e);
115         }
116
117         identifier = client.getIdentifier();
118         LOG.debug("Distributed data store client {} started", identifier);
119
120         datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
121                 datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
122         datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
123         datastoreConfigMXBean.registerMBean();
124
125         datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext()
126                 .getDataStoreMXBeanType(), actorUtils);
127         datastoreInfoMXBean.registerMBean();
128     }
129
130     @VisibleForTesting
131     protected AbstractDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier) {
132         this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
133         client = null;
134         this.identifier = requireNonNull(identifier);
135     }
136
137     @VisibleForTesting
138     protected AbstractDataStore(final ActorUtils actorUtils, final ClientIdentifier identifier,
139                                 final DataStoreClient clientActor) {
140         this.actorUtils = requireNonNull(actorUtils, "actorContext should not be null");
141         client = clientActor;
142         this.identifier = requireNonNull(identifier);
143     }
144
145     protected AbstractShardManagerCreator<?> getShardManagerCreator() {
146         return new ShardManagerCreator();
147     }
148
149     protected final DataStoreClient getClient() {
150         return client;
151     }
152
153     final ClientIdentifier getIdentifier() {
154         return identifier;
155     }
156
157     public void setCloseable(final AutoCloseable closeable) {
158         this.closeable = closeable;
159     }
160
161     @Override
162     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
163             final YangInstanceIdentifier treeId, final L listener) {
164         requireNonNull(treeId, "treeId should not be null");
165         requireNonNull(listener, "listener should not be null");
166
167         /*
168          * We need to potentially deal with multi-shard composition for registration targeting the root of the data
169          * store. If that is the case, we delegate to a more complicated setup invol
170          */
171         if (treeId.isEmpty()) {
172             // User is targeting root of the datastore. If there is more than one shard, we have to register with them
173             // all and perform data composition.
174             final Set<String> shardNames = actorUtils.getConfiguration().getAllShardNames();
175             if (shardNames.size() > 1) {
176                 checkArgument(listener instanceof ClusteredDOMDataTreeChangeListener,
177                     "Cannot listen on root without non-clustered listener %s", listener);
178                 return new RootDataTreeChangeListenerProxy<>(actorUtils, listener, shardNames);
179             }
180         }
181
182         final String shardName = actorUtils.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
183         LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
184
185         final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
186                 new DataTreeChangeListenerProxy<>(actorUtils, listener, treeId);
187         listenerRegistrationProxy.init(shardName);
188
189         return listenerRegistrationProxy;
190     }
191
192     @Override
193     public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
194             final DOMDataTreeIdentifier subtree, final C cohort) {
195         YangInstanceIdentifier treeId = requireNonNull(subtree, "subtree should not be null").getRootIdentifier();
196         requireNonNull(cohort, "listener should not be null");
197
198
199         final String shardName = actorUtils.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
200         LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
201
202         DataTreeCohortRegistrationProxy<C> cohortProxy =
203                 new DataTreeCohortRegistrationProxy<>(actorUtils, subtree, cohort);
204         cohortProxy.init(shardName);
205         return cohortProxy;
206     }
207
208     @Override
209     public void onModelContextUpdated(final EffectiveModelContext newModelContext) {
210         actorUtils.setSchemaContext(newModelContext);
211     }
212
213     @Override
214     public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
215         LOG.info("DatastoreContext updated for data store {}", actorUtils.getDataStoreName());
216
217         actorUtils.setDatastoreContext(contextFactory);
218         datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
219     }
220
221     @Override
222     @SuppressWarnings("checkstyle:IllegalCatch")
223     public void close() {
224         LOG.info("Closing data store {}", identifier);
225
226         if (datastoreConfigMXBean != null) {
227             datastoreConfigMXBean.unregisterMBean();
228         }
229         if (datastoreInfoMXBean != null) {
230             datastoreInfoMXBean.unregisterMBean();
231         }
232
233         if (closeable != null) {
234             try {
235                 closeable.close();
236             } catch (Exception e) {
237                 LOG.debug("Error closing instance", e);
238             }
239         }
240
241         actorUtils.shutdown();
242
243         if (client != null) {
244             client.close();
245         }
246     }
247
248     @Override
249     public final ActorUtils getActorUtils() {
250         return actorUtils;
251     }
252
253     // TODO: consider removing this in favor of awaitReadiness()
254     @Deprecated
255     public void waitTillReady() {
256         LOG.info("Beginning to wait for data store to become ready : {}", identifier);
257
258         final Duration toWait = initialSettleTime();
259         try {
260             if (!awaitReadiness(toWait)) {
261                 LOG.error("Shard leaders failed to settle in {}, giving up", toWait);
262                 return;
263             }
264         } catch (InterruptedException e) {
265             LOG.error("Interrupted while waiting for shards to settle", e);
266             return;
267         }
268
269         LOG.debug("Data store {} is now ready", identifier);
270     }
271
272     @Beta
273     @Deprecated
274     public boolean awaitReadiness() throws InterruptedException {
275         return awaitReadiness(initialSettleTime());
276     }
277
278     @Beta
279     @Deprecated
280     public boolean awaitReadiness(final Duration toWait) throws InterruptedException {
281         try {
282             if (toWait.isFinite()) {
283                 try {
284                     readinessFuture.get(toWait.toNanos(), TimeUnit.NANOSECONDS);
285                 } catch (TimeoutException e) {
286                     LOG.debug("Timed out waiting for shards to settle", e);
287                     return false;
288                 }
289             } else {
290                 readinessFuture.get();
291             }
292         } catch (ExecutionException e) {
293             LOG.warn("Unexpected readiness failure, assuming convergence", e);
294         }
295
296         return true;
297     }
298
299     @Beta
300     @Deprecated
301     public void awaitReadiness(final long timeout, final TimeUnit unit) throws InterruptedException, TimeoutException {
302         if (!awaitReadiness(Duration.create(timeout, unit))) {
303             throw new TimeoutException("Shard leaders failed to settle");
304         }
305     }
306
307     @SuppressWarnings("checkstyle:IllegalCatch")
308     private static ActorRef createShardManager(final ActorSystem actorSystem,
309             final AbstractShardManagerCreator<?> creator, final String shardDispatcher,
310             final String shardManagerId) {
311         Exception lastException = null;
312
313         for (int i = 0; i < 100; i++) {
314             try {
315                 return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher), shardManagerId);
316             } catch (Exception e) {
317                 lastException = e;
318                 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
319                 LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying "
320                         + "(retry count = {})", shardManagerId, e.getMessage(), i);
321             }
322         }
323
324         throw new IllegalStateException("Failed to create Shard Manager", lastException);
325     }
326
327     /**
328      * Future which completes when all shards settle for the first time.
329      *
330      * @return A Listenable future.
331      */
332     public final ListenableFuture<?> initialSettleFuture() {
333         return readinessFuture;
334     }
335
336     @VisibleForTesting
337     SettableFuture<Void> readinessFuture() {
338         return readinessFuture;
339     }
340
341     @Override
342     @SuppressWarnings("unchecked")
343     public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerProxyListener(
344             final YangInstanceIdentifier shardLookup, final YangInstanceIdentifier insideShard,
345             final DOMDataTreeChangeListener delegate) {
346
347         requireNonNull(shardLookup, "shardLookup should not be null");
348         requireNonNull(insideShard, "insideShard should not be null");
349         requireNonNull(delegate, "delegate should not be null");
350
351         final String shardName = actorUtils.getShardStrategyFactory().getStrategy(shardLookup).findShard(shardLookup);
352         LOG.debug("Registering tree listener: {} for tree: {} shard: {}, path inside shard: {}",
353                 delegate,shardLookup, shardName, insideShard);
354
355         // wrap this in the ClusteredDOMDataTreeChangeLister interface
356         // since we always want clustered registration
357         final DataTreeChangeListenerProxy<DOMDataTreeChangeListener> listenerRegistrationProxy =
358                 new DataTreeChangeListenerProxy<>(actorUtils, new ClusteredDOMDataTreeChangeListener() {
359                     @Override
360                     public void onDataTreeChanged(final List<DataTreeCandidate> changes) {
361                         delegate.onDataTreeChanged(changes);
362                     }
363
364                     @Override
365                     public void onInitialData() {
366                         delegate.onInitialData();
367                     }
368                 }, insideShard);
369         listenerRegistrationProxy.init(shardName);
370
371         return (ListenerRegistration<L>) listenerRegistrationProxy;
372     }
373
374     private Duration initialSettleTime() {
375         final DatastoreContext context = actorUtils.getDatastoreContext();
376         final int multiplier = context.getInitialSettleTimeoutMultiplier();
377         return multiplier == 0 ? Duration.Inf() : context.getShardLeaderElectionTimeout().duration().$times(multiplier);
378     }
379 }