2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.store.impl;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
29 import org.opendaylight.yangtools.concepts.Identifiable;
30 import org.opendaylight.yangtools.concepts.ListenerRegistration;
31 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
32 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
33 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
34 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
35 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
40 import com.google.common.base.Optional;
41 import com.google.common.primitives.UnsignedLong;
42 import com.google.common.util.concurrent.Futures;
43 import com.google.common.util.concurrent.ListenableFuture;
44 import com.google.common.util.concurrent.ListeningExecutorService;
46 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
48 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
49 private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
52 private final ListeningExecutorService executor;
53 private final String name;
54 private final AtomicLong txCounter = new AtomicLong(0);
56 private DataAndMetadataSnapshot snapshot;
57 private ModificationApplyOperation operationTree;
58 private final ListenerRegistrationNode listenerTree;
62 private SchemaContext schemaContext;
64 public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
65 this.executor = executor;
67 this.operationTree = new AllwaysFailOperation();
68 this.snapshot = DataAndMetadataSnapshot.createEmpty();
69 this.listenerTree = ListenerRegistrationNode.createRoot();
73 public String getIdentifier() {
78 public DOMStoreReadTransaction newReadOnlyTransaction() {
79 return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
83 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
84 return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
88 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
89 return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
93 public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
94 operationTree = SchemaAwareApplyOperationRoot.from(ctx);
99 public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
100 final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
102 Optional<ListenerRegistrationNode> listenerNode = TreeNodeUtils.findNode(listenerTree, path);
103 checkState(listenerNode.isPresent());
104 synchronized (listener) {
105 notifyInitialState(path, listener);
107 return listenerNode.get().registerDataChangeListener(listener, scope);
110 private void notifyInitialState(final InstanceIdentifier path,
111 final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
112 Optional<StoreMetadataNode> currentState = snapshot.read(path);
114 if (currentState.isPresent()) {
115 NormalizedNode<?, ?> data = currentState.get().getData();
116 listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
118 .addCreated(path, data) //
122 } catch (Exception e) {
123 LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
128 private synchronized DOMStoreThreePhaseCommitCohort submit(
129 final SnaphostBackedWriteTransaction writeTx) {
130 LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
131 return new ThreePhaseCommitImpl(writeTx);
134 private Object nextIdentifier() {
135 return name + "-" + txCounter.getAndIncrement();
138 private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
139 final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
140 LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
141 if(LOG.isTraceEnabled()) {
142 LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
144 checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
145 snapshot = DataAndMetadataSnapshot.builder() //
146 .setMetadataTree(newDataTree) //
147 .setSchemaContext(schemaContext) //
150 for(ChangeListenerNotifyTask task : listenerTasks) {
151 executor.submit(task);
156 private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
158 private DataAndMetadataSnapshot stableSnapshot;
159 private final Object identifier;
161 public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
162 this.identifier = identifier;
163 this.stableSnapshot = snapshot;
164 LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
169 public Object getIdentifier() {
174 public void close() {
175 stableSnapshot = null;
179 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
180 checkNotNull(path, "Path must not be null.");
181 checkState(stableSnapshot != null, "Transaction is closed");
182 return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
186 public String toString() {
187 return "SnapshotBackedReadTransaction [id =" + identifier + "]";
192 private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
194 private MutableDataTree mutableTree;
195 private final Object identifier;
196 private InMemoryDOMDataStore store;
198 private boolean ready = false;
200 public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
201 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
202 this.identifier = identifier;
203 mutableTree = MutableDataTree.from(snapshot, applyOper);
205 LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
209 public Object getIdentifier() {
214 public void close() {
215 this.mutableTree = null;
220 public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
222 mutableTree.write(path, data);
226 public void delete(final InstanceIdentifier path) {
228 mutableTree.delete(path);
231 protected boolean isReady() {
235 protected void checkNotReady() {
236 checkState(!ready, "Transaction is ready. No further modifications allowed.");
240 public synchronized DOMStoreThreePhaseCommitCohort ready() {
242 LOG.debug("Store transaction: {} : Ready", getIdentifier());
244 return store.submit(this);
247 protected MutableDataTree getMutatedView() {
252 public String toString() {
253 return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
258 private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
259 DOMStoreReadWriteTransaction {
261 protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
262 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
263 super(identifier, snapshot, store, applyOper);
267 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
268 return Futures.immediateFuture(getMutatedView().read(path));
272 public String toString() {
273 return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
278 private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
280 private final SnaphostBackedWriteTransaction transaction;
281 private final NodeModification modification;
283 private DataAndMetadataSnapshot storeSnapshot;
284 private Optional<StoreMetadataNode> proposedSubtree;
285 private Iterable<ChangeListenerNotifyTask> listenerTasks;
287 public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
288 this.transaction = writeTransaction;
289 this.modification = transaction.getMutatedView().getRootModification();
293 public ListenableFuture<Boolean> canCommit() {
294 final DataAndMetadataSnapshot snapshotCapture = snapshot;
295 final ModificationApplyOperation snapshotOperation = operationTree;
297 return executor.submit(new Callable<Boolean>() {
300 public Boolean call() throws Exception {
301 boolean applicable = snapshotOperation.isApplicable(modification,
302 Optional.of(snapshotCapture.getMetadataTree()));
303 LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
310 public ListenableFuture<Void> preCommit() {
311 storeSnapshot = snapshot;
312 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
313 return Futures.immediateFuture(null);
315 return executor.submit(new Callable<Void>() {
320 public Void call() throws Exception {
321 StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
323 proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
324 increase(metadataTree.getSubtreeVersion()));
327 listenerTasks = DataChangeEventResolver.create() //
328 .setRootPath(PUBLIC_ROOT_PATH) //
329 .setBeforeRoot(Optional.of(metadataTree)) //
330 .setAfterRoot(proposedSubtree) //
331 .setModificationRoot(modification) //
332 .setListenerRoot(listenerTree) //
341 public ListenableFuture<Void> abort() {
342 storeSnapshot = null;
343 proposedSubtree = null;
344 return Futures.<Void> immediateFuture(null);
348 public ListenableFuture<Void> commit() {
349 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
350 return Futures.immediateFuture(null);
353 checkState(proposedSubtree != null,"Proposed subtree must be computed");
354 checkState(storeSnapshot != null,"Proposed subtree must be computed");
355 // return ImmediateFuture<>;
356 InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
357 return Futures.<Void> immediateFuture(null);
362 private class AllwaysFailOperation implements ModificationApplyOperation {
365 public Optional<StoreMetadataNode> apply(final NodeModification modification,
366 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
367 throw new IllegalStateException("Schema Context is not available.");
371 public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
372 throw new IllegalStateException("Schema Context is not available.");
376 public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
377 throw new IllegalStateException("Schema Context is not available.");
381 public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
382 throw new IllegalStateException("Schema Context is not available.");