7d647af53907242e9c06dec14dcfa81e05c2fad6
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12
13 import java.util.Collections;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.atomic.AtomicLong;
16
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataPreconditionFailedException;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTree;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeCandidate;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeModification;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.DataTreeSnapshot;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
25 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationApplyOperation;
26 import org.opendaylight.controller.md.sal.dom.store.impl.tree.data.InMemoryDataTreeFactory;
27 import org.opendaylight.controller.md.sal.dom.store.impl.tree.data.NodeModification;
28 import org.opendaylight.controller.md.sal.dom.store.impl.tree.data.StoreMetadataNode;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
35 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
36 import org.opendaylight.yangtools.concepts.Identifiable;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
40 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
42 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import com.google.common.base.Objects;
47 import com.google.common.base.Objects.ToStringHelper;
48 import com.google.common.base.Optional;
49 import com.google.common.base.Preconditions;
50 import com.google.common.primitives.UnsignedLong;
51 import com.google.common.util.concurrent.Futures;
52 import com.google.common.util.concurrent.ListenableFuture;
53 import com.google.common.util.concurrent.ListeningExecutorService;
54
55 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
56
57     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
58     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
59
60     private final ListeningExecutorService executor;
61     private final String name;
62     private final AtomicLong txCounter = new AtomicLong(0);
63     private final ListenerTree listenerTree = ListenerTree.create();
64     private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
65     private ModificationApplyOperation operationTree = new AlwaysFailOperation();
66
67     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
68         this.name = Preconditions.checkNotNull(name);
69         this.executor = Preconditions.checkNotNull(executor);
70     }
71
72     @Override
73     public final String getIdentifier() {
74         return name;
75     }
76
77     @Override
78     public DOMStoreReadTransaction newReadOnlyTransaction() {
79         return new SnapshotBackedReadTransaction(nextIdentifier(), dataTree.takeSnapshot());
80     }
81
82     @Override
83     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
84         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this, operationTree);
85     }
86
87     @Override
88     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
89         return new SnapshotBackedWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this, operationTree);
90     }
91
92     @Override
93     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
94         /*
95          * Order of operations is important: dataTree may reject the context
96          * and creation of ModificationApplyOperation may fail. So pre-construct
97          * the operation, then update the data tree and then move the operation
98          * into view.
99          */
100         final ModificationApplyOperation newOperationTree = SchemaAwareApplyOperationRoot.from(ctx);
101         dataTree.setSchemaContext(ctx);
102         operationTree = newOperationTree;
103     }
104
105     @Override
106     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
107             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
108
109         /*
110          * Make sure commit is not occurring right now. Listener has to be
111          * registered and its state capture enqueued at a consistent point.
112          *
113          * FIXME: improve this to read-write lock, such that multiple listener
114          * registrations can occur simultaneously
115          */
116         final DataChangeListenerRegistration<L> reg;
117         synchronized (this) {
118             LOG.debug("{}: Registering data change listener {} for {}", name, listener, path);
119
120             reg = listenerTree.registerDataChangeListener(path, listener, scope);
121
122             Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
123             if (currentState.isPresent()) {
124                 final NormalizedNode<?, ?> data = currentState.get();
125
126                 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
127                         .setAfter(data) //
128                         .addCreated(path, data) //
129                         .build();
130                 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
131             }
132         }
133
134         return new AbstractListenerRegistration<L>(listener) {
135             @Override
136             protected void removeRegistration() {
137                 synchronized (InMemoryDOMDataStore.this) {
138                     reg.close();
139                 }
140             }
141         };
142     }
143
144     private synchronized DOMStoreThreePhaseCommitCohort submit(final SnapshotBackedWriteTransaction writeTx) {
145         LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
146         return new ThreePhaseCommitImpl(writeTx);
147     }
148
149     private Object nextIdentifier() {
150         return name + "-" + txCounter.getAndIncrement();
151     }
152
153     private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
154         private final Object identifier;
155
156         protected AbstractDOMStoreTransaction(final Object identifier) {
157             this.identifier = identifier;
158         }
159
160         @Override
161         public final Object getIdentifier() {
162             return identifier;
163         }
164
165         @Override
166         public final String toString() {
167             return addToStringAttributes(Objects.toStringHelper(this)).toString();
168         }
169
170         /**
171          * Add class-specific toString attributes.
172          *
173          * @param toStringHelper
174          *            ToStringHelper instance
175          * @return ToStringHelper instance which was passed in
176          */
177         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
178             return toStringHelper.add("id", identifier);
179         }
180     }
181
182     private static final class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements
183             DOMStoreReadTransaction {
184         private DataTreeSnapshot stableSnapshot;
185
186         public SnapshotBackedReadTransaction(final Object identifier, final DataTreeSnapshot snapshot) {
187             super(identifier);
188             this.stableSnapshot = Preconditions.checkNotNull(snapshot);
189             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot);
190         }
191
192         @Override
193         public void close() {
194             LOG.debug("Store transaction: {} : Closed", getIdentifier());
195             stableSnapshot = null;
196         }
197
198         @Override
199         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
200             checkNotNull(path, "Path must not be null.");
201             checkState(stableSnapshot != null, "Transaction is closed");
202             return Futures.immediateFuture(stableSnapshot.readNode(path));
203         }
204     }
205
206     private static class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction implements
207             DOMStoreWriteTransaction {
208         private DataTreeModification mutableTree;
209         private InMemoryDOMDataStore store;
210         private boolean ready = false;
211
212         public SnapshotBackedWriteTransaction(final Object identifier, final DataTreeSnapshot snapshot,
213                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
214             super(identifier);
215             mutableTree = snapshot.newModification(applyOper);
216             this.store = store;
217             LOG.debug("Write Tx: {} allocated with snapshot {}", identifier, snapshot);
218         }
219
220         @Override
221         public void close() {
222             LOG.debug("Store transaction: {} : Closed", getIdentifier());
223             this.mutableTree = null;
224             this.store = null;
225         }
226
227         @Override
228         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
229             checkNotReady();
230             try {
231                 LOG.trace("Tx: {} Write: {}:{}", getIdentifier(), path, data);
232                 mutableTree.write(path, data);
233                 // FIXME: Add checked exception
234             } catch (Exception e) {
235                 LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e);
236             }
237         }
238
239         @Override
240         public void merge(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
241             checkNotReady();
242             try {
243                 LOG.trace("Tx: {} Merge: {}:{}", getIdentifier(), path, data);
244                 mutableTree.merge(path, data);
245                 // FIXME: Add checked exception
246             } catch (Exception e) {
247                 LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e);
248             }
249         }
250
251         @Override
252         public void delete(final InstanceIdentifier path) {
253             checkNotReady();
254             try {
255                 LOG.trace("Tx: {} Delete: {}", getIdentifier(), path);
256                 mutableTree.delete(path);
257                 // FIXME: Add checked exception
258             } catch (Exception e) {
259                 LOG.error("Tx: {}, failed to delete {} in {}", getIdentifier(), path, mutableTree, e);
260             }
261         }
262
263         protected final boolean isReady() {
264             return ready;
265         }
266
267         protected final void checkNotReady() {
268             checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
269         }
270
271         @Override
272         public synchronized DOMStoreThreePhaseCommitCohort ready() {
273             checkState(!ready, "Transaction %s is already ready.", getIdentifier());
274             ready = true;
275
276             LOG.debug("Store transaction: {} : Ready", getIdentifier());
277             mutableTree.seal();
278             return store.submit(this);
279         }
280
281         protected DataTreeModification getMutatedView() {
282             return mutableTree;
283         }
284
285         @Override
286         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
287             return toStringHelper.add("ready", isReady());
288         }
289     }
290
291     private static class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements
292             DOMStoreReadWriteTransaction {
293
294         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataTreeSnapshot snapshot,
295                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
296             super(identifier, snapshot, store, applyOper);
297         }
298
299         @Override
300         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
301             LOG.trace("Tx: {} Read: {}", getIdentifier(), path);
302             try {
303                 return Futures.immediateFuture(getMutatedView().readNode(path));
304             } catch (Exception e) {
305                 LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e);
306                 throw e;
307             }
308         }
309     }
310
311     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
312
313         private final SnapshotBackedWriteTransaction transaction;
314         private final DataTreeModification modification;
315
316         private ResolveDataChangeEventsTask listenerResolver;
317         private DataTreeCandidate candidate;
318
319         public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
320             this.transaction = writeTransaction;
321             this.modification = transaction.getMutatedView();
322         }
323
324         @Override
325         public ListenableFuture<Boolean> canCommit() {
326             return executor.submit(new Callable<Boolean>() {
327                 @Override
328                 public Boolean call() {
329                     try {
330                         dataTree.validate(modification);
331                         LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
332                         return true;
333                     } catch (DataPreconditionFailedException e) {
334                         LOG.warn("Store Tx: {} Data Precondition failed for {}.",transaction.getIdentifier(),e.getPath(),e);
335                         return false;
336                     }
337                 }
338             });
339         }
340
341         @Override
342         public ListenableFuture<Void> preCommit() {
343             return executor.submit(new Callable<Void>() {
344                 @Override
345                 public Void call() {
346                         candidate = dataTree.prepare(modification);
347
348                     listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
349
350 //                            .setRootPath(PUBLIC_ROOT_PATH) //
351 //                            .setBeforeRoot(Optional.of(metadataTree)) //
352 //                            .setAfterRoot(proposedSubtree) //
353 //                            .setModificationRoot(modification.getRootModification()) //
354 //                            .setListenerRoot(listenerTree);
355
356                     return null;
357                 }
358             });
359         }
360
361         @Override
362         public ListenableFuture<Void> abort() {
363                 if (candidate != null) {
364                         candidate.close();
365                         candidate = null;
366                 }
367
368             return Futures.<Void> immediateFuture(null);
369         }
370
371         @Override
372         public ListenableFuture<Void> commit() {
373             checkState(candidate != null, "Proposed subtree must be computed");
374
375             /*
376              * The commit has to occur atomically with regard to listener
377              * registrations.
378              */
379             synchronized (this) {
380                 dataTree.commit(candidate);
381
382                 for (ChangeListenerNotifyTask task : listenerResolver.call()) {
383                     LOG.trace("Scheduling invocation of listeners: {}", task);
384                     executor.submit(task);
385                 }
386             }
387
388             return Futures.<Void> immediateFuture(null);
389         }
390     }
391
392     private static final class AlwaysFailOperation implements ModificationApplyOperation {
393
394         @Override
395         public Optional<StoreMetadataNode> apply(final NodeModification modification,
396                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
397             throw new IllegalStateException("Schema Context is not available.");
398         }
399
400         @Override
401         public void checkApplicable(final InstanceIdentifier path,final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
402             throw new IllegalStateException("Schema Context is not available.");
403         }
404
405         @Override
406         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
407             throw new IllegalStateException("Schema Context is not available.");
408         }
409
410         @Override
411         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
412             throw new IllegalStateException("Schema Context is not available.");
413         }
414
415     }
416 }