2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.store.impl;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.concurrent.atomic.AtomicReference;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
25 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
33 import org.opendaylight.yangtools.concepts.Identifiable;
34 import org.opendaylight.yangtools.concepts.ListenerRegistration;
35 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
36 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import com.google.common.base.Objects;
45 import com.google.common.base.Objects.ToStringHelper;
46 import com.google.common.base.Optional;
47 import com.google.common.base.Preconditions;
48 import com.google.common.primitives.UnsignedLong;
49 import com.google.common.util.concurrent.Futures;
50 import com.google.common.util.concurrent.ListenableFuture;
51 import com.google.common.util.concurrent.ListeningExecutorService;
53 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
55 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
56 private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
59 private final ListeningExecutorService executor;
60 private final String name;
61 private final AtomicLong txCounter = new AtomicLong(0);
62 private final ListenerRegistrationNode listenerTree;
63 private final AtomicReference<DataAndMetadataSnapshot> snapshot;
65 private ModificationApplyOperation operationTree;
67 private SchemaContext schemaContext;
69 public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
70 this.name = Preconditions.checkNotNull(name);
71 this.executor = Preconditions.checkNotNull(executor);
72 this.listenerTree = ListenerRegistrationNode.createRoot();
73 this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
74 this.operationTree = new AlwaysFailOperation();
78 public final String getIdentifier() {
83 public DOMStoreReadTransaction newReadOnlyTransaction() {
84 return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
88 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
89 return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
93 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
94 return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
98 public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
99 operationTree = SchemaAwareApplyOperationRoot.from(ctx);
104 public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
105 final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
108 * Make sure commit is not occurring right now. Listener has to be registered and its
109 * state capture enqueued at a consistent point.
111 * FIXME: improve this to read-write lock, such that multiple listener registrations
112 * can occur simultaneously
114 final DataChangeListenerRegistration<L> reg;
115 synchronized (this) {
116 LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
117 ListenerRegistrationNode listenerNode = listenerTree;
118 for(PathArgument arg : path.getPath()) {
119 listenerNode = listenerNode.ensureChild(arg);
122 reg = listenerNode.registerDataChangeListener(path, listener, scope);
124 Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
125 if (currentState.isPresent()) {
126 final NormalizedNode<?, ?> data = currentState.get().getData();
128 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
130 .addCreated(path, data) //
132 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
136 return new AbstractListenerRegistration<L>(listener) {
138 protected void removeRegistration() {
139 synchronized (InMemoryDOMDataStore.this) {
146 private synchronized DOMStoreThreePhaseCommitCohort submit(
147 final SnaphostBackedWriteTransaction writeTx) {
148 LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
149 return new ThreePhaseCommitImpl(writeTx);
152 private Object nextIdentifier() {
153 return name + "-" + txCounter.getAndIncrement();
156 private void commit(final DataAndMetadataSnapshot currentSnapshot,
157 final StoreMetadataNode newDataTree, final DataChangeEventResolver listenerResolver) {
158 LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
160 if(LOG.isTraceEnabled()) {
161 LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
164 final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
165 .setMetadataTree(newDataTree) //
166 .setSchemaContext(schemaContext) //
170 * The commit has to occur atomically with regard to listener registrations.
172 synchronized (this) {
173 final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
174 checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
176 for (ChangeListenerNotifyTask task : listenerResolver.resolve()) {
177 executor.submit(task);
182 private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
183 private final Object identifier;
185 protected AbstractDOMStoreTransaction(final Object identifier) {
186 this.identifier = identifier;
190 public final Object getIdentifier() {
195 public final String toString() {
196 return addToStringAttributes(Objects.toStringHelper(this)).toString();
200 * Add class-specific toString attributes.
202 * @param toStringHelper ToStringHelper instance
203 * @return ToStringHelper instance which was passed in
205 protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
206 return toStringHelper.add("id", identifier);
210 private static class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements DOMStoreReadTransaction {
211 private DataAndMetadataSnapshot stableSnapshot;
213 public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
215 this.stableSnapshot = Preconditions.checkNotNull(snapshot);
216 LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree().getSubtreeVersion());
220 public void close() {
221 LOG.debug("Store transaction: {} : Closed", getIdentifier());
222 stableSnapshot = null;
226 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
227 checkNotNull(path, "Path must not be null.");
228 checkState(stableSnapshot != null, "Transaction is closed");
229 return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
233 private static class SnaphostBackedWriteTransaction extends AbstractDOMStoreTransaction implements DOMStoreWriteTransaction {
234 private MutableDataTree mutableTree;
235 private InMemoryDOMDataStore store;
236 private boolean ready = false;
238 public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
239 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
241 mutableTree = MutableDataTree.from(snapshot, applyOper);
243 LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
247 public void close() {
248 LOG.debug("Store transaction: {} : Closed", getIdentifier());
249 this.mutableTree = null;
254 public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
256 mutableTree.write(path, data);
260 public void delete(final InstanceIdentifier path) {
262 mutableTree.delete(path);
265 protected final boolean isReady() {
269 protected final void checkNotReady() {
270 checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
274 public synchronized DOMStoreThreePhaseCommitCohort ready() {
275 checkState(!ready, "Transaction %s is already ready.", getIdentifier());
278 LOG.debug("Store transaction: {} : Ready", getIdentifier());
280 return store.submit(this);
283 protected MutableDataTree getMutatedView() {
288 protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
289 return toStringHelper.add("ready", isReady());
293 private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
294 DOMStoreReadWriteTransaction {
296 protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
297 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
298 super(identifier, snapshot, store, applyOper);
302 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
303 return Futures.immediateFuture(getMutatedView().read(path));
307 private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
309 private final SnaphostBackedWriteTransaction transaction;
310 private final NodeModification modification;
312 private DataAndMetadataSnapshot storeSnapshot;
313 private Optional<StoreMetadataNode> proposedSubtree;
314 private DataChangeEventResolver listenerResolver;
316 public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
317 this.transaction = writeTransaction;
318 this.modification = transaction.getMutatedView().getRootModification();
322 public ListenableFuture<Boolean> canCommit() {
323 final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
324 final ModificationApplyOperation snapshotOperation = operationTree;
326 return executor.submit(new Callable<Boolean>() {
329 public Boolean call() throws Exception {
330 boolean applicable = snapshotOperation.isApplicable(modification,
331 Optional.of(snapshotCapture.getMetadataTree()));
332 LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
339 public ListenableFuture<Void> preCommit() {
340 storeSnapshot = snapshot.get();
341 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
342 return Futures.immediateFuture(null);
344 return executor.submit(new Callable<Void>() {
349 public Void call() throws Exception {
350 StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
352 proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
353 increase(metadataTree.getSubtreeVersion()));
355 listenerResolver = DataChangeEventResolver.create() //
356 .setRootPath(PUBLIC_ROOT_PATH) //
357 .setBeforeRoot(Optional.of(metadataTree)) //
358 .setAfterRoot(proposedSubtree) //
359 .setModificationRoot(modification) //
360 .setListenerRoot(listenerTree);
368 public ListenableFuture<Void> abort() {
369 storeSnapshot = null;
370 proposedSubtree = null;
371 return Futures.<Void> immediateFuture(null);
375 public ListenableFuture<Void> commit() {
376 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
377 return Futures.immediateFuture(null);
380 checkState(proposedSubtree != null,"Proposed subtree must be computed");
381 checkState(storeSnapshot != null,"Proposed subtree must be computed");
382 // return ImmediateFuture<>;
383 InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerResolver);
384 return Futures.<Void> immediateFuture(null);
389 private static final class AlwaysFailOperation implements ModificationApplyOperation {
392 public Optional<StoreMetadataNode> apply(final NodeModification modification,
393 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
394 throw new IllegalStateException("Schema Context is not available.");
398 public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
399 throw new IllegalStateException("Schema Context is not available.");
403 public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
404 throw new IllegalStateException("Schema Context is not available.");
408 public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
409 throw new IllegalStateException("Schema Context is not available.");