Bug 499: Added support for change listeners.
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.atomic.AtomicLong;
16
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.common.base.Optional;
40 import com.google.common.primitives.UnsignedLong;
41 import com.google.common.util.concurrent.Futures;
42 import com.google.common.util.concurrent.ListenableFuture;
43 import com.google.common.util.concurrent.ListeningExecutorService;
44
45 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
46
47     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
48     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
49
50
51     private final ListeningExecutorService executor;
52     private final String name;
53     private final AtomicLong txCounter = new AtomicLong(0);
54
55     private DataAndMetadataSnapshot snapshot;
56     private ModificationApplyOperation operationTree;
57     private final ListenerRegistrationNode listenerTree;
58
59
60
61     private SchemaContext schemaContext;
62
63     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
64         this.executor = executor;
65         this.name = name;
66         this.operationTree = new AllwaysFailOperation();
67         this.snapshot = DataAndMetadataSnapshot.createEmpty();
68         this.listenerTree = ListenerRegistrationNode.createRoot();
69     }
70
71     @Override
72     public String getIdentifier() {
73         return name;
74     }
75
76     @Override
77     public DOMStoreReadTransaction newReadOnlyTransaction() {
78         return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
79     }
80
81     @Override
82     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
83         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
84     }
85
86     @Override
87     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
88         return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
89     }
90
91     @Override
92     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
93         operationTree = SchemaAwareApplyOperationRoot.from(ctx);
94         schemaContext = ctx;
95     }
96
97     @Override
98     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
99             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
100
101         Optional<ListenerRegistrationNode> listenerNode = TreeNodeUtils.findNode(listenerTree, path);
102         checkState(listenerNode.isPresent());
103         synchronized (listener) {
104             notifyInitialState(path, listener);
105         }
106         return listenerNode.get().registerDataChangeListener(listener, scope);
107     }
108
109     private void notifyInitialState(final InstanceIdentifier path,
110             final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
111         Optional<StoreMetadataNode> currentState = snapshot.read(path);
112         try {
113             if (currentState.isPresent()) {
114                 NormalizedNode<?, ?> data = currentState.get().getData();
115                 listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
116                         .setAfter(data) //
117                         .addCreated(path, data) //
118                         .build() //
119                 );
120             }
121         } catch (Exception e) {
122             LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
123         }
124
125     }
126
127     private synchronized DOMStoreThreePhaseCommitCohort submit(
128             final SnaphostBackedWriteTransaction snaphostBackedWriteTransaction) {
129         return new ThreePhaseCommitImpl(snaphostBackedWriteTransaction);
130     }
131
132     private Object nextIdentifier() {
133         return name + "-" + txCounter.getAndIncrement();
134     }
135
136     private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
137             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
138         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
139         checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
140         snapshot = DataAndMetadataSnapshot.builder() //
141                 .setMetadataTree(newDataTree) //
142                 .setSchemaContext(schemaContext) //
143                 .build();
144
145         for(ChangeListenerNotifyTask task : listenerTasks) {
146             executor.submit(task);
147         }
148
149     }
150
151     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
152
153         private DataAndMetadataSnapshot stableSnapshot;
154         private final Object identifier;
155
156         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
157             this.identifier = identifier;
158             this.stableSnapshot = snapshot;
159         }
160
161         @Override
162         public Object getIdentifier() {
163             return identifier;
164         }
165
166         @Override
167         public void close() {
168             stableSnapshot = null;
169         }
170
171         @Override
172         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
173             checkNotNull(path, "Path must not be null.");
174             checkState(stableSnapshot != null, "Transaction is closed");
175             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
176         }
177
178         @Override
179         public String toString() {
180             return "SnapshotBackedReadTransaction [id =" + identifier + "]";
181         }
182
183     }
184
185     private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
186
187         private MutableDataTree mutableTree;
188         private final Object identifier;
189         private InMemoryDOMDataStore store;
190
191         private boolean ready = false;
192
193         public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
194                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
195             this.identifier = identifier;
196             mutableTree = MutableDataTree.from(snapshot, applyOper);
197             this.store = store;
198         }
199
200         @Override
201         public Object getIdentifier() {
202             return identifier;
203         }
204
205         @Override
206         public void close() {
207             this.mutableTree = null;
208             this.store = null;
209         }
210
211         @Override
212         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
213             checkNotReady();
214             mutableTree.write(path, data);
215         }
216
217         @Override
218         public void delete(final InstanceIdentifier path) {
219             checkNotReady();
220             mutableTree.delete(path);
221         }
222
223         protected boolean isReady() {
224             return ready;
225         }
226
227         protected void checkNotReady() {
228             checkState(!ready, "Transaction is ready. No further modifications allowed.");
229         }
230
231         @Override
232         public synchronized DOMStoreThreePhaseCommitCohort ready() {
233             ready = true;
234             LOG.debug("Store transaction: {} : Ready", getIdentifier());
235             mutableTree.seal();
236             return store.submit(this);
237         }
238
239         protected MutableDataTree getMutatedView() {
240             return mutableTree;
241         }
242
243         @Override
244         public String toString() {
245             return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
246         }
247
248     }
249
250     private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
251             DOMStoreReadWriteTransaction {
252
253         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
254                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
255             super(identifier, snapshot, store, applyOper);
256         }
257
258         @Override
259         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
260             return Futures.immediateFuture(getMutatedView().read(path));
261         }
262
263         @Override
264         public String toString() {
265             return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
266         }
267
268     }
269
270     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
271
272         private final SnaphostBackedWriteTransaction transaction;
273         private final NodeModification modification;
274
275         private DataAndMetadataSnapshot storeSnapshot;
276         private Optional<StoreMetadataNode> proposedSubtree;
277         private Iterable<ChangeListenerNotifyTask> listenerTasks;
278
279         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
280             this.transaction = writeTransaction;
281             this.modification = transaction.getMutatedView().getRootModification();
282         }
283
284         @Override
285         public ListenableFuture<Boolean> canCommit() {
286             final DataAndMetadataSnapshot snapshotCapture = snapshot;
287             final ModificationApplyOperation snapshotOperation = operationTree;
288
289             return executor.submit(new Callable<Boolean>() {
290
291                 @Override
292                 public Boolean call() throws Exception {
293                     boolean applicable = snapshotOperation.isApplicable(modification,
294                             Optional.of(snapshotCapture.getMetadataTree()));
295                     LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
296                     return applicable;
297                 }
298             });
299         }
300
301         @Override
302         public ListenableFuture<Void> preCommit() {
303             storeSnapshot = snapshot;
304             return executor.submit(new Callable<Void>() {
305
306
307
308                 @Override
309                 public Void call() throws Exception {
310                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
311
312                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
313                             increase(metadataTree.getSubtreeVersion()));
314
315
316                     listenerTasks = DataChangeEventResolver.create() //
317                             .setRootPath(PUBLIC_ROOT_PATH) //
318                             .setBeforeRoot(Optional.of(metadataTree)) //
319                             .setAfterRoot(proposedSubtree) //
320                             .setModificationRoot(modification) //
321                             .setListenerRoot(listenerTree) //
322                             .resolve();
323
324                     return null;
325                 }
326             });
327         }
328
329         @Override
330         public ListenableFuture<Void> abort() {
331             storeSnapshot = null;
332             proposedSubtree = null;
333             return Futures.<Void> immediateFuture(null);
334         }
335
336         @Override
337         public ListenableFuture<Void> commit() {
338             checkState(proposedSubtree != null);
339             checkState(storeSnapshot != null);
340             // return ImmediateFuture<>;
341             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
342             return Futures.<Void> immediateFuture(null);
343         }
344
345     }
346
347     private class AllwaysFailOperation implements ModificationApplyOperation {
348
349         @Override
350         public Optional<StoreMetadataNode> apply(final NodeModification modification,
351                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
352             throw new IllegalStateException("Schema Context is not available.");
353         }
354
355         @Override
356         public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
357             throw new IllegalStateException("Schema Context is not available.");
358         }
359
360         @Override
361         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
362             throw new IllegalStateException("Schema Context is not available.");
363         }
364
365         @Override
366         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
367             throw new IllegalStateException("Schema Context is not available.");
368         }
369
370     }
371 }