Bug 849: Fixed NPE in Translated Data Change Events.
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.concurrent.atomic.AtomicReference;
18
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
31 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.base.Objects;
44 import com.google.common.base.Objects.ToStringHelper;
45 import com.google.common.base.Optional;
46 import com.google.common.base.Preconditions;
47 import com.google.common.primitives.UnsignedLong;
48 import com.google.common.util.concurrent.Futures;
49 import com.google.common.util.concurrent.ListenableFuture;
50 import com.google.common.util.concurrent.ListeningExecutorService;
51
52 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
53
54     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
55     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
56
57
58     private final ListeningExecutorService executor;
59     private final String name;
60     private final AtomicLong txCounter = new AtomicLong(0);
61     private final ListenerTree listenerTree;
62     private final AtomicReference<DataAndMetadataSnapshot> snapshot;
63
64     private ModificationApplyOperation operationTree;
65
66     private SchemaContext schemaContext;
67
68     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
69         this.name = Preconditions.checkNotNull(name);
70         this.executor = Preconditions.checkNotNull(executor);
71         this.listenerTree = ListenerTree.create();
72         this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
73         this.operationTree = new AlwaysFailOperation();
74     }
75
76     @Override
77     public final String getIdentifier() {
78         return name;
79     }
80
81     @Override
82     public DOMStoreReadTransaction newReadOnlyTransaction() {
83         return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
84     }
85
86     @Override
87     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
88         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
89     }
90
91     @Override
92     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
93         return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
94     }
95
96     @Override
97     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
98         operationTree = SchemaAwareApplyOperationRoot.from(ctx);
99         schemaContext = ctx;
100     }
101
102     @Override
103     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
104             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
105
106         /*
107          * Make sure commit is not occurring right now. Listener has to be registered and its
108          * state capture enqueued at a consistent point.
109          *
110          * FIXME: improve this to read-write lock, such that multiple listener registrations
111          *        can occur simultaneously
112          */
113         final DataChangeListenerRegistration<L> reg;
114         synchronized (this) {
115             LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
116
117             reg = listenerTree.registerDataChangeListener(path, listener, scope);
118
119             Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
120             if (currentState.isPresent()) {
121                 final NormalizedNode<?, ?> data = currentState.get().getData();
122
123                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
124                         .setAfter(data) //
125                         .addCreated(path, data) //
126                         .build();
127                 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
128             }
129         }
130
131         return new AbstractListenerRegistration<L>(listener) {
132             @Override
133             protected void removeRegistration() {
134                 synchronized (InMemoryDOMDataStore.this) {
135                     reg.close();
136                 }
137             }
138         };
139     }
140
141     private synchronized DOMStoreThreePhaseCommitCohort submit(
142             final SnaphostBackedWriteTransaction writeTx) {
143         LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
144         return new ThreePhaseCommitImpl(writeTx);
145     }
146
147     private Object nextIdentifier() {
148         return name + "-" + txCounter.getAndIncrement();
149     }
150
151     private void commit(final DataAndMetadataSnapshot currentSnapshot,
152             final StoreMetadataNode newDataTree, final ResolveDataChangeEventsTask listenerResolver) {
153         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
154
155         if(LOG.isTraceEnabled()) {
156             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
157         }
158
159         final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
160                 .setMetadataTree(newDataTree) //
161                 .setSchemaContext(schemaContext) //
162                 .build();
163
164         /*
165          * The commit has to occur atomically with regard to listener registrations.
166          */
167         synchronized (this) {
168             final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
169             checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
170
171             for (ChangeListenerNotifyTask task : listenerResolver.call()) {
172                 LOG.trace("Scheduling invocation of listeners: {}",task);
173                 executor.submit(task);
174             }
175         }
176     }
177
178     private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
179         private final Object identifier;
180
181         protected AbstractDOMStoreTransaction(final Object identifier) {
182             this.identifier = identifier;
183         }
184
185         @Override
186         public final Object getIdentifier() {
187             return identifier;
188         }
189
190         @Override
191         public final String toString() {
192             return addToStringAttributes(Objects.toStringHelper(this)).toString();
193         }
194
195         /**
196          * Add class-specific toString attributes.
197          *
198          * @param toStringHelper ToStringHelper instance
199          * @return ToStringHelper instance which was passed in
200          */
201         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
202             return toStringHelper.add("id", identifier);
203         }
204     }
205
206     private static class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements DOMStoreReadTransaction {
207         private DataAndMetadataSnapshot stableSnapshot;
208
209         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
210             super(identifier);
211             this.stableSnapshot = Preconditions.checkNotNull(snapshot);
212             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree().getSubtreeVersion());
213         }
214
215         @Override
216         public void close() {
217             LOG.debug("Store transaction: {} : Closed", getIdentifier());
218             stableSnapshot = null;
219         }
220
221         @Override
222         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
223             checkNotNull(path, "Path must not be null.");
224             checkState(stableSnapshot != null, "Transaction is closed");
225             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
226         }
227     }
228
229     private static class SnaphostBackedWriteTransaction extends AbstractDOMStoreTransaction implements DOMStoreWriteTransaction {
230         private MutableDataTree mutableTree;
231         private InMemoryDOMDataStore store;
232         private boolean ready = false;
233
234         public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
235                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
236             super(identifier);
237             mutableTree = MutableDataTree.from(snapshot, applyOper);
238             this.store = store;
239             LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
240         }
241
242         @Override
243         public void close() {
244             LOG.debug("Store transaction: {} : Closed", getIdentifier());
245             this.mutableTree = null;
246             this.store = null;
247         }
248
249         @Override
250         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
251             checkNotReady();
252             try {
253                 LOG.trace("Tx: {} Write: {}:{}",getIdentifier(),path,data);
254                 mutableTree.write(path, data);
255               // FIXME: Add checked exception
256             } catch (Exception e) {
257                 LOG.error("Tx: {}, failed to write {}:{} in {}",getIdentifier(),path,data,mutableTree,e);
258             }
259         }
260
261         @Override
262         public void merge(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
263             checkNotReady();
264             try {
265                 LOG.trace("Tx: {} Merge: {}:{}",getIdentifier(),path,data);
266                 mutableTree.merge(path, data);
267               // FIXME: Add checked exception
268             } catch (Exception e) {
269                 LOG.error("Tx: {}, failed to write {}:{} in {}",getIdentifier(),path,data,mutableTree,e);
270             }
271         }
272
273         @Override
274         public void delete(final InstanceIdentifier path) {
275             checkNotReady();
276             try {
277                 LOG.trace("Tx: {} Delete: {}",getIdentifier(),path);
278                 mutableTree.delete(path);
279              // FIXME: Add checked exception
280             } catch (Exception e) {
281                 LOG.error("Tx: {}, failed to delete {} in {}",getIdentifier(),path,mutableTree,e);
282             }
283         }
284
285         protected final boolean isReady() {
286             return ready;
287         }
288
289         protected final void checkNotReady() {
290             checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
291         }
292
293         @Override
294         public synchronized DOMStoreThreePhaseCommitCohort ready() {
295             checkState(!ready, "Transaction %s is already ready.", getIdentifier());
296             ready = true;
297
298             LOG.debug("Store transaction: {} : Ready", getIdentifier());
299             mutableTree.seal();
300             return store.submit(this);
301         }
302
303         protected MutableDataTree getMutatedView() {
304             return mutableTree;
305         }
306
307         @Override
308         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
309             return toStringHelper.add("ready", isReady());
310         }
311     }
312
313     private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
314             DOMStoreReadWriteTransaction {
315
316         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
317                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
318             super(identifier, snapshot, store, applyOper);
319         }
320
321         @Override
322         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
323             LOG.trace("Tx: {} Read: {}",getIdentifier(),path);
324             try {
325                 return Futures.immediateFuture(getMutatedView().read(path));
326             } catch (Exception e) {
327                 LOG.error("Tx: {} Failed Read of {}",getIdentifier(),path,e);
328                 throw e;
329             }
330         }
331     }
332
333     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
334
335         private final SnaphostBackedWriteTransaction transaction;
336         private final NodeModification modification;
337
338         private DataAndMetadataSnapshot storeSnapshot;
339         private Optional<StoreMetadataNode> proposedSubtree;
340         private ResolveDataChangeEventsTask listenerResolver;
341
342         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
343             this.transaction = writeTransaction;
344             this.modification = transaction.getMutatedView().getRootModification();
345         }
346
347         @Override
348         public ListenableFuture<Boolean> canCommit() {
349             final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
350             final ModificationApplyOperation snapshotOperation = operationTree;
351
352             return executor.submit(new Callable<Boolean>() {
353
354                 @Override
355                 public Boolean call() throws Exception {
356                     boolean applicable = snapshotOperation.isApplicable(modification,
357                             Optional.of(snapshotCapture.getMetadataTree()));
358                     LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
359                     return applicable;
360                 }
361             });
362         }
363
364         @Override
365         public ListenableFuture<Void> preCommit() {
366             storeSnapshot = snapshot.get();
367             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
368                 return Futures.immediateFuture(null);
369             }
370             return executor.submit(new Callable<Void>() {
371
372
373
374                 @Override
375                 public Void call() throws Exception {
376                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
377
378                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
379                             increase(metadataTree.getSubtreeVersion()));
380
381                     listenerResolver = ResolveDataChangeEventsTask.create() //
382                             .setRootPath(PUBLIC_ROOT_PATH) //
383                             .setBeforeRoot(Optional.of(metadataTree)) //
384                             .setAfterRoot(proposedSubtree) //
385                             .setModificationRoot(modification) //
386                             .setListenerRoot(listenerTree);
387
388                     return null;
389                 }
390             });
391         }
392
393         @Override
394         public ListenableFuture<Void> abort() {
395             storeSnapshot = null;
396             proposedSubtree = null;
397             return Futures.<Void> immediateFuture(null);
398         }
399
400         @Override
401         public ListenableFuture<Void> commit() {
402             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
403                 return Futures.immediateFuture(null);
404             }
405
406             checkState(proposedSubtree != null,"Proposed subtree must be computed");
407             checkState(storeSnapshot != null,"Proposed subtree must be computed");
408             // return ImmediateFuture<>;
409             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerResolver);
410             return Futures.<Void> immediateFuture(null);
411         }
412
413     }
414
415     private static final class AlwaysFailOperation implements ModificationApplyOperation {
416
417         @Override
418         public Optional<StoreMetadataNode> apply(final NodeModification modification,
419                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
420             throw new IllegalStateException("Schema Context is not available.");
421         }
422
423         @Override
424         public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
425             throw new IllegalStateException("Schema Context is not available.");
426         }
427
428         @Override
429         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
430             throw new IllegalStateException("Schema Context is not available.");
431         }
432
433         @Override
434         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
435             throw new IllegalStateException("Schema Context is not available.");
436         }
437
438     }
439 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.