2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.store.impl;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.concurrent.atomic.AtomicReference;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
25 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
31 import org.opendaylight.yangtools.concepts.Identifiable;
32 import org.opendaylight.yangtools.concepts.ListenerRegistration;
33 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
35 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
36 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
37 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 import com.google.common.base.Optional;
43 import com.google.common.base.Preconditions;
44 import com.google.common.primitives.UnsignedLong;
45 import com.google.common.util.concurrent.Futures;
46 import com.google.common.util.concurrent.ListenableFuture;
47 import com.google.common.util.concurrent.ListeningExecutorService;
49 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
51 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
52 private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
55 private final ListeningExecutorService executor;
56 private final String name;
57 private final AtomicLong txCounter = new AtomicLong(0);
58 private final ListenerRegistrationNode listenerTree;
59 private final AtomicReference<DataAndMetadataSnapshot> snapshot;
61 private ModificationApplyOperation operationTree;
63 private SchemaContext schemaContext;
65 public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
66 this.name = Preconditions.checkNotNull(name);
67 this.executor = Preconditions.checkNotNull(executor);
68 this.listenerTree = ListenerRegistrationNode.createRoot();
69 this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
70 this.operationTree = new AlwaysFailOperation();
74 public final String getIdentifier() {
79 public DOMStoreReadTransaction newReadOnlyTransaction() {
80 return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
84 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
85 return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
89 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
90 return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
94 public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
95 operationTree = SchemaAwareApplyOperationRoot.from(ctx);
100 public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
101 final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
102 LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
103 ListenerRegistrationNode listenerNode = listenerTree;
104 for(PathArgument arg : path.getPath()) {
105 listenerNode = listenerNode.ensureChild(arg);
109 * Make sure commit is not occurring right now. Listener has to be registered and its
110 * state capture enqueued at a consistent point.
112 * FIXME: improve this to read-write lock, such that multiple listener registrations
113 * can occur simultaneously
115 final DataChangeListenerRegistration<L> reg;
116 synchronized (this) {
117 reg = listenerNode.registerDataChangeListener(path, listener, scope);
119 Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
120 if (currentState.isPresent()) {
121 final NormalizedNode<?, ?> data = currentState.get().getData();
123 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
125 .addCreated(path, data) //
127 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
134 private synchronized DOMStoreThreePhaseCommitCohort submit(
135 final SnaphostBackedWriteTransaction writeTx) {
136 LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
137 return new ThreePhaseCommitImpl(writeTx);
140 private Object nextIdentifier() {
141 return name + "-" + txCounter.getAndIncrement();
144 private void commit(final DataAndMetadataSnapshot currentSnapshot,
145 final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
146 LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
148 if(LOG.isTraceEnabled()) {
149 LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
152 final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
153 .setMetadataTree(newDataTree) //
154 .setSchemaContext(schemaContext) //
158 * The commit has to occur atomically with regard to listener registrations.
160 synchronized (this) {
161 final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
162 checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
164 for (ChangeListenerNotifyTask task : listenerTasks) {
165 executor.submit(task);
170 private static class SnapshotBackedReadTransaction implements DOMStoreReadTransaction {
172 private DataAndMetadataSnapshot stableSnapshot;
173 private final Object identifier;
175 public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
176 this.identifier = identifier;
177 this.stableSnapshot = snapshot;
178 LOG.debug("ReadOnly Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
183 public Object getIdentifier() {
188 public void close() {
189 stableSnapshot = null;
193 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
194 checkNotNull(path, "Path must not be null.");
195 checkState(stableSnapshot != null, "Transaction is closed");
196 return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
200 public String toString() {
201 return "SnapshotBackedReadTransaction [id =" + identifier + "]";
206 private static class SnaphostBackedWriteTransaction implements DOMStoreWriteTransaction {
208 private MutableDataTree mutableTree;
209 private final Object identifier;
210 private InMemoryDOMDataStore store;
212 private boolean ready = false;
214 public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
215 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
216 this.identifier = identifier;
217 mutableTree = MutableDataTree.from(snapshot, applyOper);
219 LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
223 public Object getIdentifier() {
228 public void close() {
229 this.mutableTree = null;
234 public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
236 mutableTree.write(path, data);
240 public void delete(final InstanceIdentifier path) {
242 mutableTree.delete(path);
245 protected boolean isReady() {
249 protected void checkNotReady() {
250 checkState(!ready, "Transaction is ready. No further modifications allowed.");
254 public synchronized DOMStoreThreePhaseCommitCohort ready() {
256 LOG.debug("Store transaction: {} : Ready", getIdentifier());
258 return store.submit(this);
261 protected MutableDataTree getMutatedView() {
266 public String toString() {
267 return "SnaphostBackedWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
272 private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
273 DOMStoreReadWriteTransaction {
275 protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
276 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
277 super(identifier, snapshot, store, applyOper);
281 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
282 return Futures.immediateFuture(getMutatedView().read(path));
286 public String toString() {
287 return "SnapshotBackedReadWriteTransaction [id=" + getIdentifier() + ", ready=" + isReady() + "]";
292 private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
294 private final SnaphostBackedWriteTransaction transaction;
295 private final NodeModification modification;
297 private DataAndMetadataSnapshot storeSnapshot;
298 private Optional<StoreMetadataNode> proposedSubtree;
299 private Iterable<ChangeListenerNotifyTask> listenerTasks;
301 public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
302 this.transaction = writeTransaction;
303 this.modification = transaction.getMutatedView().getRootModification();
307 public ListenableFuture<Boolean> canCommit() {
308 final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
309 final ModificationApplyOperation snapshotOperation = operationTree;
311 return executor.submit(new Callable<Boolean>() {
314 public Boolean call() throws Exception {
315 boolean applicable = snapshotOperation.isApplicable(modification,
316 Optional.of(snapshotCapture.getMetadataTree()));
317 LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
324 public ListenableFuture<Void> preCommit() {
325 storeSnapshot = snapshot.get();
326 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
327 return Futures.immediateFuture(null);
329 return executor.submit(new Callable<Void>() {
334 public Void call() throws Exception {
335 StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
337 proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
338 increase(metadataTree.getSubtreeVersion()));
340 listenerTasks = DataChangeEventResolver.create() //
341 .setRootPath(PUBLIC_ROOT_PATH) //
342 .setBeforeRoot(Optional.of(metadataTree)) //
343 .setAfterRoot(proposedSubtree) //
344 .setModificationRoot(modification) //
345 .setListenerRoot(listenerTree) //
354 public ListenableFuture<Void> abort() {
355 storeSnapshot = null;
356 proposedSubtree = null;
357 return Futures.<Void> immediateFuture(null);
361 public ListenableFuture<Void> commit() {
362 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
363 return Futures.immediateFuture(null);
366 checkState(proposedSubtree != null,"Proposed subtree must be computed");
367 checkState(storeSnapshot != null,"Proposed subtree must be computed");
368 // return ImmediateFuture<>;
369 InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
370 return Futures.<Void> immediateFuture(null);
375 private static final class AlwaysFailOperation implements ModificationApplyOperation {
378 public Optional<StoreMetadataNode> apply(final NodeModification modification,
379 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
380 throw new IllegalStateException("Schema Context is not available.");
384 public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
385 throw new IllegalStateException("Schema Context is not available.");
389 public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
390 throw new IllegalStateException("Schema Context is not available.");
394 public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
395 throw new IllegalStateException("Schema Context is not available.");