Bug 1430: Off-load notifications from single commit thread
[controller.git] / opendaylight / md-sal / sal-inmemory-datastore / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
20 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
22 import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
23 import org.opendaylight.yangtools.util.ExecutorServiceUtil;
24 import org.opendaylight.yangtools.util.PropertyUtils;
25 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
26 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
27 import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
28 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
29 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
30 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
31 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
33 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
34 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
40 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
41 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
42 import org.opendaylight.yangtools.concepts.Identifiable;
43 import org.opendaylight.yangtools.concepts.ListenerRegistration;
44 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
45 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
47 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50
51 import javax.annotation.concurrent.GuardedBy;
52
53 import java.util.Collections;
54 import java.util.concurrent.Callable;
55 import java.util.concurrent.ExecutorService;
56 import java.util.concurrent.TimeUnit;
57 import java.util.concurrent.atomic.AtomicLong;
58
59 import static com.google.common.base.Preconditions.checkState;
60
61 /**
62  * In-memory DOM Data Store
63  *
64  * Implementation of {@link DOMStore} which uses {@link DataTree} and other
65  * classes such as {@link SnapshotBackedWriteTransaction}.
66  * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
67  * to implement {@link DOMStore} contract.
68  *
69  */
70 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
71         TransactionReadyPrototype,AutoCloseable {
72     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
73
74     @SuppressWarnings("rawtypes")
75     private static final QueuedNotificationManager.Invoker<AsyncDataChangeListener,
76                                        AsyncDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
77             new QueuedNotificationManager.Invoker<AsyncDataChangeListener,
78                                                   AsyncDataChangeEvent>() {
79
80                 @SuppressWarnings("unchecked")
81                 @Override
82                 public void invokeListener( AsyncDataChangeListener listener,
83                                             AsyncDataChangeEvent notification ) {
84                     listener.onDataChanged(notification);
85                 }
86             };
87
88     private static final String DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP =
89             "mdsal.datastore-dcl-notification-queue.size";
90
91     private static final int DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE = 1000;
92
93     private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
94     private final ListenerTree listenerTree = ListenerTree.create();
95     private final AtomicLong txCounter = new AtomicLong(0);
96     private final ListeningExecutorService listeningExecutor;
97
98     @SuppressWarnings("rawtypes")
99     private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
100                                                               dataChangeListenerNotificationManager;
101     private final ExecutorService dataChangeListenerExecutor;
102
103     private final String name;
104
105     public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
106             final ExecutorService dataChangeListenerExecutor) {
107         this.name = Preconditions.checkNotNull(name);
108         this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
109
110         this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
111
112         int maxDCLQueueSize = PropertyUtils.getIntSystemProperty(
113                 DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE_PROP, DEFAULT_DCL_NOTIFICATION_MGR_MAX_QUEUE_SIZE );
114
115         dataChangeListenerNotificationManager =
116                 new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
117                         DCL_NOTIFICATION_MGR_INVOKER, maxDCLQueueSize, "DataChangeListenerQueueMgr");
118     }
119
120     @Override
121     public final String getIdentifier() {
122         return name;
123     }
124
125     @Override
126     public DOMStoreReadTransaction newReadOnlyTransaction() {
127         return new SnapshotBackedReadTransaction(nextIdentifier(), dataTree.takeSnapshot());
128     }
129
130     @Override
131     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
132         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
133     }
134
135     @Override
136     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
137         return new SnapshotBackedWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
138     }
139
140     @Override
141     public DOMStoreTransactionChain createTransactionChain() {
142         return new DOMStoreTransactionChainImpl();
143     }
144
145     @Override
146     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
147         dataTree.setSchemaContext(ctx);
148     }
149
150     @Override
151     public void close() {
152         ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
153         ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
154     }
155     @Override
156     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
157             final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
158
159         /*
160          * Make sure commit is not occurring right now. Listener has to be
161          * registered and its state capture enqueued at a consistent point.
162          *
163          * FIXME: improve this to read-write lock, such that multiple listener
164          * registrations can occur simultaneously
165          */
166         final DataChangeListenerRegistration<L> reg;
167         synchronized (this) {
168             LOG.debug("{}: Registering data change listener {} for {}", name, listener, path);
169
170             reg = listenerTree.registerDataChangeListener(path, listener, scope);
171
172             Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
173             if (currentState.isPresent()) {
174                 final NormalizedNode<?, ?> data = currentState.get();
175
176                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
177                         .setAfter(data) //
178                         .addCreated(path, data) //
179                         .build();
180
181                 new ChangeListenerNotifyTask(Collections.singletonList(reg), event,
182                         dataChangeListenerNotificationManager).run();
183             }
184         }
185
186         return new AbstractListenerRegistration<L>(listener) {
187             @Override
188             protected void removeRegistration() {
189                 synchronized (InMemoryDOMDataStore.this) {
190                     reg.close();
191                 }
192             }
193         };
194     }
195
196     @Override
197     public synchronized DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction writeTx) {
198         LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
199         return new ThreePhaseCommitImpl(writeTx);
200     }
201
202     private Object nextIdentifier() {
203         return name + "-" + txCounter.getAndIncrement();
204     }
205
206     private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
207
208         @GuardedBy("this")
209         private SnapshotBackedWriteTransaction latestOutstandingTx;
210
211         private boolean chainFailed = false;
212
213         private void checkFailed() {
214             Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
215         }
216
217         @Override
218         public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
219             final DataTreeSnapshot snapshot;
220             checkFailed();
221             if (latestOutstandingTx != null) {
222                 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
223                 snapshot = latestOutstandingTx.getMutatedView();
224             } else {
225                 snapshot = dataTree.takeSnapshot();
226             }
227             return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
228         }
229
230         @Override
231         public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
232             final DataTreeSnapshot snapshot;
233             checkFailed();
234             if (latestOutstandingTx != null) {
235                 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
236                 snapshot = latestOutstandingTx.getMutatedView();
237             } else {
238                 snapshot = dataTree.takeSnapshot();
239             }
240             final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
241                     snapshot, this);
242             latestOutstandingTx = ret;
243             return ret;
244         }
245
246         @Override
247         public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
248             final DataTreeSnapshot snapshot;
249             checkFailed();
250             if (latestOutstandingTx != null) {
251                 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
252                 snapshot = latestOutstandingTx.getMutatedView();
253             } else {
254                 snapshot = dataTree.takeSnapshot();
255             }
256             final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,
257                     this);
258             latestOutstandingTx = ret;
259             return ret;
260         }
261
262         @Override
263         public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
264             DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
265             return new ChainedTransactionCommitImpl(tx, storeCohort, this);
266         }
267
268         @Override
269         public void close() {
270
271             // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
272             // by the outer class.
273             //listeningExecutor.shutdownNow();
274         }
275
276         protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
277                 final Throwable t) {
278             chainFailed = true;
279
280         }
281
282         public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
283             // If commited transaction is latestOutstandingTx we clear
284             // latestOutstandingTx
285             // field in order to base new transactions on Datastore Data Tree
286             // directly.
287             if (transaction.equals(latestOutstandingTx)) {
288                 latestOutstandingTx = null;
289             }
290         }
291
292     }
293
294     private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
295
296         private final SnapshotBackedWriteTransaction transaction;
297         private final DOMStoreThreePhaseCommitCohort delegate;
298
299         private final DOMStoreTransactionChainImpl txChain;
300
301         protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
302                 final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
303             super();
304             this.transaction = transaction;
305             this.delegate = delegate;
306             this.txChain = txChain;
307         }
308
309         @Override
310         public ListenableFuture<Boolean> canCommit() {
311             return delegate.canCommit();
312         }
313
314         @Override
315         public ListenableFuture<Void> preCommit() {
316             return delegate.preCommit();
317         }
318
319         @Override
320         public ListenableFuture<Void> abort() {
321             return delegate.abort();
322         }
323
324         @Override
325         public ListenableFuture<Void> commit() {
326             ListenableFuture<Void> commitFuture = delegate.commit();
327             Futures.addCallback(commitFuture, new FutureCallback<Void>() {
328                 @Override
329                 public void onFailure(final Throwable t) {
330                     txChain.onTransactionFailed(transaction, t);
331                 }
332
333                 @Override
334                 public void onSuccess(final Void result) {
335                     txChain.onTransactionCommited(transaction);
336                 }
337
338             });
339             return commitFuture;
340         }
341
342     }
343
344     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
345
346         private final SnapshotBackedWriteTransaction transaction;
347         private final DataTreeModification modification;
348
349         private ResolveDataChangeEventsTask listenerResolver;
350         private DataTreeCandidate candidate;
351
352         public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
353             this.transaction = writeTransaction;
354             this.modification = transaction.getMutatedView();
355         }
356
357         @Override
358         public ListenableFuture<Boolean> canCommit() {
359             return listeningExecutor.submit(new Callable<Boolean>() {
360                 @Override
361                 public Boolean call() throws TransactionCommitFailedException {
362                     try {
363                         dataTree.validate(modification);
364                         LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
365                         return true;
366                     } catch (ConflictingModificationAppliedException e) {
367                         LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
368                                 e.getPath());
369                         throw new OptimisticLockFailedException("Optimistic lock failed.",e);
370                     } catch (DataValidationFailedException e) {
371                         LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
372                                 e.getPath(), e);
373                         throw new TransactionCommitFailedException("Data did not pass validation.",e);
374                     }
375                 }
376             });
377         }
378
379         @Override
380         public ListenableFuture<Void> preCommit() {
381             return listeningExecutor.submit(new Callable<Void>() {
382                 @Override
383                 public Void call() {
384                     candidate = dataTree.prepare(modification);
385                     listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree,
386                             dataChangeListenerNotificationManager);
387                     return null;
388                 }
389             });
390         }
391
392         @Override
393         public ListenableFuture<Void> abort() {
394             candidate = null;
395             return Futures.immediateFuture(null);
396         }
397
398         @Override
399         public ListenableFuture<Void> commit() {
400             checkState(candidate != null, "Proposed subtree must be computed");
401
402             /*
403              * The commit has to occur atomically with regard to listener
404              * registrations.
405              */
406             synchronized (this) {
407                 dataTree.commit(candidate);
408
409                 for (ChangeListenerNotifyTask task : listenerResolver.call()) {
410                     LOG.trace("Scheduling invocation of listeners: {}", task);
411                     task.run();
412                 }
413             }
414
415             return Futures.immediateFuture(null);
416         }
417     }
418 }