/*
 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.md.sal.dom.store.impl;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.atomic.AtomicLong;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
18 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
19 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
20 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
23 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
24 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
28 import org.opendaylight.yangtools.concepts.Identifiable;
29 import org.opendaylight.yangtools.concepts.ListenerRegistration;
30 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
31 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
32 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
33 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
39 import com.google.common.base.Optional;
40 import com.google.common.primitives.UnsignedLong;
41 import com.google.common.util.concurrent.Futures;
42 import com.google.common.util.concurrent.ListenableFuture;
43 import com.google.common.util.concurrent.ListeningExecutorService;
45 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
47 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
48 private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
51 private final ListeningExecutorService executor;
52 private final String name;
53 private final AtomicLong txCounter = new AtomicLong(0);
55 private DataAndMetadataSnapshot snapshot;
56 private ModificationApplyOperation operationTree;
57 private final ListenerRegistrationNode listenerTree;
61 private SchemaContext schemaContext;
63 public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
64 this.executor = executor;
66 this.operationTree = new AllwaysFailOperation();
67 this.snapshot = DataAndMetadataSnapshot.createEmpty();
68 this.listenerTree = ListenerRegistrationNode.createRoot();
72 public String getIdentifier() {
77 public DOMStoreReadTransaction newReadOnlyTransaction() {
78 return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
82 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
83 return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
87 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
88 return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot, this, operationTree);
92 public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
93 operationTree = SchemaAwareApplyOperationRoot.from(ctx);
98 public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
99 final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
100 LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
101 ListenerRegistrationNode listenerNode = listenerTree;
102 for(PathArgument arg :path.getPath()) {
103 listenerNode = listenerNode.ensureChild(arg);
105 synchronized (listener) {
106 notifyInitialState(path, listener);
108 return listenerNode.registerDataChangeListener(path,listener, scope);
111 private void notifyInitialState(final InstanceIdentifier path,
112 final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener) {
113 Optional<StoreMetadataNode> currentState = snapshot.read(path);
115 if (currentState.isPresent()) {
116 NormalizedNode<?, ?> data = currentState.get().getData();
117 listener.onDataChanged(DOMImmutableDataChangeEvent.builder() //
119 .addCreated(path, data) //
123 } catch (Exception e) {
124 LOG.error("Unhandled exception encountered when invoking listener {}", listener, e);
129 private synchronized DOMStoreThreePhaseCommitCohort submit(
130 final SnaphostBackedWriteTransaction writeTx) {
131 LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
132 return new ThreePhaseCommitImpl(writeTx);
135 private Object nextIdentifier() {
136 return name + "-" + txCounter.getAndIncrement();
139 private synchronized void commit(final DataAndMetadataSnapshot currentSnapshot,
140 final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
141 LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
143 if(LOG.isTraceEnabled()) {
144 LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
146 checkState(snapshot == currentSnapshot, "Store snapshot and transaction snapshot differs");
147 snapshot = DataAndMetadataSnapshot.builder() //
148 .setMetadataTree(newDataTree) //
149 .setSchemaContext(schemaContext) //
152 for(ChangeListenerNotifyTask task : listenerTasks) {
153 executor.submit(task);
158 private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
160 private DataAndMetadataSnapshot stableSnapshot;
161 private final Object identifier;
163 public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
164 this.identifier = identifier;
165 this.stableSnapshot = snapshot;
166 LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
171 public Object getIdentifier() {
176 public void close() {
177 stableSnapshot = null;
181 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
182 checkNotNull(path, "Path must not be null.");
183 checkState(stableSnapshot != null, "Transaction is closed");
184 return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
188 public String toString() {
189 return "SnapshotBackedReadTransaction [id =" + identifier + "]";
194 private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
196 private MutableDataTree mutableTree;
197 private final Object identifier;
198 private InMemoryDOMDataStore store;
200 private boolean ready = false;
202 public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
203 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
204 this.identifier = identifier;
205 mutableTree = MutableDataTree.from(snapshot, applyOper);
207 LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
211 public Object getIdentifier() {
216 public void close() {
217 this.mutableTree = null;
222 public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
224 mutableTree.write(path, data);
228 public void delete(final InstanceIdentifier path) {
230 mutableTree.delete(path);
233 protected boolean isReady() {
237 protected void checkNotReady() {
238 checkState(!ready, "Transaction is ready. No further modifications allowed.");
242 public synchronized DOMStoreThreePhaseCommitCohort ready() {
244 LOG.debug("Store transaction: {} : Ready", getIdentifier());
246 return store.submit(this);
249 protected MutableDataTree getMutatedView() {
254 public String toString() {
255 return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
260 private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
261 DOMStoreReadWriteTransaction {
263 protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
264 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
265 super(identifier, snapshot, store, applyOper);
269 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
270 return Futures.immediateFuture(getMutatedView().read(path));
274 public String toString() {
275 return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
280 private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
282 private final SnaphostBackedWriteTransaction transaction;
283 private final NodeModification modification;
285 private DataAndMetadataSnapshot storeSnapshot;
286 private Optional<StoreMetadataNode> proposedSubtree;
287 private Iterable<ChangeListenerNotifyTask> listenerTasks;
289 public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
290 this.transaction = writeTransaction;
291 this.modification = transaction.getMutatedView().getRootModification();
295 public ListenableFuture<Boolean> canCommit() {
296 final DataAndMetadataSnapshot snapshotCapture = snapshot;
297 final ModificationApplyOperation snapshotOperation = operationTree;
299 return executor.submit(new Callable<Boolean>() {
302 public Boolean call() throws Exception {
303 boolean applicable = snapshotOperation.isApplicable(modification,
304 Optional.of(snapshotCapture.getMetadataTree()));
305 LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
312 public ListenableFuture<Void> preCommit() {
313 storeSnapshot = snapshot;
314 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
315 return Futures.immediateFuture(null);
317 return executor.submit(new Callable<Void>() {
322 public Void call() throws Exception {
323 StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
325 proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
326 increase(metadataTree.getSubtreeVersion()));
328 listenerTasks = DataChangeEventResolver.create() //
329 .setRootPath(PUBLIC_ROOT_PATH) //
330 .setBeforeRoot(Optional.of(metadataTree)) //
331 .setAfterRoot(proposedSubtree) //
332 .setModificationRoot(modification) //
333 .setListenerRoot(listenerTree) //
342 public ListenableFuture<Void> abort() {
343 storeSnapshot = null;
344 proposedSubtree = null;
345 return Futures.<Void> immediateFuture(null);
349 public ListenableFuture<Void> commit() {
350 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
351 return Futures.immediateFuture(null);
354 checkState(proposedSubtree != null,"Proposed subtree must be computed");
355 checkState(storeSnapshot != null,"Proposed subtree must be computed");
356 // return ImmediateFuture<>;
357 InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
358 return Futures.<Void> immediateFuture(null);
363 private class AllwaysFailOperation implements ModificationApplyOperation {
366 public Optional<StoreMetadataNode> apply(final NodeModification modification,
367 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
368 throw new IllegalStateException("Schema Context is not available.");
372 public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
373 throw new IllegalStateException("Schema Context is not available.");
377 public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
378 throw new IllegalStateException("Schema Context is not available.");
382 public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
383 throw new IllegalStateException("Schema Context is not available.");