Merge "Bug 639, Bug 641, Bug 642: This is MD-SAL based sample implementation of a...
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.concurrent.atomic.AtomicReference;
18
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
25 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
31 import org.opendaylight.yangtools.concepts.Identifiable;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.common.base.Optional;
43 import com.google.common.base.Preconditions;
44 import com.google.common.primitives.UnsignedLong;
45 import com.google.common.util.concurrent.Futures;
46 import com.google.common.util.concurrent.ListenableFuture;
47 import com.google.common.util.concurrent.ListeningExecutorService;
48
49 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
50
51     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
52     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
53
54
55     private final ListeningExecutorService executor;
56     private final String name;
57     private final AtomicLong txCounter = new AtomicLong(0);
58     private final ListenerRegistrationNode listenerTree;
59     private final AtomicReference<DataAndMetadataSnapshot> snapshot;
60
61     private ModificationApplyOperation operationTree;
62
63     private SchemaContext schemaContext;
64
65     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
66         this.name = Preconditions.checkNotNull(name);
67         this.executor = Preconditions.checkNotNull(executor);
68         this.listenerTree = ListenerRegistrationNode.createRoot();
69         this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
70         this.operationTree = new AlwaysFailOperation();
71     }
72
73     @Override
74     public final String getIdentifier() {
75         return name;
76     }
77
78     @Override
79     public DOMStoreReadTransaction newReadOnlyTransaction() {
80         return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
81     }
82
83     @Override
84     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
85         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
86     }
87
88     @Override
89     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
90         return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
91     }
92
93     @Override
94     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
95         operationTree = SchemaAwareApplyOperationRoot.from(ctx);
96         schemaContext = ctx;
97     }
98
99     @Override
100     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
101             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
102         LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
103         ListenerRegistrationNode listenerNode = listenerTree;
104         for(PathArgument arg : path.getPath()) {
105             listenerNode = listenerNode.ensureChild(arg);
106         }
107
108         /*
109          * Make sure commit is not occurring right now. Listener has to be registered and its
110          * state capture enqueued at a consistent point.
111          *
112          * FIXME: improve this to read-write lock, such that multiple listener registrations
113          *        can occur simultaneously
114          */
115         final DataChangeListenerRegistration<L> reg;
116         synchronized (this) {
117             reg = listenerNode.registerDataChangeListener(path, listener, scope);
118
119             Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
120             if (currentState.isPresent()) {
121                 final NormalizedNode<?, ?> data = currentState.get().getData();
122
123                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
124                         .setAfter(data) //
125                         .addCreated(path, data) //
126                         .build();
127                 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
128             }
129         }
130
131         return reg;
132     }
133
134     private synchronized DOMStoreThreePhaseCommitCohort submit(
135             final SnaphostBackedWriteTransaction writeTx) {
136         LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
137         return new ThreePhaseCommitImpl(writeTx);
138     }
139
140     private Object nextIdentifier() {
141         return name + "-" + txCounter.getAndIncrement();
142     }
143
144     private void commit(final DataAndMetadataSnapshot currentSnapshot,
145             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
146         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
147
148         if(LOG.isTraceEnabled()) {
149             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
150         }
151
152         final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
153                 .setMetadataTree(newDataTree) //
154                 .setSchemaContext(schemaContext) //
155                 .build();
156
157         /*
158          * The commit has to occur atomically with regard to listener registrations.
159          */
160         synchronized (this) {
161             final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
162             checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
163
164             for (ChangeListenerNotifyTask task : listenerTasks) {
165                 executor.submit(task);
166             }
167         }
168     }
169
170     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
171
172         private DataAndMetadataSnapshot stableSnapshot;
173         private final Object identifier;
174
175         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
176             this.identifier = identifier;
177             this.stableSnapshot = snapshot;
178             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
179
180         }
181
182         @Override
183         public Object getIdentifier() {
184             return identifier;
185         }
186
187         @Override
188         public void close() {
189             stableSnapshot = null;
190         }
191
192         @Override
193         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
194             checkNotNull(path, "Path must not be null.");
195             checkState(stableSnapshot != null, "Transaction is closed");
196             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
197         }
198
199         @Override
200         public String toString() {
201             return "SnapshotBackedReadTransaction [id =" + identifier + "]";
202         }
203
204     }
205
206     private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
207
208         private MutableDataTree mutableTree;
209         private final Object identifier;
210         private InMemoryDOMDataStore store;
211
212         private boolean ready = false;
213
214         public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
215                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
216             this.identifier = identifier;
217             mutableTree = MutableDataTree.from(snapshot, applyOper);
218             this.store = store;
219             LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
220         }
221
222         @Override
223         public Object getIdentifier() {
224             return identifier;
225         }
226
227         @Override
228         public void close() {
229             this.mutableTree = null;
230             this.store = null;
231         }
232
233         @Override
234         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
235             checkNotReady();
236             mutableTree.write(path, data);
237         }
238
239         @Override
240         public void delete(final InstanceIdentifier path) {
241             checkNotReady();
242             mutableTree.delete(path);
243         }
244
245         protected boolean isReady() {
246             return ready;
247         }
248
249         protected void checkNotReady() {
250             checkState(!ready, "Transaction is ready. No further modifications allowed.");
251         }
252
253         @Override
254         public synchronized DOMStoreThreePhaseCommitCohort ready() {
255             ready = true;
256             LOG.debug("Store transaction: {} : Ready", getIdentifier());
257             mutableTree.seal();
258             return store.submit(this);
259         }
260
261         protected MutableDataTree getMutatedView() {
262             return mutableTree;
263         }
264
265         @Override
266         public String toString() {
267             return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
268         }
269
270     }
271
272     private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
273             DOMStoreReadWriteTransaction {
274
275         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
276                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
277             super(identifier, snapshot, store, applyOper);
278         }
279
280         @Override
281         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
282             return Futures.immediateFuture(getMutatedView().read(path));
283         }
284
285         @Override
286         public String toString() {
287             return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
288         }
289
290     }
291
292     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
293
294         private final SnaphostBackedWriteTransaction transaction;
295         private final NodeModification modification;
296
297         private DataAndMetadataSnapshot storeSnapshot;
298         private Optional<StoreMetadataNode> proposedSubtree;
299         private Iterable<ChangeListenerNotifyTask> listenerTasks;
300
301         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
302             this.transaction = writeTransaction;
303             this.modification = transaction.getMutatedView().getRootModification();
304         }
305
306         @Override
307         public ListenableFuture<Boolean> canCommit() {
308             final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
309             final ModificationApplyOperation snapshotOperation = operationTree;
310
311             return executor.submit(new Callable<Boolean>() {
312
313                 @Override
314                 public Boolean call() throws Exception {
315                     boolean applicable = snapshotOperation.isApplicable(modification,
316                             Optional.of(snapshotCapture.getMetadataTree()));
317                     LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
318                     return applicable;
319                 }
320             });
321         }
322
323         @Override
324         public ListenableFuture<Void> preCommit() {
325             storeSnapshot = snapshot.get();
326             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
327                 return Futures.immediateFuture(null);
328             }
329             return executor.submit(new Callable<Void>() {
330
331
332
333                 @Override
334                 public Void call() throws Exception {
335                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
336
337                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
338                             increase(metadataTree.getSubtreeVersion()));
339
340                     listenerTasks = DataChangeEventResolver.create() //
341                             .setRootPath(PUBLIC_ROOT_PATH) //
342                             .setBeforeRoot(Optional.of(metadataTree)) //
343                             .setAfterRoot(proposedSubtree) //
344                             .setModificationRoot(modification) //
345                             .setListenerRoot(listenerTree) //
346                             .resolve();
347
348                     return null;
349                 }
350             });
351         }
352
353         @Override
354         public ListenableFuture<Void> abort() {
355             storeSnapshot = null;
356             proposedSubtree = null;
357             return Futures.<Void> immediateFuture(null);
358         }
359
360         @Override
361         public ListenableFuture<Void> commit() {
362             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
363                 return Futures.immediateFuture(null);
364             }
365
366             checkState(proposedSubtree != null,"Proposed subtree must be computed");
367             checkState(storeSnapshot != null,"Proposed subtree must be computed");
368             // return ImmediateFuture<>;
369             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
370             return Futures.<Void> immediateFuture(null);
371         }
372
373     }
374
375     private static final class AlwaysFailOperation implements ModificationApplyOperation {
376
377         @Override
378         public Optional<StoreMetadataNode> apply(final NodeModification modification,
379                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
380             throw new IllegalStateException("Schema Context is not available.");
381         }
382
383         @Override
384         public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
385             throw new IllegalStateException("Schema Context is not available.");
386         }
387
388         @Override
389         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
390             throw new IllegalStateException("Schema Context is not available.");
391         }
392
393         @Override
394         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
395             throw new IllegalStateException("Schema Context is not available.");
396         }
397
398     }
399 }