427e7a00dbb443e1bc0f5de8553aa66ab5fc63c8
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
30 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
31 import org.opendaylight.yangtools.concepts.Identifiable;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import com.google.common.base.Objects;
42 import com.google.common.base.Objects.ToStringHelper;
43 import com.google.common.base.Optional;
44 import com.google.common.base.Preconditions;
45 import com.google.common.primitives.UnsignedLong;
46 import com.google.common.util.concurrent.Futures;
47 import com.google.common.util.concurrent.ListenableFuture;
48 import com.google.common.util.concurrent.ListeningExecutorService;
49
50 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
51
52     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
53     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
54
55     private final ListeningExecutorService executor;
56     private final String name;
57     private final AtomicLong txCounter = new AtomicLong(0);
58     private final ListenerTree listenerTree = ListenerTree.create();
59     private final DataTree dataTree = DataTree.create(null);
60     private ModificationApplyOperation operationTree = new AlwaysFailOperation();
61
62     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
63         this.name = Preconditions.checkNotNull(name);
64         this.executor = Preconditions.checkNotNull(executor);
65     }
66
67     @Override
68     public final String getIdentifier() {
69         return name;
70     }
71
72     @Override
73     public DOMStoreReadTransaction newReadOnlyTransaction() {
74         return new SnapshotBackedReadTransaction(nextIdentifier(), dataTree.takeSnapshot());
75     }
76
77     @Override
78     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
79         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this, operationTree);
80     }
81
82     @Override
83     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
84         return new SnapshotBackedWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this, operationTree);
85     }
86
87     @Override
88     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
89         /*
90          * Order of operations is important: dataTree may reject the context
91          * and creation of ModificationApplyOperation may fail. So pre-construct
92          * the operation, then update the data tree and then move the operation
93          * into view.
94          */
95         final ModificationApplyOperation newOperationTree = SchemaAwareApplyOperationRoot.from(ctx);
96         dataTree.setSchemaContext(ctx);
97         operationTree = newOperationTree;
98     }
99
100     @Override
101     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
102             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
103
104         /*
105          * Make sure commit is not occurring right now. Listener has to be
106          * registered and its state capture enqueued at a consistent point.
107          *
108          * FIXME: improve this to read-write lock, such that multiple listener
109          * registrations can occur simultaneously
110          */
111         final DataChangeListenerRegistration<L> reg;
112         synchronized (this) {
113             LOG.debug("{}: Registering data change listener {} for {}", name, listener, path);
114
115             reg = listenerTree.registerDataChangeListener(path, listener, scope);
116
117             Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
118             if (currentState.isPresent()) {
119                 final NormalizedNode<?, ?> data = currentState.get();
120
121                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
122                         .setAfter(data) //
123                         .addCreated(path, data) //
124                         .build();
125                 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
126             }
127         }
128
129         return new AbstractListenerRegistration<L>(listener) {
130             @Override
131             protected void removeRegistration() {
132                 synchronized (InMemoryDOMDataStore.this) {
133                     reg.close();
134                 }
135             }
136         };
137     }
138
139     private synchronized DOMStoreThreePhaseCommitCohort submit(final SnapshotBackedWriteTransaction writeTx) {
140         LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
141         return new ThreePhaseCommitImpl(writeTx);
142     }
143
144     private Object nextIdentifier() {
145         return name + "-" + txCounter.getAndIncrement();
146     }
147
148     private void commit(final DataTree.Snapshot currentSnapshot, final StoreMetadataNode newDataTree,
149             final ResolveDataChangeEventsTask listenerResolver) {
150         LOG.debug("Updating Store snaphot version: {} with version:{}", currentSnapshot, newDataTree.getSubtreeVersion());
151
152         if (LOG.isTraceEnabled()) {
153             LOG.trace("Data Tree is {}", StoreUtils.toStringTree(newDataTree.getData()));
154         }
155
156         /*
157          * The commit has to occur atomically with regard to listener
158          * registrations.
159          */
160         synchronized (this) {
161             dataTree.commitSnapshot(currentSnapshot, newDataTree);
162
163             for (ChangeListenerNotifyTask task : listenerResolver.call()) {
164                 LOG.trace("Scheduling invocation of listeners: {}", task);
165                 executor.submit(task);
166             }
167         }
168     }
169
170     private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
171         private final Object identifier;
172
173         protected AbstractDOMStoreTransaction(final Object identifier) {
174             this.identifier = identifier;
175         }
176
177         @Override
178         public final Object getIdentifier() {
179             return identifier;
180         }
181
182         @Override
183         public final String toString() {
184             return addToStringAttributes(Objects.toStringHelper(this)).toString();
185         }
186
187         /**
188          * Add class-specific toString attributes.
189          *
190          * @param toStringHelper
191          *            ToStringHelper instance
192          * @return ToStringHelper instance which was passed in
193          */
194         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
195             return toStringHelper.add("id", identifier);
196         }
197     }
198
199     private static final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements
200             DOMStoreReadTransaction {
201         private DataTree.Snapshot stableSnapshot;
202
203         public SnapshotBackedReadTransaction(final Object identifier, final DataTree.Snapshot snapshot) {
204             super(identifier);
205             this.stableSnapshot = Preconditions.checkNotNull(snapshot);
206             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot);
207         }
208
209         @Override
210         public void close() {
211             LOG.debug("Store transaction: {} : Closed", getIdentifier());
212             stableSnapshot = null;
213         }
214
215         @Override
216         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
217             checkNotNull(path, "Path must not be null.");
218             checkState(stableSnapshot != null, "Transaction is closed");
219             return Futures.immediateFuture(stableSnapshot.readNode(path));
220         }
221     }
222
223     private static class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction implements
224             DOMStoreWriteTransaction {
225         private DataTreeModification mutableTree;
226         private InMemoryDOMDataStore store;
227         private boolean ready = false;
228
229         public SnapshotBackedWriteTransaction(final Object identifier, final DataTree.Snapshot snapshot,
230                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
231             super(identifier);
232             mutableTree = DataTreeModification.from(snapshot, applyOper);
233             this.store = store;
234             LOG.debug("Write Tx: {} allocated with snapshot {}", identifier, snapshot);
235         }
236
237         @Override
238         public void close() {
239             LOG.debug("Store transaction: {} : Closed", getIdentifier());
240             this.mutableTree = null;
241             this.store = null;
242         }
243
244         @Override
245         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
246             checkNotReady();
247             try {
248                 LOG.trace("Tx: {} Write: {}:{}", getIdentifier(), path, data);
249                 mutableTree.write(path, data);
250                 // FIXME: Add checked exception
251             } catch (Exception e) {
252                 LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e);
253             }
254         }
255
256         @Override
257         public void merge(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
258             checkNotReady();
259             try {
260                 LOG.trace("Tx: {} Merge: {}:{}", getIdentifier(), path, data);
261                 mutableTree.merge(path, data);
262                 // FIXME: Add checked exception
263             } catch (Exception e) {
264                 LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e);
265             }
266         }
267
268         @Override
269         public void delete(final InstanceIdentifier path) {
270             checkNotReady();
271             try {
272                 LOG.trace("Tx: {} Delete: {}", getIdentifier(), path);
273                 mutableTree.delete(path);
274                 // FIXME: Add checked exception
275             } catch (Exception e) {
276                 LOG.error("Tx: {}, failed to delete {} in {}", getIdentifier(), path, mutableTree, e);
277             }
278         }
279
280         protected final boolean isReady() {
281             return ready;
282         }
283
284         protected final void checkNotReady() {
285             checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
286         }
287
288         @Override
289         public synchronized DOMStoreThreePhaseCommitCohort ready() {
290             checkState(!ready, "Transaction %s is already ready.", getIdentifier());
291             ready = true;
292
293             LOG.debug("Store transaction: {} : Ready", getIdentifier());
294             mutableTree.seal();
295             return store.submit(this);
296         }
297
298         protected DataTreeModification getMutatedView() {
299             return mutableTree;
300         }
301
302         @Override
303         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
304             return toStringHelper.add("ready", isReady());
305         }
306     }
307
308     private static class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements
309             DOMStoreReadWriteTransaction {
310
311         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataTree.Snapshot snapshot,
312                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
313             super(identifier, snapshot, store, applyOper);
314         }
315
316         @Override
317         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
318             LOG.trace("Tx: {} Read: {}", getIdentifier(), path);
319             try {
320                 return Futures.immediateFuture(getMutatedView().read(path));
321             } catch (Exception e) {
322                 LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e);
323                 throw e;
324             }
325         }
326     }
327
328     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
329
330         private final SnapshotBackedWriteTransaction transaction;
331         private final NodeModification modification;
332
333         private DataTree.Snapshot storeSnapshot;
334         private Optional<StoreMetadataNode> proposedSubtree;
335         private ResolveDataChangeEventsTask listenerResolver;
336
337         public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
338             this.transaction = writeTransaction;
339             this.modification = transaction.getMutatedView().getRootModification();
340         }
341
342         @Override
343         public ListenableFuture<Boolean> canCommit() {
344             final DataTree.Snapshot snapshotCapture = dataTree.takeSnapshot();
345             final ModificationApplyOperation snapshotOperation = operationTree;
346
347             return executor.submit(new Callable<Boolean>() {
348
349                 @Override
350                 public Boolean call() throws Exception {
351                     Boolean applicable = false;
352                     try {
353                         snapshotOperation.checkApplicable(PUBLIC_ROOT_PATH, modification,
354                             Optional.of(snapshotCapture.getRootNode()));
355                         applicable = true;
356                     } catch (DataPreconditionFailedException e) {
357                         LOG.warn("Store Tx: {} Data Precondition failed for {}.",transaction.getIdentifier(),e.getPath(),e);
358                         applicable = false;
359                     }
360                     LOG.debug("Store Transaction: {} : canCommit : {}", transaction.getIdentifier(), applicable);
361                     return applicable;
362                 }
363             });
364         }
365
366         @Override
367         public ListenableFuture<Void> preCommit() {
368             storeSnapshot = dataTree.takeSnapshot();
369             if (modification.getModificationType() == ModificationType.UNMODIFIED) {
370                 return Futures.immediateFuture(null);
371             }
372             return executor.submit(new Callable<Void>() {
373
374                 @Override
375                 public Void call() throws Exception {
376                     StoreMetadataNode metadataTree = storeSnapshot.getRootNode();
377
378                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
379                             increase(metadataTree.getSubtreeVersion()));
380
381                     listenerResolver = ResolveDataChangeEventsTask.create() //
382                             .setRootPath(PUBLIC_ROOT_PATH) //
383                             .setBeforeRoot(Optional.of(metadataTree)) //
384                             .setAfterRoot(proposedSubtree) //
385                             .setModificationRoot(modification) //
386                             .setListenerRoot(listenerTree);
387
388                     return null;
389                 }
390             });
391         }
392
393         @Override
394         public ListenableFuture<Void> abort() {
395             storeSnapshot = null;
396             proposedSubtree = null;
397             return Futures.<Void> immediateFuture(null);
398         }
399
400         @Override
401         public ListenableFuture<Void> commit() {
402             if (modification.getModificationType() == ModificationType.UNMODIFIED) {
403                 return Futures.immediateFuture(null);
404             }
405
406             checkState(proposedSubtree != null, "Proposed subtree must be computed");
407             checkState(storeSnapshot != null, "Proposed subtree must be computed");
408             // return ImmediateFuture<>;
409             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(), listenerResolver);
410             return Futures.<Void> immediateFuture(null);
411         }
412
413     }
414
415     private static final class AlwaysFailOperation implements ModificationApplyOperation {
416
417         @Override
418         public Optional<StoreMetadataNode> apply(final NodeModification modification,
419                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
420             throw new IllegalStateException("Schema Context is not available.");
421         }
422
423         @Override
424         public void checkApplicable(final InstanceIdentifier path,final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
425             throw new IllegalStateException("Schema Context is not available.");
426         }
427
428         @Override
429         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
430             throw new IllegalStateException("Schema Context is not available.");
431         }
432
433         @Override
434         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
435             throw new IllegalStateException("Schema Context is not available.");
436         }
437
438     }
439 }