d0d3fe9e6aa1b928b0e6524307ba60b8900720ee
[controller.git] / opendaylight / md-sal / sal-inmemory-datastore / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
20 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
21 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
22 import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
23 import org.opendaylight.yangtools.util.ExecutorServiceUtil;
24 import org.opendaylight.yangtools.util.concurrent.NotificationManager;
25 import org.opendaylight.yangtools.util.concurrent.QueuedNotificationManager;
26 import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
27 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
28 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
29 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
30 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
31 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
32 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
33 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
35 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
36 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
37 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
38 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
39 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
40 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
41 import org.opendaylight.yangtools.concepts.Identifiable;
42 import org.opendaylight.yangtools.concepts.ListenerRegistration;
43 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
44 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
45 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
46 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50 import javax.annotation.concurrent.GuardedBy;
51
52 import java.util.Collections;
53 import java.util.concurrent.Callable;
54 import java.util.concurrent.ExecutorService;
55 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.atomic.AtomicLong;
57
58 import static com.google.common.base.Preconditions.checkState;
59
60 /**
61  * In-memory DOM Data Store
62  *
63  * Implementation of {@link DOMStore} which uses {@link DataTree} and other
64  * classes such as {@link SnapshotBackedWriteTransaction}.
65  * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
66  * to implement {@link DOMStore} contract.
67  *
68  */
69 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
70         TransactionReadyPrototype,AutoCloseable {
71     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
72
73     @SuppressWarnings("rawtypes")
74     private static final QueuedNotificationManager.Invoker<AsyncDataChangeListener,
75                                        AsyncDataChangeEvent> DCL_NOTIFICATION_MGR_INVOKER =
76             new QueuedNotificationManager.Invoker<AsyncDataChangeListener,
77                                                   AsyncDataChangeEvent>() {
78
79                 @SuppressWarnings("unchecked")
80                 @Override
81                 public void invokeListener( AsyncDataChangeListener listener,
82                                             AsyncDataChangeEvent notification ) {
83                     listener.onDataChanged(notification);
84                 }
85             };
86
87     private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
88     private final ListenerTree listenerTree = ListenerTree.create();
89     private final AtomicLong txCounter = new AtomicLong(0);
90     private final ListeningExecutorService listeningExecutor;
91
92     @SuppressWarnings("rawtypes")
93     private final NotificationManager<AsyncDataChangeListener,AsyncDataChangeEvent>
94                                                               dataChangeListenerNotificationManager;
95     private final ExecutorService dataChangeListenerExecutor;
96
97     private final String name;
98
99     public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
100             final ExecutorService dataChangeListenerExecutor) {
101         this(name, listeningExecutor, dataChangeListenerExecutor,
102                 InMemoryDOMDataStoreConfigProperties.DEFAULT_MAX_DATA_CHANGE_LISTENER_QUEUE_SIZE);
103     }
104
105     public InMemoryDOMDataStore(final String name, final ListeningExecutorService listeningExecutor,
106             final ExecutorService dataChangeListenerExecutor, int maxDataChangeListenerQueueSize) {
107         this.name = Preconditions.checkNotNull(name);
108         this.listeningExecutor = Preconditions.checkNotNull(listeningExecutor);
109
110         this.dataChangeListenerExecutor = Preconditions.checkNotNull(dataChangeListenerExecutor);
111
112         dataChangeListenerNotificationManager =
113                 new QueuedNotificationManager<>(this.dataChangeListenerExecutor,
114                         DCL_NOTIFICATION_MGR_INVOKER, maxDataChangeListenerQueueSize,
115                         "DataChangeListenerQueueMgr");
116     }
117
118     @Override
119     public final String getIdentifier() {
120         return name;
121     }
122
123     @Override
124     public DOMStoreReadTransaction newReadOnlyTransaction() {
125         return new SnapshotBackedReadTransaction(nextIdentifier(), dataTree.takeSnapshot());
126     }
127
128     @Override
129     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
130         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
131     }
132
133     @Override
134     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
135         return new SnapshotBackedWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
136     }
137
138     @Override
139     public DOMStoreTransactionChain createTransactionChain() {
140         return new DOMStoreTransactionChainImpl();
141     }
142
143     @Override
144     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
145         dataTree.setSchemaContext(ctx);
146     }
147
148     @Override
149     public void close() {
150         ExecutorServiceUtil.tryGracefulShutdown(listeningExecutor, 30, TimeUnit.SECONDS);
151         ExecutorServiceUtil.tryGracefulShutdown(dataChangeListenerExecutor, 30, TimeUnit.SECONDS);
152     }
153     @Override
154     public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
155             final YangInstanceIdentifier path, final L listener, final DataChangeScope scope) {
156
157         /*
158          * Make sure commit is not occurring right now. Listener has to be
159          * registered and its state capture enqueued at a consistent point.
160          *
161          * FIXME: improve this to read-write lock, such that multiple listener
162          * registrations can occur simultaneously
163          */
164         final DataChangeListenerRegistration<L> reg;
165         synchronized (this) {
166             LOG.debug("{}: Registering data change listener {} for {}", name, listener, path);
167
168             reg = listenerTree.registerDataChangeListener(path, listener, scope);
169
170             Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
171             if (currentState.isPresent()) {
172                 final NormalizedNode<?, ?> data = currentState.get();
173
174                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
175                         .setAfter(data) //
176                         .addCreated(path, data) //
177                         .build();
178
179                 new ChangeListenerNotifyTask(Collections.singletonList(reg), event,
180                         dataChangeListenerNotificationManager).run();
181             }
182         }
183
184         return new AbstractListenerRegistration<L>(listener) {
185             @Override
186             protected void removeRegistration() {
187                 synchronized (InMemoryDOMDataStore.this) {
188                     reg.close();
189                 }
190             }
191         };
192     }
193
194     @Override
195     public synchronized DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction writeTx) {
196         LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
197         return new ThreePhaseCommitImpl(writeTx);
198     }
199
200     private Object nextIdentifier() {
201         return name + "-" + txCounter.getAndIncrement();
202     }
203
204     private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
205
206         @GuardedBy("this")
207         private SnapshotBackedWriteTransaction latestOutstandingTx;
208
209         private boolean chainFailed = false;
210
211         private void checkFailed() {
212             Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
213         }
214
215         @Override
216         public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
217             final DataTreeSnapshot snapshot;
218             checkFailed();
219             if (latestOutstandingTx != null) {
220                 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
221                 snapshot = latestOutstandingTx.getMutatedView();
222             } else {
223                 snapshot = dataTree.takeSnapshot();
224             }
225             return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
226         }
227
228         @Override
229         public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
230             final DataTreeSnapshot snapshot;
231             checkFailed();
232             if (latestOutstandingTx != null) {
233                 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
234                 snapshot = latestOutstandingTx.getMutatedView();
235             } else {
236                 snapshot = dataTree.takeSnapshot();
237             }
238             final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
239                     snapshot, this);
240             latestOutstandingTx = ret;
241             return ret;
242         }
243
244         @Override
245         public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
246             final DataTreeSnapshot snapshot;
247             checkFailed();
248             if (latestOutstandingTx != null) {
249                 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
250                 snapshot = latestOutstandingTx.getMutatedView();
251             } else {
252                 snapshot = dataTree.takeSnapshot();
253             }
254             final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,
255                     this);
256             latestOutstandingTx = ret;
257             return ret;
258         }
259
260         @Override
261         public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
262             DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
263             return new ChainedTransactionCommitImpl(tx, storeCohort, this);
264         }
265
266         @Override
267         public void close() {
268
269             // FIXME: this call doesn't look right here - listeningExecutor is shared and owned
270             // by the outer class.
271             //listeningExecutor.shutdownNow();
272         }
273
274         protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
275                 final Throwable t) {
276             chainFailed = true;
277
278         }
279
280         public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
281             // If commited transaction is latestOutstandingTx we clear
282             // latestOutstandingTx
283             // field in order to base new transactions on Datastore Data Tree
284             // directly.
285             if (transaction.equals(latestOutstandingTx)) {
286                 latestOutstandingTx = null;
287             }
288         }
289
290     }
291
292     private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
293
294         private final SnapshotBackedWriteTransaction transaction;
295         private final DOMStoreThreePhaseCommitCohort delegate;
296
297         private final DOMStoreTransactionChainImpl txChain;
298
299         protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
300                 final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
301             super();
302             this.transaction = transaction;
303             this.delegate = delegate;
304             this.txChain = txChain;
305         }
306
307         @Override
308         public ListenableFuture<Boolean> canCommit() {
309             return delegate.canCommit();
310         }
311
312         @Override
313         public ListenableFuture<Void> preCommit() {
314             return delegate.preCommit();
315         }
316
317         @Override
318         public ListenableFuture<Void> abort() {
319             return delegate.abort();
320         }
321
322         @Override
323         public ListenableFuture<Void> commit() {
324             ListenableFuture<Void> commitFuture = delegate.commit();
325             Futures.addCallback(commitFuture, new FutureCallback<Void>() {
326                 @Override
327                 public void onFailure(final Throwable t) {
328                     txChain.onTransactionFailed(transaction, t);
329                 }
330
331                 @Override
332                 public void onSuccess(final Void result) {
333                     txChain.onTransactionCommited(transaction);
334                 }
335
336             });
337             return commitFuture;
338         }
339
340     }
341
342     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
343
344         private final SnapshotBackedWriteTransaction transaction;
345         private final DataTreeModification modification;
346
347         private ResolveDataChangeEventsTask listenerResolver;
348         private DataTreeCandidate candidate;
349
350         public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
351             this.transaction = writeTransaction;
352             this.modification = transaction.getMutatedView();
353         }
354
355         @Override
356         public ListenableFuture<Boolean> canCommit() {
357             return listeningExecutor.submit(new Callable<Boolean>() {
358                 @Override
359                 public Boolean call() throws TransactionCommitFailedException {
360                     try {
361                         dataTree.validate(modification);
362                         LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
363                         return true;
364                     } catch (ConflictingModificationAppliedException e) {
365                         LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
366                                 e.getPath());
367                         throw new OptimisticLockFailedException("Optimistic lock failed.",e);
368                     } catch (DataValidationFailedException e) {
369                         LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
370                                 e.getPath(), e);
371                         throw new TransactionCommitFailedException("Data did not pass validation.",e);
372                     }
373                 }
374             });
375         }
376
377         @Override
378         public ListenableFuture<Void> preCommit() {
379             return listeningExecutor.submit(new Callable<Void>() {
380                 @Override
381                 public Void call() {
382                     candidate = dataTree.prepare(modification);
383                     listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree,
384                             dataChangeListenerNotificationManager);
385                     return null;
386                 }
387             });
388         }
389
390         @Override
391         public ListenableFuture<Void> abort() {
392             candidate = null;
393             return Futures.immediateFuture(null);
394         }
395
396         @Override
397         public ListenableFuture<Void> commit() {
398             checkState(candidate != null, "Proposed subtree must be computed");
399
400             /*
401              * The commit has to occur atomically with regard to listener
402              * registrations.
403              */
404             synchronized (this) {
405                 dataTree.commit(candidate);
406
407                 for (ChangeListenerNotifyTask task : listenerResolver.call()) {
408                     LOG.trace("Scheduling invocation of listeners: {}", task);
409                     task.run();
410                 }
411             }
412
413             return Futures.immediateFuture(null);
414         }
415     }
416 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.