BUG-509: internal cleanup
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.atomic.AtomicLong;
16
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.common.base.Optional;
40 import com.google.common.base.Preconditions;
41 import com.google.common.primitives.UnsignedLong;
42 import com.google.common.util.concurrent.Futures;
43 import com.google.common.util.concurrent.ListenableFuture;
44 import com.google.common.util.concurrent.ListeningExecutorService;
45
46 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
47
48     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
49     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
50
51
52     private final ListeningExecutorService executor;
53     private final String name;
54     private final AtomicLong txCounter = new AtomicLong(0);
55
56     private DataAndMetadataSnapshot snapshot;
57     private ModificationApplyOperation operationTree;
58     private final ListenerRegistrationNode listenerTree;
59
60
61
62     private SchemaContext schemaContext;
63
64     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
65         this.name = Preconditions.checkNotNull(name);
66         this.executor = Preconditions.checkNotNull(executor);
67         this.operationTree = new AlwaysFailOperation();
68         this.snapshot = DataAndMetadataSnapshot.createEmpty();
69         this.listenerTree = ListenerRegistrationNode.createRoot();
70     }
71
72     @Override
73     public final String getIdentifier() {
74         return name;
75     }
76
77     @Override
78     public DOMStoreReadTransaction newReadOnlyTransaction() {
79         return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
80     }
81
82     @Override
83     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
84         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
85     }
86
87     @Override
88     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
89         return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
90     }
91
92     @Override
93     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
94         operationTree = SchemaAwareApplyOperationRoot.from(ctx);
95         schemaContext = ctx;
96     }
97
98     @Override
99     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
100             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
101         LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
102         ListenerRegistrationNode listenerNode = listenerTree;
103         for(PathArgument arg :path.getPath()) {
104             listenerNode = listenerNode.ensureChild(arg);
105         }
106         synchronized (listener) {
107             notifyInitialState(path, listener);
108         }
109         return listenerNode.registerDataChangeListener(path,listener, scope);
110     }
111
112     private void notifyInitialState(final InstanceIdentifier path,
113             final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
114         Optional<StoreMetadataNode> currentState = snapshot.read(path);
115         try {
116             if (currentState.isPresent()) {
117                 NormalizedNode<?, ?> data = currentState.get().getData();
118                 listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
119                         .setAfter(data) //
120                         .addCreated(path, data) //
121                         .build() //
122                 );
123             }
124         } catch (Exception e) {
125             LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
126         }
127
128     }
129
130     private synchronized DOMStoreThreePhaseCommitCohort submit(
131             final SnaphostBackedWriteTransaction writeTx) {
132         LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
133         return new ThreePhaseCommitImpl(writeTx);
134     }
135
136     private Object nextIdentifier() {
137         return name + "-" + txCounter.getAndIncrement();
138     }
139
140     private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
141             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
142         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
143
144         if(LOG.isTraceEnabled()) {
145             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
146         }
147         checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
148         snapshot = DataAndMetadataSnapshot.builder() //
149                 .setMetadataTree(newDataTree) //
150                 .setSchemaContext(schemaContext) //
151                 .build();
152
153         for(ChangeListenerNotifyTask task : listenerTasks) {
154             executor.submit(task);
155         }
156
157     }
158
159     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
160
161         private DataAndMetadataSnapshot stableSnapshot;
162         private final Object identifier;
163
164         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
165             this.identifier = identifier;
166             this.stableSnapshot = snapshot;
167             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
168
169         }
170
171         @Override
172         public Object getIdentifier() {
173             return identifier;
174         }
175
176         @Override
177         public void close() {
178             stableSnapshot = null;
179         }
180
181         @Override
182         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
183             checkNotNull(path, "Path must not be null.");
184             checkState(stableSnapshot != null, "Transaction is closed");
185             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
186         }
187
188         @Override
189         public String toString() {
190             return "SnapshotBackedReadTransaction [id =" + identifier + "]";
191         }
192
193     }
194
195     private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
196
197         private MutableDataTree mutableTree;
198         private final Object identifier;
199         private InMemoryDOMDataStore store;
200
201         private boolean ready = false;
202
203         public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
204                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
205             this.identifier = identifier;
206             mutableTree = MutableDataTree.from(snapshot, applyOper);
207             this.store = store;
208             LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
209         }
210
211         @Override
212         public Object getIdentifier() {
213             return identifier;
214         }
215
216         @Override
217         public void close() {
218             this.mutableTree = null;
219             this.store = null;
220         }
221
222         @Override
223         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
224             checkNotReady();
225             mutableTree.write(path, data);
226         }
227
228         @Override
229         public void delete(final InstanceIdentifier path) {
230             checkNotReady();
231             mutableTree.delete(path);
232         }
233
234         protected boolean isReady() {
235             return ready;
236         }
237
238         protected void checkNotReady() {
239             checkState(!ready, "Transaction is ready. No further modifications allowed.");
240         }
241
242         @Override
243         public synchronized DOMStoreThreePhaseCommitCohort ready() {
244             ready = true;
245             LOG.debug("Store transaction: {} : Ready", getIdentifier());
246             mutableTree.seal();
247             return store.submit(this);
248         }
249
250         protected MutableDataTree getMutatedView() {
251             return mutableTree;
252         }
253
254         @Override
255         public String toString() {
256             return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
257         }
258
259     }
260
261     private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
262             DOMStoreReadWriteTransaction {
263
264         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
265                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
266             super(identifier, snapshot, store, applyOper);
267         }
268
269         @Override
270         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
271             return Futures.immediateFuture(getMutatedView().read(path));
272         }
273
274         @Override
275         public String toString() {
276             return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
277         }
278
279     }
280
281     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
282
283         private final SnaphostBackedWriteTransaction transaction;
284         private final NodeModification modification;
285
286         private DataAndMetadataSnapshot storeSnapshot;
287         private Optional<StoreMetadataNode> proposedSubtree;
288         private Iterable<ChangeListenerNotifyTask> listenerTasks;
289
290         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
291             this.transaction = writeTransaction;
292             this.modification = transaction.getMutatedView().getRootModification();
293         }
294
295         @Override
296         public ListenableFuture<Boolean> canCommit() {
297             final DataAndMetadataSnapshot snapshotCapture = snapshot;
298             final ModificationApplyOperation snapshotOperation = operationTree;
299
300             return executor.submit(new Callable<Boolean>() {
301
302                 @Override
303                 public Boolean call() throws Exception {
304                     boolean applicable = snapshotOperation.isApplicable(modification,
305                             Optional.of(snapshotCapture.getMetadataTree()));
306                     LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
307                     return applicable;
308                 }
309             });
310         }
311
312         @Override
313         public ListenableFuture<Void> preCommit() {
314             storeSnapshot = snapshot;
315             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
316                 return Futures.immediateFuture(null);
317             }
318             return executor.submit(new Callable<Void>() {
319
320
321
322                 @Override
323                 public Void call() throws Exception {
324                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
325
326                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
327                             increase(metadataTree.getSubtreeVersion()));
328
329                     listenerTasks = DataChangeEventResolver.create() //
330                             .setRootPath(PUBLIC_ROOT_PATH) //
331                             .setBeforeRoot(Optional.of(metadataTree)) //
332                             .setAfterRoot(proposedSubtree) //
333                             .setModificationRoot(modification) //
334                             .setListenerRoot(listenerTree) //
335                             .resolve();
336
337                     return null;
338                 }
339             });
340         }
341
342         @Override
343         public ListenableFuture<Void> abort() {
344             storeSnapshot = null;
345             proposedSubtree = null;
346             return Futures.<Void> immediateFuture(null);
347         }
348
349         @Override
350         public ListenableFuture<Void> commit() {
351             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
352                 return Futures.immediateFuture(null);
353             }
354
355             checkState(proposedSubtree != null,"Proposed subtree must be computed");
356             checkState(storeSnapshot != null,"Proposed subtree must be computed");
357             // return ImmediateFuture<>;
358             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
359             return Futures.<Void> immediateFuture(null);
360         }
361
362     }
363
364     private static final class AlwaysFailOperation implements ModificationApplyOperation {
365
366         @Override
367         public Optional<StoreMetadataNode> apply(final NodeModification modification,
368                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
369             throw new IllegalStateException("Schema Context is not available.");
370         }
371
372         @Override
373         public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
374             throw new IllegalStateException("Schema Context is not available.");
375         }
376
377         @Override
378         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
379             throw new IllegalStateException("Schema Context is not available.");
380         }
381
382         @Override
383         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
384             throw new IllegalStateException("Schema Context is not available.");
385         }
386
387     }
388 }