Bug 508: Improved error reporting for failed canCommit phase.
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.concurrent.atomic.AtomicReference;
18
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
31 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.base.Objects;
44 import com.google.common.base.Objects.ToStringHelper;
45 import com.google.common.base.Optional;
46 import com.google.common.base.Preconditions;
47 import com.google.common.primitives.UnsignedLong;
48 import com.google.common.util.concurrent.Futures;
49 import com.google.common.util.concurrent.ListenableFuture;
50 import com.google.common.util.concurrent.ListeningExecutorService;
51
52 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
53
54     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
55     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
56
57     private final ListeningExecutorService executor;
58     private final String name;
59     private final AtomicLong txCounter = new AtomicLong(0);
60     private final ListenerTree listenerTree;
61     private final AtomicReference<DataAndMetadataSnapshot> snapshot;
62
63     private ModificationApplyOperation operationTree;
64
65     private SchemaContext schemaContext;
66
67     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
68         this.name = Preconditions.checkNotNull(name);
69         this.executor = Preconditions.checkNotNull(executor);
70         this.listenerTree = ListenerTree.create();
71         this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
72         this.operationTree = new AlwaysFailOperation();
73     }
74
75     @Override
76     public final String getIdentifier() {
77         return name;
78     }
79
80     @Override
81     public DOMStoreReadTransaction newReadOnlyTransaction() {
82         return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
83     }
84
85     @Override
86     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
87         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
88     }
89
90     @Override
91     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
92         return new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
93     }
94
95     @Override
96     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
97         operationTree = SchemaAwareApplyOperationRoot.from(ctx);
98         schemaContext = ctx;
99     }
100
101     @Override
102     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
103             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
104
105         /*
106          * Make sure commit is not occurring right now. Listener has to be
107          * registered and its state capture enqueued at a consistent point.
108          *
109          * FIXME: improve this to read-write lock, such that multiple listener
110          * registrations can occur simultaneously
111          */
112         final DataChangeListenerRegistration<L> reg;
113         synchronized (this) {
114             LOG.debug("{}: Registering data change listener {} for {}", name, listener, path);
115
116             reg = listenerTree.registerDataChangeListener(path, listener, scope);
117
118             Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
119             if (currentState.isPresent()) {
120                 final NormalizedNode<?, ?> data = currentState.get().getData();
121
122                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
123                         .setAfter(data) //
124                         .addCreated(path, data) //
125                         .build();
126                 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
127             }
128         }
129
130         return new AbstractListenerRegistration<L>(listener) {
131             @Override
132             protected void removeRegistration() {
133                 synchronized (InMemoryDOMDataStore.this) {
134                     reg.close();
135                 }
136             }
137         };
138     }
139
140     private synchronized DOMStoreThreePhaseCommitCohort submit(final SnapshotBackedWriteTransaction writeTx) {
141         LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
142         return new ThreePhaseCommitImpl(writeTx);
143     }
144
145     private Object nextIdentifier() {
146         return name + "-" + txCounter.getAndIncrement();
147     }
148
149     private void commit(final DataAndMetadataSnapshot currentSnapshot, final StoreMetadataNode newDataTree,
150             final ResolveDataChangeEventsTask listenerResolver) {
151         LOG.debug("Updating Store snaphot version: {} with version:{}", currentSnapshot.getMetadataTree()
152                 .getSubtreeVersion(), newDataTree.getSubtreeVersion());
153
154         if (LOG.isTraceEnabled()) {
155             LOG.trace("Data Tree is {}", StoreUtils.toStringTree(newDataTree));
156         }
157
158         final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
159                 .setMetadataTree(newDataTree) //
160                 .setSchemaContext(schemaContext) //
161                 .build();
162
163         /*
164          * The commit has to occur atomically with regard to listener
165          * registrations.
166          */
167         synchronized (this) {
168             final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
169             checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
170
171             for (ChangeListenerNotifyTask task : listenerResolver.call()) {
172                 LOG.trace("Scheduling invocation of listeners: {}", task);
173                 executor.submit(task);
174             }
175         }
176     }
177
178     private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
179         private final Object identifier;
180
181         protected AbstractDOMStoreTransaction(final Object identifier) {
182             this.identifier = identifier;
183         }
184
185         @Override
186         public final Object getIdentifier() {
187             return identifier;
188         }
189
190         @Override
191         public final String toString() {
192             return addToStringAttributes(Objects.toStringHelper(this)).toString();
193         }
194
195         /**
196          * Add class-specific toString attributes.
197          *
198          * @param toStringHelper
199          *            ToStringHelper instance
200          * @return ToStringHelper instance which was passed in
201          */
202         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
203             return toStringHelper.add("id", identifier);
204         }
205     }
206
207     private static class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements
208             DOMStoreReadTransaction {
209         private DataAndMetadataSnapshot stableSnapshot;
210
211         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
212             super(identifier);
213             this.stableSnapshot = Preconditions.checkNotNull(snapshot);
214             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree()
215                     .getSubtreeVersion());
216         }
217
218         @Override
219         public void close() {
220             LOG.debug("Store transaction: {} : Closed", getIdentifier());
221             stableSnapshot = null;
222         }
223
224         @Override
225         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
226             checkNotNull(path, "Path must not be null.");
227             checkState(stableSnapshot != null, "Transaction is closed");
228             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
229         }
230     }
231
232     private static class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction implements
233             DOMStoreWriteTransaction {
234         private MutableDataTree mutableTree;
235         private InMemoryDOMDataStore store;
236         private boolean ready = false;
237
238         public SnapshotBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
239                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
240             super(identifier);
241             mutableTree = MutableDataTree.from(snapshot, applyOper);
242             this.store = store;
243             LOG.debug("Write Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree()
244                     .getSubtreeVersion());
245         }
246
247         @Override
248         public void close() {
249             LOG.debug("Store transaction: {} : Closed", getIdentifier());
250             this.mutableTree = null;
251             this.store = null;
252         }
253
254         @Override
255         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
256             checkNotReady();
257             try {
258                 LOG.trace("Tx: {} Write: {}:{}", getIdentifier(), path, data);
259                 mutableTree.write(path, data);
260                 // FIXME: Add checked exception
261             } catch (Exception e) {
262                 LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e);
263             }
264         }
265
266         @Override
267         public void merge(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
268             checkNotReady();
269             try {
270                 LOG.trace("Tx: {} Merge: {}:{}", getIdentifier(), path, data);
271                 mutableTree.merge(path, data);
272                 // FIXME: Add checked exception
273             } catch (Exception e) {
274                 LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e);
275             }
276         }
277
278         @Override
279         public void delete(final InstanceIdentifier path) {
280             checkNotReady();
281             try {
282                 LOG.trace("Tx: {} Delete: {}", getIdentifier(), path);
283                 mutableTree.delete(path);
284                 // FIXME: Add checked exception
285             } catch (Exception e) {
286                 LOG.error("Tx: {}, failed to delete {} in {}", getIdentifier(), path, mutableTree, e);
287             }
288         }
289
290         protected final boolean isReady() {
291             return ready;
292         }
293
294         protected final void checkNotReady() {
295             checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
296         }
297
298         @Override
299         public synchronized DOMStoreThreePhaseCommitCohort ready() {
300             checkState(!ready, "Transaction %s is already ready.", getIdentifier());
301             ready = true;
302
303             LOG.debug("Store transaction: {} : Ready", getIdentifier());
304             mutableTree.seal();
305             return store.submit(this);
306         }
307
308         protected MutableDataTree getMutatedView() {
309             return mutableTree;
310         }
311
312         @Override
313         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
314             return toStringHelper.add("ready", isReady());
315         }
316     }
317
318     private static class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements
319             DOMStoreReadWriteTransaction {
320
321         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
322                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
323             super(identifier, snapshot, store, applyOper);
324         }
325
326         @Override
327         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
328             LOG.trace("Tx: {} Read: {}", getIdentifier(), path);
329             try {
330                 return Futures.immediateFuture(getMutatedView().read(path));
331             } catch (Exception e) {
332                 LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e);
333                 throw e;
334             }
335         }
336     }
337
338     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
339
340         private final SnapshotBackedWriteTransaction transaction;
341         private final NodeModification modification;
342
343         private DataAndMetadataSnapshot storeSnapshot;
344         private Optional<StoreMetadataNode> proposedSubtree;
345         private ResolveDataChangeEventsTask listenerResolver;
346
347         public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
348             this.transaction = writeTransaction;
349             this.modification = transaction.getMutatedView().getRootModification();
350         }
351
352         @Override
353         public ListenableFuture<Boolean> canCommit() {
354             final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
355             final ModificationApplyOperation snapshotOperation = operationTree;
356
357             return executor.submit(new Callable<Boolean>() {
358
359                 @Override
360                 public Boolean call() throws Exception {
361                     Boolean applicable = false;
362                     try {
363                         snapshotOperation.checkApplicable(PUBLIC_ROOT_PATH, modification,
364                             Optional.of(snapshotCapture.getMetadataTree()));
365                         applicable = true;
366                     } catch (DataPreconditionFailedException e) {
367                         LOG.warn("Store Tx: {} Data Precondition failed for {}.",transaction.getIdentifier(),e.getPath(),e);
368                         applicable = false;
369                     }
370                     LOG.debug("Store Transaction: {} : canCommit : {}", transaction.getIdentifier(), applicable);
371                     return applicable;
372                 }
373             });
374         }
375
376         @Override
377         public ListenableFuture<Void> preCommit() {
378             storeSnapshot = snapshot.get();
379             if (modification.getModificationType() == ModificationType.UNMODIFIED) {
380                 return Futures.immediateFuture(null);
381             }
382             return executor.submit(new Callable<Void>() {
383
384                 @Override
385                 public Void call() throws Exception {
386                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
387
388                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
389                             increase(metadataTree.getSubtreeVersion()));
390
391                     listenerResolver = ResolveDataChangeEventsTask.create() //
392                             .setRootPath(PUBLIC_ROOT_PATH) //
393                             .setBeforeRoot(Optional.of(metadataTree)) //
394                             .setAfterRoot(proposedSubtree) //
395                             .setModificationRoot(modification) //
396                             .setListenerRoot(listenerTree);
397
398                     return null;
399                 }
400             });
401         }
402
403         @Override
404         public ListenableFuture<Void> abort() {
405             storeSnapshot = null;
406             proposedSubtree = null;
407             return Futures.<Void> immediateFuture(null);
408         }
409
410         @Override
411         public ListenableFuture<Void> commit() {
412             if (modification.getModificationType() == ModificationType.UNMODIFIED) {
413                 return Futures.immediateFuture(null);
414             }
415
416             checkState(proposedSubtree != null, "Proposed subtree must be computed");
417             checkState(storeSnapshot != null, "Proposed subtree must be computed");
418             // return ImmediateFuture<>;
419             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(), listenerResolver);
420             return Futures.<Void> immediateFuture(null);
421         }
422
423     }
424
425     private static final class AlwaysFailOperation implements ModificationApplyOperation {
426
427         @Override
428         public Optional<StoreMetadataNode> apply(final NodeModification modification,
429                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
430             throw new IllegalStateException("Schema Context is not available.");
431         }
432
433         @Override
434         public void checkApplicable(final InstanceIdentifier path,final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
435             throw new IllegalStateException("Schema Context is not available.");
436         }
437
438         @Override
439         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
440             throw new IllegalStateException("Schema Context is not available.");
441         }
442
443         @Override
444         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
445             throw new IllegalStateException("Schema Context is not available.");
446         }
447
448     }
449 }