2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.store.impl;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.TreeNodeUtils;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import com.google.common.base.Optional;
40 import com.google.common.primitives.UnsignedLong;
41 import com.google.common.util.concurrent.Futures;
42 import com.google.common.util.concurrent.ListenableFuture;
43 import com.google.common.util.concurrent.ListeningExecutorService;
45 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
47 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
48 private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
51 private final ListeningExecutorService executor;
52 private final String name;
53 private final AtomicLong txCounter = new AtomicLong(0);
55 private DataAndMetadataSnapshot snapshot;
56 private ModificationApplyOperation operationTree;
57 private final ListenerRegistrationNode listenerTree;
61 private SchemaContext schemaContext;
63 public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
64 this.executor = executor;
66 this.operationTree = new AllwaysFailOperation();
67 this.snapshot = DataAndMetadataSnapshot.createEmpty();
68 this.listenerTree = ListenerRegistrationNode.createRoot();
72 public String getIdentifier() {
77 public DOMStoreReadTransaction newReadOnlyTransaction() {
78 return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
82 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
83 return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
87 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
88 return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
92 public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
93 operationTree = SchemaAwareApplyOperationRoot.from(ctx);
98 public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
99 final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
101 Optional<ListenerRegistrationNode> listenerNode = TreeNodeUtils.findNode(listenerTree, path);
102 checkState(listenerNode.isPresent());
103 synchronized (listener) {
104 notifyInitialState(path, listener);
106 return listenerNode.get().registerDataChangeListener(listener, scope);
109 private void notifyInitialState(final InstanceIdentifier path,
110 final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
111 Optional<StoreMetadataNode> currentState = snapshot.read(path);
113 if (currentState.isPresent()) {
114 NormalizedNode<?, ?> data = currentState.get().getData();
115 listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
117 .addCreated(path, data) //
121 } catch (Exception e) {
122 LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
127 private synchronized DOMStoreThreePhaseCommitCohort submit(
128 final SnaphostBackedWriteTransaction snaphostBackedWriteTransaction) {
129 return new ThreePhaseCommitImpl(snaphostBackedWriteTransaction);
132 private Object nextIdentifier() {
133 return name + "-" + txCounter.getAndIncrement();
136 private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
137 final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
138 LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
139 checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
140 snapshot = DataAndMetadataSnapshot.builder() //
141 .setMetadataTree(newDataTree) //
142 .setSchemaContext(schemaContext) //
145 for(ChangeListenerNotifyTask task : listenerTasks) {
146 executor.submit(task);
151 private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
153 private DataAndMetadataSnapshot stableSnapshot;
154 private final Object identifier;
156 public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
157 this.identifier = identifier;
158 this.stableSnapshot = snapshot;
162 public Object getIdentifier() {
167 public void close() {
168 stableSnapshot = null;
172 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
173 checkNotNull(path, "Path must not be null.");
174 checkState(stableSnapshot != null, "Transaction is closed");
175 return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
179 public String toString() {
180 return "SnapshotBackedReadTransaction [id =" + identifier + "]";
185 private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
187 private MutableDataTree mutableTree;
188 private final Object identifier;
189 private InMemoryDOMDataStore store;
191 private boolean ready = false;
193 public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
194 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
195 this.identifier = identifier;
196 mutableTree = MutableDataTree.from(snapshot, applyOper);
201 public Object getIdentifier() {
206 public void close() {
207 this.mutableTree = null;
212 public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
214 mutableTree.write(path, data);
218 public void delete(final InstanceIdentifier path) {
220 mutableTree.delete(path);
223 protected boolean isReady() {
227 protected void checkNotReady() {
228 checkState(!ready, "Transaction is ready. No further modifications allowed.");
232 public synchronized DOMStoreThreePhaseCommitCohort ready() {
234 LOG.debug("Store transaction: {} : Ready", getIdentifier());
236 return store.submit(this);
239 protected MutableDataTree getMutatedView() {
244 public String toString() {
245 return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
250 private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
251 DOMStoreReadWriteTransaction {
253 protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
254 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
255 super(identifier, snapshot, store, applyOper);
259 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
260 return Futures.immediateFuture(getMutatedView().read(path));
264 public String toString() {
265 return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
270 private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
272 private final SnaphostBackedWriteTransaction transaction;
273 private final NodeModification modification;
275 private DataAndMetadataSnapshot storeSnapshot;
276 private Optional<StoreMetadataNode> proposedSubtree;
277 private Iterable<ChangeListenerNotifyTask> listenerTasks;
279 public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
280 this.transaction = writeTransaction;
281 this.modification = transaction.getMutatedView().getRootModification();
285 public ListenableFuture<Boolean> canCommit() {
286 final DataAndMetadataSnapshot snapshotCapture = snapshot;
287 final ModificationApplyOperation snapshotOperation = operationTree;
289 return executor.submit(new Callable<Boolean>() {
292 public Boolean call() throws Exception {
293 boolean applicable = snapshotOperation.isApplicable(modification,
294 Optional.of(snapshotCapture.getMetadataTree()));
295 LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
302 public ListenableFuture<Void> preCommit() {
303 storeSnapshot = snapshot;
304 return executor.submit(new Callable<Void>() {
309 public Void call() throws Exception {
310 StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
312 proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
313 increase(metadataTree.getSubtreeVersion()));
316 listenerTasks = DataChangeEventResolver.create() //
317 .setRootPath(PUBLIC_ROOT_PATH) //
318 .setBeforeRoot(Optional.of(metadataTree)) //
319 .setAfterRoot(proposedSubtree) //
320 .setModificationRoot(modification) //
321 .setListenerRoot(listenerTree) //
330 public ListenableFuture<Void> abort() {
331 storeSnapshot = null;
332 proposedSubtree = null;
333 return Futures.<Void> immediateFuture(null);
337 public ListenableFuture<Void> commit() {
338 checkState(proposedSubtree != null);
339 checkState(storeSnapshot != null);
340 // return ImmediateFuture<>;
341 InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
342 return Futures.<Void> immediateFuture(null);
347 private class AllwaysFailOperation implements ModificationApplyOperation {
350 public Optional<StoreMetadataNode> apply(final NodeModification modification,
351 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
352 throw new IllegalStateException("Schema Context is not available.");
356 public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
357 throw new IllegalStateException("Schema Context is not available.");
361 public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
362 throw new IllegalStateException("Schema Context is not available.");
366 public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
367 throw new IllegalStateException("Schema Context is not available.");