Merge "BUG-467: reconnect-strategy configuration moved into controller/commons/protoc...
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / md / sal / dom / store / impl / InMemoryDOMDataStore.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
9
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
13
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.atomic.AtomicLong;
16
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
29 import org.opendaylight.yangtools.concepts.Identifiable;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration;
31 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
35 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import com.google.common.base.Optional;
41 import com.google.common.primitives.UnsignedLong;
42 import com.google.common.util.concurrent.Futures;
43 import com.google.common.util.concurrent.ListenableFuture;
44 import com.google.common.util.concurrent.ListeningExecutorService;
45
46 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
47
48     private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
49     private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
50
51
52     private final ListeningExecutorService executor;
53     private final String name;
54     private final AtomicLong txCounter = new AtomicLong(0);
55
56     private DataAndMetadataSnapshot snapshot;
57     private ModificationApplyOperation operationTree;
58     private final ListenerRegistrationNode listenerTree;
59
60
61
62     private SchemaContext schemaContext;
63
64     public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
65         this.executor = executor;
66         this.name = name;
67         this.operationTree = new AllwaysFailOperation();
68         this.snapshot = DataAndMetadataSnapshot.createEmpty();
69         this.listenerTree = ListenerRegistrationNode.createRoot();
70     }
71
72     @Override
73     public String getIdentifier() {
74         return name;
75     }
76
77     @Override
78     public DOMStoreReadTransaction newReadOnlyTransaction() {
79         return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
80     }
81
82     @Override
83     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
84         return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
85     }
86
87     @Override
88     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
89         return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
90     }
91
92     @Override
93     public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
94         operationTree = SchemaAwareApplyOperationRoot.from(ctx);
95         schemaContext = ctx;
96     }
97
98     @Override
99     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
100             final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
101
102         Optional<ListenerRegistrationNode> listenerNode = TreeNodeUtils.findNode(listenerTree, path);
103         checkState(listenerNode.isPresent());
104         synchronized (listener) {
105             notifyInitialState(path, listener);
106         }
107         return listenerNode.get().registerDataChangeListener(listener, scope);
108     }
109
110     private void notifyInitialState(final InstanceIdentifier path,
111             final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
112         Optional<StoreMetadataNode> currentState = snapshot.read(path);
113         try {
114             if (currentState.isPresent()) {
115                 NormalizedNode<?, ?> data = currentState.get().getData();
116                 listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
117                         .setAfter(data) //
118                         .addCreated(path, data) //
119                         .build() //
120                 );
121             }
122         } catch (Exception e) {
123             LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
124         }
125
126     }
127
128     private synchronized DOMStoreThreePhaseCommitCohort submit(
129             final SnaphostBackedWriteTransaction writeTx) {
130         LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
131         return new ThreePhaseCommitImpl(writeTx);
132     }
133
134     private Object nextIdentifier() {
135         return name + "-" + txCounter.getAndIncrement();
136     }
137
138     private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
139             final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
140         LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
141         if(LOG.isTraceEnabled()) {
142             LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
143         }
144         checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
145         snapshot = DataAndMetadataSnapshot.builder() //
146                 .setMetadataTree(newDataTree) //
147                 .setSchemaContext(schemaContext) //
148                 .build();
149
150         for(ChangeListenerNotifyTask task : listenerTasks) {
151             executor.submit(task);
152         }
153
154     }
155
156     private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
157
158         private DataAndMetadataSnapshot stableSnapshot;
159         private final Object identifier;
160
161         public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
162             this.identifier = identifier;
163             this.stableSnapshot = snapshot;
164             LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
165
166         }
167
168         @Override
169         public Object getIdentifier() {
170             return identifier;
171         }
172
173         @Override
174         public void close() {
175             stableSnapshot = null;
176         }
177
178         @Override
179         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
180             checkNotNull(path, "Path must not be null.");
181             checkState(stableSnapshot != null, "Transaction is closed");
182             return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
183         }
184
185         @Override
186         public String toString() {
187             return "SnapshotBackedReadTransaction [id =" + identifier + "]";
188         }
189
190     }
191
192     private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
193
194         private MutableDataTree mutableTree;
195         private final Object identifier;
196         private InMemoryDOMDataStore store;
197
198         private boolean ready = false;
199
200         public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
201                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
202             this.identifier = identifier;
203             mutableTree = MutableDataTree.from(snapshot, applyOper);
204             this.store = store;
205             LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
206         }
207
208         @Override
209         public Object getIdentifier() {
210             return identifier;
211         }
212
213         @Override
214         public void close() {
215             this.mutableTree = null;
216             this.store = null;
217         }
218
219         @Override
220         public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
221             checkNotReady();
222             mutableTree.write(path, data);
223         }
224
225         @Override
226         public void delete(final InstanceIdentifier path) {
227             checkNotReady();
228             mutableTree.delete(path);
229         }
230
231         protected boolean isReady() {
232             return ready;
233         }
234
235         protected void checkNotReady() {
236             checkState(!ready, "Transaction is ready. No further modifications allowed.");
237         }
238
239         @Override
240         public synchronized DOMStoreThreePhaseCommitCohort ready() {
241             ready = true;
242             LOG.debug("Store transaction: {} : Ready", getIdentifier());
243             mutableTree.seal();
244             return store.submit(this);
245         }
246
247         protected MutableDataTree getMutatedView() {
248             return mutableTree;
249         }
250
251         @Override
252         public String toString() {
253             return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
254         }
255
256     }
257
258     private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
259             DOMStoreReadWriteTransaction {
260
261         protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
262                 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
263             super(identifier, snapshot, store, applyOper);
264         }
265
266         @Override
267         public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
268             return Futures.immediateFuture(getMutatedView().read(path));
269         }
270
271         @Override
272         public String toString() {
273             return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
274         }
275
276     }
277
278     private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
279
280         private final SnaphostBackedWriteTransaction transaction;
281         private final NodeModification modification;
282
283         private DataAndMetadataSnapshot storeSnapshot;
284         private Optional<StoreMetadataNode> proposedSubtree;
285         private Iterable<ChangeListenerNotifyTask> listenerTasks;
286
287         public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
288             this.transaction = writeTransaction;
289             this.modification = transaction.getMutatedView().getRootModification();
290         }
291
292         @Override
293         public ListenableFuture<Boolean> canCommit() {
294             final DataAndMetadataSnapshot snapshotCapture = snapshot;
295             final ModificationApplyOperation snapshotOperation = operationTree;
296
297             return executor.submit(new Callable<Boolean>() {
298
299                 @Override
300                 public Boolean call() throws Exception {
301                     boolean applicable = snapshotOperation.isApplicable(modification,
302                             Optional.of(snapshotCapture.getMetadataTree()));
303                     LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
304                     return applicable;
305                 }
306             });
307         }
308
309         @Override
310         public ListenableFuture<Void> preCommit() {
311             storeSnapshot = snapshot;
312             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
313                 return Futures.immediateFuture(null);
314             }
315             return executor.submit(new Callable<Void>() {
316
317
318
319                 @Override
320                 public Void call() throws Exception {
321                     StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
322
323                     proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
324                             increase(metadataTree.getSubtreeVersion()));
325
326
327                     listenerTasks = DataChangeEventResolver.create() //
328                             .setRootPath(PUBLIC_ROOT_PATH) //
329                             .setBeforeRoot(Optional.of(metadataTree)) //
330                             .setAfterRoot(proposedSubtree) //
331                             .setModificationRoot(modification) //
332                             .setListenerRoot(listenerTree) //
333                             .resolve();
334
335                     return null;
336                 }
337             });
338         }
339
340         @Override
341         public ListenableFuture<Void> abort() {
342             storeSnapshot = null;
343             proposedSubtree = null;
344             return Futures.<Void> immediateFuture(null);
345         }
346
347         @Override
348         public ListenableFuture<Void> commit() {
349             if(modification.getModificationType() == ModificationType.UNMODIFIED) {
350                 return Futures.immediateFuture(null);
351             }
352
353             checkState(proposedSubtree != null,"Proposed subtree must be computed");
354             checkState(storeSnapshot != null,"Proposed subtree must be computed");
355             // return ImmediateFuture<>;
356             InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
357             return Futures.<Void> immediateFuture(null);
358         }
359
360     }
361
362     private class AllwaysFailOperation implements ModificationApplyOperation {
363
364         @Override
365         public Optional<StoreMetadataNode> apply(final NodeModification modification,
366                 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
367             throw new IllegalStateException("Schema Context is not available.");
368         }
369
370         @Override
371         public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
372             throw new IllegalStateException("Schema Context is not available.");
373         }
374
375         @Override
376         public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
377             throw new IllegalStateException("Schema Context is not available.");
378         }
379
380         @Override
381         public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
382             throw new IllegalStateException("Schema Context is not available.");
383         }
384
385     }
386 }