0cd00cad7e6ec9609a03bac044f1ac613edf32c2
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.atomic.AtomicLong;
16
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.common.base.Optional;
40 import com.google.common.primitives.UnsignedLong;
41 import com.google.common.util.concurrent.Futures;
42 import com.google.common.util.concurrent.ListenableFuture;
43 import com.google.common.util.concurrent.ListeningExecutorService;
44
45 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
46
47     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
48     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
49
50
51     private final ListeningExecutorService executor;
52     private final String name;
53     private final AtomicLong txCounter = new AtomicLong(0);
54
55     private DataAndMetadataSnapshot snapshot;
56     private ModificationApplyOperation operationTree;
57     private final ListenerRegistrationNode listenerTree;
58
59
60
61     private SchemaContext schemaContext;
62
63     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
64         this.executor = executor;
65         this.name = name;
66         this.operationTree = new AllwaysFailOperation();
67         this.snapshot = DataAndMetadataSnapshot.createEmpty();
68         this.listenerTree = ListenerRegistrationNode.createRoot();
69     }
70
71     @Override
72     public String getIdentifier() {
73         return name;
74     }
75
76     @Override
77     public DOMStoreReadTransaction newReadOnlyTransaction() {
78         return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
79     }
80
81     @Override
82     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
83         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
84     }
85
86     @Override
87     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
88         return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
89     }
90
91     @Override
92     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
93         operationTree = SchemaAwareApplyOperationRoot.from(ctx);
94         schemaContext = ctx;
95     }
96
97     @Override
98     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
99             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
100         LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
101         ListenerRegistrationNode listenerNode = listenerTree;
102         for(PathArgument arg :path.getPath()) {
103             listenerNode = listenerNode.ensureChild(arg);
104         }
105         synchronized (listener) {
106             notifyInitialState(path, listener);
107         }
108         return listenerNode.registerDataChangeListener(path,listener, scope);
109     }
110
111     private void notifyInitialState(final InstanceIdentifier path,
112             final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
113         Optional<StoreMetadataNode> currentState = snapshot.read(path);
114         try {
115             if (currentState.isPresent()) {
116                 NormalizedNode<?, ?> data = currentState.get().getData();
117                 listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
118                         .setAfter(data) //
119                         .addCreated(path, data) //
120                         .build() //
121                 );
122             }
123         } catch (Exception e) {
124             LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
125         }
126
127     }
128
129     private synchronized DOMStoreThreePhaseCommitCohort submit(
130             final SnaphostBackedWriteTransaction writeTx) {
131         LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
132         return new ThreePhaseCommitImpl(writeTx);
133     }
134
135     private Object nextIdentifier() {
136         return name + "-" + txCounter.getAndIncrement();
137     }
138
139     private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
140             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
141         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
142
143         if(LOG.isTraceEnabled()) {
144             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
145         }
146         checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
147         snapshot = DataAndMetadataSnapshot.builder() //
148                 .setMetadataTree(newDataTree) //
149                 .setSchemaContext(schemaContext) //
150                 .build();
151
152         for(ChangeListenerNotifyTask task : listenerTasks) {
153             executor.submit(task);
154         }
155
156     }
157
158     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
159
160         private DataAndMetadataSnapshot stableSnapshot;
161         private final Object identifier;
162
163         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
164             this.identifier = identifier;
165             this.stableSnapshot = snapshot;
166             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
167
168         }
169
170         @Override
171         public Object getIdentifier() {
172             return identifier;
173         }
174
175         @Override
176         public void close() {
177             stableSnapshot = null;
178         }
179
180         @Override
181         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
182             checkNotNull(path, "Path must not be null.");
183             checkState(stableSnapshot != null, "Transaction is closed");
184             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
185         }
186
187         @Override
188         public String toString() {
189             return "SnapshotBackedReadTransaction [id =" + identifier + "]";
190         }
191
192     }
193
194     private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
195
196         private MutableDataTree mutableTree;
197         private final Object identifier;
198         private InMemoryDOMDataStore store;
199
200         private boolean ready = false;
201
202         public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
203                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
204             this.identifier = identifier;
205             mutableTree = MutableDataTree.from(snapshot, applyOper);
206             this.store = store;
207             LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
208         }
209
210         @Override
211         public Object getIdentifier() {
212             return identifier;
213         }
214
215         @Override
216         public void close() {
217             this.mutableTree = null;
218             this.store = null;
219         }
220
221         @Override
222         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
223             checkNotReady();
224             mutableTree.write(path, data);
225         }
226
227         @Override
228         public void delete(final InstanceIdentifier path) {
229             checkNotReady();
230             mutableTree.delete(path);
231         }
232
233         protected boolean isReady() {
234             return ready;
235         }
236
237         protected void checkNotReady() {
238             checkState(!ready, "Transaction is ready. No further modifications allowed.");
239         }
240
241         @Override
242         public synchronized DOMStoreThreePhaseCommitCohort ready() {
243             ready = true;
244             LOG.debug("Store transaction: {} : Ready", getIdentifier());
245             mutableTree.seal();
246             return store.submit(this);
247         }
248
249         protected MutableDataTree getMutatedView() {
250             return mutableTree;
251         }
252
253         @Override
254         public String toString() {
255             return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
256         }
257
258     }
259
260     private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
261             DOMStoreReadWriteTransaction {
262
263         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
264                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
265             super(identifier, snapshot, store, applyOper);
266         }
267
268         @Override
269         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
270             return Futures.immediateFuture(getMutatedView().read(path));
271         }
272
273         @Override
274         public String toString() {
275             return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
276         }
277
278     }
279
280     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
281
282         private final SnaphostBackedWriteTransaction transaction;
283         private final NodeModification modification;
284
285         private DataAndMetadataSnapshot storeSnapshot;
286         private Optional<StoreMetadataNode> proposedSubtree;
287         private Iterable<ChangeListenerNotifyTask> listenerTasks;
288
289         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
290             this.transaction = writeTransaction;
291             this.modification = transaction.getMutatedView().getRootModification();
292         }
293
294         @Override
295         public ListenableFuture<Boolean> canCommit() {
296             final DataAndMetadataSnapshot snapshotCapture = snapshot;
297             final ModificationApplyOperation snapshotOperation = operationTree;
298
299             return executor.submit(new Callable<Boolean>() {
300
301                 @Override
302                 public Boolean call() throws Exception {
303                     boolean applicable = snapshotOperation.isApplicable(modification,
304                             Optional.of(snapshotCapture.getMetadataTree()));
305                     LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
306                     return applicable;
307                 }
308             });
309         }
310
311         @Override
312         public ListenableFuture<Void> preCommit() {
313             storeSnapshot = snapshot;
314             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
315                 return Futures.immediateFuture(null);
316             }
317             return executor.submit(new Callable<Void>() {
318
319
320
321                 @Override
322                 public Void call() throws Exception {
323                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
324
325                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
326                             increase(metadataTree.getSubtreeVersion()));
327
328                     listenerTasks = DataChangeEventResolver.create() //
329                             .setRootPath(PUBLIC_ROOT_PATH) //
330                             .setBeforeRoot(Optional.of(metadataTree)) //
331                             .setAfterRoot(proposedSubtree) //
332                             .setModificationRoot(modification) //
333                             .setListenerRoot(listenerTree) //
334                             .resolve();
335
336                     return null;
337                 }
338             });
339         }
340
341         @Override
342         public ListenableFuture<Void> abort() {
343             storeSnapshot = null;
344             proposedSubtree = null;
345             return Futures.<Void> immediateFuture(null);
346         }
347
348         @Override
349         public ListenableFuture<Void> commit() {
350             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
351                 return Futures.immediateFuture(null);
352             }
353
354             checkState(proposedSubtree != null,"Proposed subtree must be computed");
355             checkState(storeSnapshot != null,"Proposed subtree must be computed");
356             // return ImmediateFuture<>;
357             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
358             return Futures.<Void> immediateFuture(null);
359         }
360
361     }
362
363     private class AllwaysFailOperation implements ModificationApplyOperation {
364
365         @Override
366         public Optional<StoreMetadataNode> apply(final NodeModification modification,
367                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
368             throw new IllegalStateException("Schema Context is not available.");
369         }
370
371         @Override
372         public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
373             throw new IllegalStateException("Schema Context is not available.");
374         }
375
376         @Override
377         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
378             throw new IllegalStateException("Schema Context is not available.");
379         }
380
381         @Override
382         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
383             throw new IllegalStateException("Schema Context is not available.");
384         }
385
386     }
387 }