BUG-509: Fix thread safety of listener registration
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.concurrent.atomic.AtomicReference;
18
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
25 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
33 import org.opendaylight.yangtools.concepts.Identifiable;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import com.google.common.base.Objects;
45 import com.google.common.base.Objects.ToStringHelper;
46 import com.google.common.base.Optional;
47 import com.google.common.base.Preconditions;
48 import com.google.common.primitives.UnsignedLong;
49 import com.google.common.util.concurrent.Futures;
50 import com.google.common.util.concurrent.ListenableFuture;
51 import com.google.common.util.concurrent.ListeningExecutorService;
52
53 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
54
55     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
56     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
57
58
59     private final ListeningExecutorService executor;
60     private final String name;
61     private final AtomicLong txCounter = new AtomicLong(0);
62     private final ListenerRegistrationNode listenerTree;
63     private final AtomicReference<DataAndMetadataSnapshot> snapshot;
64
65     private ModificationApplyOperation operationTree;
66
67     private SchemaContext schemaContext;
68
69     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
70         this.name = Preconditions.checkNotNull(name);
71         this.executor = Preconditions.checkNotNull(executor);
72         this.listenerTree = ListenerRegistrationNode.createRoot();
73         this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
74         this.operationTree = new AlwaysFailOperation();
75     }
76
77     @Override
78     public final String getIdentifier() {
79         return name;
80     }
81
82     @Override
83     public DOMStoreReadTransaction newReadOnlyTransaction() {
84         return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
85     }
86
87     @Override
88     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
89         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
90     }
91
92     @Override
93     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
94         return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
95     }
96
97     @Override
98     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
99         operationTree = SchemaAwareApplyOperationRoot.from(ctx);
100         schemaContext = ctx;
101     }
102
103     @Override
104     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
105             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
106
107         /*
108          * Make sure commit is not occurring right now. Listener has to be registered and its
109          * state capture enqueued at a consistent point.
110          *
111          * FIXME: improve this to read-write lock, such that multiple listener registrations
112          *        can occur simultaneously
113          */
114         final DataChangeListenerRegistration<L> reg;
115         synchronized (this) {
116             LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
117             ListenerRegistrationNode listenerNode = listenerTree;
118             for(PathArgument arg : path.getPath()) {
119                 listenerNode = listenerNode.ensureChild(arg);
120             }
121
122             reg = listenerNode.registerDataChangeListener(path, listener, scope);
123
124             Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
125             if (currentState.isPresent()) {
126                 final NormalizedNode<?, ?> data = currentState.get().getData();
127
128                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
129                         .setAfter(data) //
130                         .addCreated(path, data) //
131                         .build();
132                 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
133             }
134         }
135
136         return new AbstractListenerRegistration<L>(listener) {
137             @Override
138             protected void removeRegistration() {
139                 synchronized (InMemoryDOMDataStore.this) {
140                     reg.close();
141                 }
142             }
143         };
144     }
145
146     private synchronized DOMStoreThreePhaseCommitCohort submit(
147             final SnaphostBackedWriteTransaction writeTx) {
148         LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
149         return new ThreePhaseCommitImpl(writeTx);
150     }
151
152     private Object nextIdentifier() {
153         return name + "-" + txCounter.getAndIncrement();
154     }
155
156     private void commit(final DataAndMetadataSnapshot currentSnapshot,
157             final StoreMetadataNode newDataTree, final DataChangeEventResolver listenerResolver) {
158         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
159
160         if(LOG.isTraceEnabled()) {
161             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
162         }
163
164         final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
165                 .setMetadataTree(newDataTree) //
166                 .setSchemaContext(schemaContext) //
167                 .build();
168
169         /*
170          * The commit has to occur atomically with regard to listener registrations.
171          */
172         synchronized (this) {
173             final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
174             checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
175
176             for (ChangeListenerNotifyTask task : listenerResolver.resolve()) {
177                 executor.submit(task);
178             }
179         }
180     }
181
182     private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
183         private final Object identifier;
184
185         protected AbstractDOMStoreTransaction(final Object identifier) {
186             this.identifier = identifier;
187         }
188
189         @Override
190         public final Object getIdentifier() {
191             return identifier;
192         }
193
194         @Override
195         public final String toString() {
196             return addToStringAttributes(Objects.toStringHelper(this)).toString();
197         }
198
199         /**
200          * Add class-specific toString attributes.
201          *
202          * @param toStringHelper ToStringHelper instance
203          * @return ToStringHelper instance which was passed in
204          */
205         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
206             return toStringHelper.add("id", identifier);
207         }
208     }
209
210     private static class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements DOMStoreReadTransaction {
211         private DataAndMetadataSnapshot stableSnapshot;
212
213         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
214             super(identifier);
215             this.stableSnapshot = Preconditions.checkNotNull(snapshot);
216             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree().getSubtreeVersion());
217         }
218
219         @Override
220         public void close() {
221             LOG.debug("Store transaction: {} : Closed", getIdentifier());
222             stableSnapshot = null;
223         }
224
225         @Override
226         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
227             checkNotNull(path, "Path must not be null.");
228             checkState(stableSnapshot != null, "Transaction is closed");
229             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
230         }
231     }
232
233     private static class SnaphostBackedWriteTransaction extends AbstractDOMStoreTransaction implements DOMStoreWriteTransaction {
234         private MutableDataTree mutableTree;
235         private InMemoryDOMDataStore store;
236         private boolean ready = false;
237
238         public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
239                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
240             super(identifier);
241             mutableTree = MutableDataTree.from(snapshot, applyOper);
242             this.store = store;
243             LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
244         }
245
246         @Override
247         public void close() {
248             LOG.debug("Store transaction: {} : Closed", getIdentifier());
249             this.mutableTree = null;
250             this.store = null;
251         }
252
253         @Override
254         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
255             checkNotReady();
256             mutableTree.write(path, data);
257         }
258
259         @Override
260         public void delete(final InstanceIdentifier path) {
261             checkNotReady();
262             mutableTree.delete(path);
263         }
264
265         protected final boolean isReady() {
266             return ready;
267         }
268
269         protected final void checkNotReady() {
270             checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
271         }
272
273         @Override
274         public synchronized DOMStoreThreePhaseCommitCohort ready() {
275             checkState(!ready, "Transaction %s is already ready.", getIdentifier());
276             ready = true;
277
278             LOG.debug("Store transaction: {} : Ready", getIdentifier());
279             mutableTree.seal();
280             return store.submit(this);
281         }
282
283         protected MutableDataTree getMutatedView() {
284             return mutableTree;
285         }
286
287         @Override
288         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
289             return toStringHelper.add("ready", isReady());
290         }
291     }
292
293     private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
294             DOMStoreReadWriteTransaction {
295
296         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
297                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
298             super(identifier, snapshot, store, applyOper);
299         }
300
301         @Override
302         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
303             return Futures.immediateFuture(getMutatedView().read(path));
304         }
305     }
306
307     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
308
309         private final SnaphostBackedWriteTransaction transaction;
310         private final NodeModification modification;
311
312         private DataAndMetadataSnapshot storeSnapshot;
313         private Optional<StoreMetadataNode> proposedSubtree;
314         private DataChangeEventResolver listenerResolver;
315
316         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
317             this.transaction = writeTransaction;
318             this.modification = transaction.getMutatedView().getRootModification();
319         }
320
321         @Override
322         public ListenableFuture<Boolean> canCommit() {
323             final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
324             final ModificationApplyOperation snapshotOperation = operationTree;
325
326             return executor.submit(new Callable<Boolean>() {
327
328                 @Override
329                 public Boolean call() throws Exception {
330                     boolean applicable = snapshotOperation.isApplicable(modification,
331                             Optional.of(snapshotCapture.getMetadataTree()));
332                     LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
333                     return applicable;
334                 }
335             });
336         }
337
338         @Override
339         public ListenableFuture<Void> preCommit() {
340             storeSnapshot = snapshot.get();
341             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
342                 return Futures.immediateFuture(null);
343             }
344             return executor.submit(new Callable<Void>() {
345
346
347
348                 @Override
349                 public Void call() throws Exception {
350                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
351
352                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
353                             increase(metadataTree.getSubtreeVersion()));
354
355                     listenerResolver = DataChangeEventResolver.create() //
356                             .setRootPath(PUBLIC_ROOT_PATH) //
357                             .setBeforeRoot(Optional.of(metadataTree)) //
358                             .setAfterRoot(proposedSubtree) //
359                             .setModificationRoot(modification) //
360                             .setListenerRoot(listenerTree);
361
362                     return null;
363                 }
364             });
365         }
366
367         @Override
368         public ListenableFuture<Void> abort() {
369             storeSnapshot = null;
370             proposedSubtree = null;
371             return Futures.<Void> immediateFuture(null);
372         }
373
374         @Override
375         public ListenableFuture<Void> commit() {
376             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
377                 return Futures.immediateFuture(null);
378             }
379
380             checkState(proposedSubtree != null,"Proposed subtree must be computed");
381             checkState(storeSnapshot != null,"Proposed subtree must be computed");
382             // return ImmediateFuture<>;
383             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerResolver);
384             return Futures.<Void> immediateFuture(null);
385         }
386
387     }
388
389     private static final class AlwaysFailOperation implements ModificationApplyOperation {
390
391         @Override
392         public Optional<StoreMetadataNode> apply(final NodeModification modification,
393                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
394             throw new IllegalStateException("Schema Context is not available.");
395         }
396
397         @Override
398         public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
399             throw new IllegalStateException("Schema Context is not available.");
400         }
401
402         @Override
403         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
404             throw new IllegalStateException("Schema Context is not available.");
405         }
406
407         @Override
408         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
409             throw new IllegalStateException("Schema Context is not available.");
410         }
411
412     }
413 }