2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.store.impl;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import com.google.common.base.Optional;
40 import com.google.common.base.Preconditions;
41 import com.google.common.primitives.UnsignedLong;
42 import com.google.common.util.concurrent.Futures;
43 import com.google.common.util.concurrent.ListenableFuture;
44 import com.google.common.util.concurrent.ListeningExecutorService;
46 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
48 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
49 private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
52 private final ListeningExecutorService executor;
53 private final String name;
54 private final AtomicLong txCounter = new AtomicLong(0);
56 private DataAndMetadataSnapshot snapshot;
57 private ModificationApplyOperation operationTree;
58 private final ListenerRegistrationNode listenerTree;
62 private SchemaContext schemaContext;
/**
 * Creates a store backed by the snapshot returned from
 * {@code DataAndMetadataSnapshot.createEmpty()} and a fresh listener tree root.
 *
 * <p>The operation tree starts out as an {@code AlwaysFailOperation}; it is replaced when
 * {@link #onGlobalContextUpdated(SchemaContext)} is called.
 *
 * @param name store name; must not be null
 * @param executor executor used by the store; must not be null
 * @throws NullPointerException if {@code name} or {@code executor} is null
 */
public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
    // Same Guava check as before, reached through the file's existing static import.
    this.name = checkNotNull(name);
    this.executor = checkNotNull(executor);

    this.operationTree = new AlwaysFailOperation();
    this.snapshot = DataAndMetadataSnapshot.createEmpty();
    this.listenerTree = ListenerRegistrationNode.createRoot();
}
73 public final String getIdentifier() {
78 public DOMStoreReadTransaction newReadOnlyTransaction() {
79 return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
83 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
84 return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
88 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
89 return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
93 public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
94 operationTree = SchemaAwareApplyOperationRoot.from(ctx);
99 public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
100 final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
101 LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
102 ListenerRegistrationNode listenerNode = listenerTree;
103 for(PathArgument arg :path.getPath()) {
104 listenerNode = listenerNode.ensureChild(arg);
106 synchronized (listener) {
107 notifyInitialState(path, listener);
109 return listenerNode.registerDataChangeListener(path,listener, scope);
/**
 * Reads the state currently stored at {@code path} from the store snapshot and, when a node
 * is present, reports its data to {@code listener} as created at that path.
 *
 * <p>An exception caught around the listener invocation is logged at error level together
 * with the listener.
 *
 * NOTE(review): the numbering embedded in this listing skips lines inside this method (before
 * the presence check, inside the event builder chain and before the catch clause), so the
 * opening {@code try}, at least one builder call and the end of the chain are not visible
 * here. Confirm against the complete file before editing.
 *
 * @param path path whose current state is read
 * @param listener listener that receives the initial event
 */
112 private void notifyInitialState(final InstanceIdentifier path,
113 final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
114 Optional<StoreMetadataNode> currentState = snapshot.read(path);
// Nothing is delivered when the snapshot holds no node at the path.
116 if (currentState.isPresent()) {
117 NormalizedNode<?, ?> data = currentState.get().getData();
118 listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
120 .addCreated(path, data) //
124 } catch (Exception e) {
125 LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
130 private synchronized DOMStoreThreePhaseCommitCohort submit(
131 final SnaphostBackedWriteTransaction writeTx) {
132 LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
133 return new ThreePhaseCommitImpl(writeTx);
136 private Object nextIdentifier() {
137 return name + "-" + txCounter.getAndIncrement();
/**
 * Replaces the store snapshot with one built from {@code newDataTree} and the store's schema
 * context, then submits every supplied listener task to the executor.
 *
 * <p>The replacement is guarded by a {@code checkState} that the store snapshot is still the
 * same instance as {@code currentSnapshot}.
 *
 * NOTE(review): the numbering embedded in this listing skips the line(s) that end the
 * snapshot builder chain; the terminal call is not visible here.
 *
 * @param currentSnapshot snapshot the caller computed its changes against
 * @param newDataTree metadata tree to publish as the new store state
 * @param listenerTasks notification tasks to submit once the snapshot has been replaced
 */
140 private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
141 final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
142 LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
// The string form of the tree is built only when trace logging is enabled.
144 if(LOG.isTraceEnabled()) {
145 LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
// Reference comparison: passes only if no other snapshot was published in between.
147 checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
148 snapshot = DataAndMetadataSnapshot.builder() //
149 .setMetadataTree(newDataTree) //
150 .setSchemaContext(schemaContext) //
// Listener tasks run on the executor rather than on the committing thread.
153 for(ChangeListenerNotifyTask task : listenerTasks) {
154 executor.submit(task);
159 private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
161 private DataAndMetadataSnapshot stableSnapshot;
162 private final Object identifier;
164 public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
165 this.identifier = identifier;
166 this.stableSnapshot = snapshot;
167 LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
172 public Object getIdentifier() {
177 public void close() {
178 stableSnapshot = null;
182 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
183 checkNotNull(path, "Path must not be null.");
184 checkState(stableSnapshot != null, "Transaction is closed");
185 return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
189 public String toString() {
190 return "SnapshotBackedReadTransaction [id =" + identifier + "]";
/**
 * Write transaction that applies writes and deletes to a {@code MutableDataTree} created from
 * the snapshot and apply operation passed to the constructor.
 *
 * NOTE(review): the numbering embedded in this listing skips lines inside several members
 * (the constructor, the accessors, close(), write(), delete() and ready()), so some
 * statements are not visible here. The comments below describe only what is visible; confirm
 * against the complete file before changing behavior.
 */
195 private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
// Working tree that receives write/delete calls; set to null by close().
197 private MutableDataTree mutableTree;
198 private final Object identifier;
// Store that ready() hands this transaction to.
199 private InMemoryDOMDataStore store;
// Consulted by checkNotReady(); no assignment other than this initializer is visible here.
201 private boolean ready = false;
/**
 * Creates the mutable tree from {@code snapshot} and {@code applyOper} and logs the
 * allocation together with the snapshot's subtree version.
 */
203 public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
204 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
205 this.identifier = identifier;
206 mutableTree = MutableDataTree.from(snapshot, applyOper);
208 LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
212 public Object getIdentifier() {
/** Releases the mutable tree reference. */
217 public void close() {
218 this.mutableTree = null;
/** Forwards the write to the mutable tree. */
223 public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
225 mutableTree.write(path, data);
/** Forwards the delete to the mutable tree. */
229 public void delete(final InstanceIdentifier path) {
231 mutableTree.delete(path);
234 protected boolean isReady() {
/** Fails the {@code checkState} when the transaction is already marked ready. */
238 protected void checkNotReady() {
239 checkState(!ready, "Transaction is ready. No further modifications allowed.");
/**
 * Logs that the transaction is ready and submits it to the store.
 *
 * @return the commit cohort returned by the store's submit method
 */
243 public synchronized DOMStoreThreePhaseCommitCohort ready() {
245 LOG.debug("Store transaction: {} : Ready", getIdentifier());
247 return store.submit(this);
250 protected MutableDataTree getMutatedView() {
255 public String toString() {
256 return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
261 private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
262 DOMStoreReadWriteTransaction {
264 protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
265 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
266 super(identifier, snapshot, store, applyOper);
270 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
271 return Futures.immediateFuture(getMutatedView().read(path));
275 public String toString() {
276 return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
281 private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
// Transaction this cohort was created for.
283 private final SnaphostBackedWriteTransaction transaction;
// Root modification taken from the transaction's mutated view in the constructor.
284 private final NodeModification modification;
// Store snapshot captured by preCommit(); cleared by abort().
286 private DataAndMetadataSnapshot storeSnapshot;
// Result of applying the modification, computed by the preCommit() task; cleared by abort().
287 private Optional<StoreMetadataNode> proposedSubtree;
// Listener notification tasks computed by the preCommit() task and passed on by commit().
288 private Iterable<ChangeListenerNotifyTask> listenerTasks;
290 public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
291 this.transaction = writeTransaction;
292 this.modification = transaction.getMutatedView().getRootModification();
296 public ListenableFuture<Boolean> canCommit() {
297 final DataAndMetadataSnapshot snapshotCapture = snapshot;
298 final ModificationApplyOperation snapshotOperation = operationTree;
300 return executor.submit(new Callable<Boolean>() {
303 public Boolean call() throws Exception {
304 boolean applicable = snapshotOperation.isApplicable(modification,
305 Optional.of(snapshotCapture.getMetadataTree()));
306 LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
/**
 * Captures the current store snapshot and, unless the modification type is UNMODIFIED,
 * submits a task to the executor that computes the proposed subtree and the listener
 * notification tasks for this transaction.
 *
 * NOTE(review): the numbering embedded in this listing skips the lines after
 * {@code setListenerRoot(...)}; the call that ends the resolver chain and the task's return
 * statement are not visible here.
 *
 * @return an immediate future when nothing was modified, otherwise the future of the
 *         submitted task
 */
313 public ListenableFuture<Void> preCommit() {
// Remembered so commit() can verify that the store has not moved on in the meantime.
314 storeSnapshot = snapshot;
315 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
316 return Futures.immediateFuture(null);
318 return executor.submit(new Callable<Void>() {
323 public Void call() throws Exception {
324 StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
// The version argument is derived from the current subtree version via StoreUtils.increase
// (presumably the next version - confirm in StoreUtils).
326 proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
327 increase(metadataTree.getSubtreeVersion()));
// Listener tasks are resolved from the tree before and after the modification.
329 listenerTasks = DataChangeEventResolver.create() //
330 .setRootPath(PUBLIC_ROOT_PATH) //
331 .setBeforeRoot(Optional.of(metadataTree)) //
332 .setAfterRoot(proposedSubtree) //
333 .setModificationRoot(modification) //
334 .setListenerRoot(listenerTree) //
343 public ListenableFuture<Void> abort() {
344 storeSnapshot = null;
345 proposedSubtree = null;
346 return Futures.<Void> immediateFuture(null);
350 public ListenableFuture<Void> commit() {
351 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
352 return Futures.immediateFuture(null);
355 checkState(proposedSubtree != null,"Proposed subtree must be computed");
356 checkState(storeSnapshot != null,"Proposed subtree must be computed");
357 // return ImmediateFuture<>;
358 InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
359 return Futures.<Void> immediateFuture(null);
364 private static final class AlwaysFailOperation implements ModificationApplyOperation {
367 public Optional<StoreMetadataNode> apply(final NodeModification modification,
368 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
369 throw new IllegalStateException("Schema Context is not available.");
373 public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
374 throw new IllegalStateException("Schema Context is not available.");
378 public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
379 throw new IllegalStateException("Schema Context is not available.");
383 public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
384 throw new IllegalStateException("Schema Context is not available.");