BUG-509: Improve InMemoryDOMDataStore structure
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.concurrent.atomic.AtomicReference;
18
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
25 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.base.Objects;
44 import com.google.common.base.Objects.ToStringHelper;
45 import com.google.common.base.Optional;
46 import com.google.common.base.Preconditions;
47 import com.google.common.primitives.UnsignedLong;
48 import com.google.common.util.concurrent.Futures;
49 import com.google.common.util.concurrent.ListenableFuture;
50 import com.google.common.util.concurrent.ListeningExecutorService;
51
52 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
53
54     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
55     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
56
57
58     private final ListeningExecutorService executor;
59     private final String name;
60     private final AtomicLong txCounter = new AtomicLong(0);
61     private final ListenerRegistrationNode listenerTree;
62     private final AtomicReference<DataAndMetadataSnapshot> snapshot;
63
64     private ModificationApplyOperation operationTree;
65
66     private SchemaContext schemaContext;
67
68     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
69         this.name = Preconditions.checkNotNull(name);
70         this.executor = Preconditions.checkNotNull(executor);
71         this.listenerTree = ListenerRegistrationNode.createRoot();
72         this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
73         this.operationTree = new AlwaysFailOperation();
74     }
75
76     @Override
77     public final String getIdentifier() {
78         return name;
79     }
80
81     @Override
82     public DOMStoreReadTransaction newReadOnlyTransaction() {
83         return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
84     }
85
86     @Override
87     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
88         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
89     }
90
91     @Override
92     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
93         return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
94     }
95
96     @Override
97     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
98         operationTree = SchemaAwareApplyOperationRoot.from(ctx);
99         schemaContext = ctx;
100     }
101
102     @Override
103     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
104             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
105         LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
106         ListenerRegistrationNode listenerNode = listenerTree;
107         for(PathArgument arg : path.getPath()) {
108             listenerNode = listenerNode.ensureChild(arg);
109         }
110
111         /*
112          * Make sure commit is not occurring right now. Listener has to be registered and its
113          * state capture enqueued at a consistent point.
114          *
115          * FIXME: improve this to read-write lock, such that multiple listener registrations
116          *        can occur simultaneously
117          */
118         final DataChangeListenerRegistration<L> reg;
119         synchronized (this) {
120             reg = listenerNode.registerDataChangeListener(path, listener, scope);
121
122             Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
123             if (currentState.isPresent()) {
124                 final NormalizedNode<?, ?> data = currentState.get().getData();
125
126                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
127                         .setAfter(data) //
128                         .addCreated(path, data) //
129                         .build();
130                 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
131             }
132         }
133
134         return reg;
135     }
136
137     private synchronized DOMStoreThreePhaseCommitCohort submit(
138             final SnaphostBackedWriteTransaction writeTx) {
139         LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
140         return new ThreePhaseCommitImpl(writeTx);
141     }
142
143     private Object nextIdentifier() {
144         return name + "-" + txCounter.getAndIncrement();
145     }
146
147     private void commit(final DataAndMetadataSnapshot currentSnapshot,
148             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
149         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
150
151         if(LOG.isTraceEnabled()) {
152             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
153         }
154
155         final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
156                 .setMetadataTree(newDataTree) //
157                 .setSchemaContext(schemaContext) //
158                 .build();
159
160         /*
161          * The commit has to occur atomically with regard to listener registrations.
162          */
163         synchronized (this) {
164             final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
165             checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
166
167             for (ChangeListenerNotifyTask task : listenerTasks) {
168                 executor.submit(task);
169             }
170         }
171     }
172
173     private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
174         private final Object identifier;
175
176         protected AbstractDOMStoreTransaction(final Object identifier) {
177             this.identifier = identifier;
178         }
179
180         @Override
181         public final Object getIdentifier() {
182             return identifier;
183         }
184
185         @Override
186         public final String toString() {
187             return addToStringAttributes(Objects.toStringHelper(this)).toString();
188         }
189
190         /**
191          * Add class-specific toString attributes.
192          *
193          * @param toStringHelper ToStringHelper instance
194          * @return ToStringHelper instance which was passed in
195          */
196         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
197             return toStringHelper.add("id", identifier);
198         }
199     }
200
201     private static class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements DOMStoreReadTransaction {
202         private DataAndMetadataSnapshot stableSnapshot;
203
204         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
205             super(identifier);
206             this.stableSnapshot = Preconditions.checkNotNull(snapshot);
207             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree().getSubtreeVersion());
208         }
209
210         @Override
211         public void close() {
212             LOG.debug("Store transaction: {} : Closed", getIdentifier());
213             stableSnapshot = null;
214         }
215
216         @Override
217         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
218             checkNotNull(path, "Path must not be null.");
219             checkState(stableSnapshot != null, "Transaction is closed");
220             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
221         }
222     }
223
224     private static class SnaphostBackedWriteTransaction extends AbstractDOMStoreTransaction implements DOMStoreWriteTransaction {
225         private MutableDataTree mutableTree;
226         private InMemoryDOMDataStore store;
227         private boolean ready = false;
228
229         public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
230                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
231             super(identifier);
232             mutableTree = MutableDataTree.from(snapshot, applyOper);
233             this.store = store;
234             LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
235         }
236
237         @Override
238         public void close() {
239             LOG.debug("Store transaction: {} : Closed", getIdentifier());
240             this.mutableTree = null;
241             this.store = null;
242         }
243
244         @Override
245         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
246             checkNotReady();
247             mutableTree.write(path, data);
248         }
249
250         @Override
251         public void delete(final InstanceIdentifier path) {
252             checkNotReady();
253             mutableTree.delete(path);
254         }
255
256         protected final boolean isReady() {
257             return ready;
258         }
259
260         protected final void checkNotReady() {
261             checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
262         }
263
264         @Override
265         public synchronized DOMStoreThreePhaseCommitCohort ready() {
266             checkState(!ready, "Transaction %s is already ready.", getIdentifier());
267             ready = true;
268
269             LOG.debug("Store transaction: {} : Ready", getIdentifier());
270             mutableTree.seal();
271             return store.submit(this);
272         }
273
274         protected MutableDataTree getMutatedView() {
275             return mutableTree;
276         }
277
278         @Override
279         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
280             return toStringHelper.add("ready", isReady());
281         }
282     }
283
284     private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
285             DOMStoreReadWriteTransaction {
286
287         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
288                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
289             super(identifier, snapshot, store, applyOper);
290         }
291
292         @Override
293         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
294             return Futures.immediateFuture(getMutatedView().read(path));
295         }
296     }
297
298     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
299
300         private final SnaphostBackedWriteTransaction transaction;
301         private final NodeModification modification;
302
303         private DataAndMetadataSnapshot storeSnapshot;
304         private Optional<StoreMetadataNode> proposedSubtree;
305         private Iterable<ChangeListenerNotifyTask> listenerTasks;
306
307         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
308             this.transaction = writeTransaction;
309             this.modification = transaction.getMutatedView().getRootModification();
310         }
311
312         @Override
313         public ListenableFuture<Boolean> canCommit() {
314             final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
315             final ModificationApplyOperation snapshotOperation = operationTree;
316
317             return executor.submit(new Callable<Boolean>() {
318
319                 @Override
320                 public Boolean call() throws Exception {
321                     boolean applicable = snapshotOperation.isApplicable(modification,
322                             Optional.of(snapshotCapture.getMetadataTree()));
323                     LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
324                     return applicable;
325                 }
326             });
327         }
328
329         @Override
330         public ListenableFuture<Void> preCommit() {
331             storeSnapshot = snapshot.get();
332             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
333                 return Futures.immediateFuture(null);
334             }
335             return executor.submit(new Callable<Void>() {
336
337
338
339                 @Override
340                 public Void call() throws Exception {
341                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
342
343                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
344                             increase(metadataTree.getSubtreeVersion()));
345
346                     listenerTasks = DataChangeEventResolver.create() //
347                             .setRootPath(PUBLIC_ROOT_PATH) //
348                             .setBeforeRoot(Optional.of(metadataTree)) //
349                             .setAfterRoot(proposedSubtree) //
350                             .setModificationRoot(modification) //
351                             .setListenerRoot(listenerTree) //
352                             .resolve();
353
354                     return null;
355                 }
356             });
357         }
358
359         @Override
360         public ListenableFuture<Void> abort() {
361             storeSnapshot = null;
362             proposedSubtree = null;
363             return Futures.<Void> immediateFuture(null);
364         }
365
366         @Override
367         public ListenableFuture<Void> commit() {
368             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
369                 return Futures.immediateFuture(null);
370             }
371
372             checkState(proposedSubtree != null,"Proposed subtree must be computed");
373             checkState(storeSnapshot != null,"Proposed subtree must be computed");
374             // return ImmediateFuture<>;
375             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
376             return Futures.<Void> immediateFuture(null);
377         }
378
379     }
380
381     private static final class AlwaysFailOperation implements ModificationApplyOperation {
382
383         @Override
384         public Optional<StoreMetadataNode> apply(final NodeModification modification,
385                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
386             throw new IllegalStateException("Schema Context is not available.");
387         }
388
389         @Override
390         public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
391             throw new IllegalStateException("Schema Context is not available.");
392         }
393
394         @Override
395         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
396             throw new IllegalStateException("Schema Context is not available.");
397         }
398
399         @Override
400         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
401             throw new IllegalStateException("Schema Context is not available.");
402         }
403
404     }
405 }