2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.store.impl;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.concurrent.atomic.AtomicReference;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerRegistrationNode.DataChangeListenerRegistration;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
25 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
43 import com.google.common.base.Objects;
44 import com.google.common.base.Objects.ToStringHelper;
45 import com.google.common.base.Optional;
46 import com.google.common.base.Preconditions;
47 import com.google.common.primitives.UnsignedLong;
48 import com.google.common.util.concurrent.Futures;
49 import com.google.common.util.concurrent.ListenableFuture;
50 import com.google.common.util.concurrent.ListeningExecutorService;
52 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
54 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
55 private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
58 private final ListeningExecutorService executor;
59 private final String name;
60 private final AtomicLong txCounter = new AtomicLong(0);
61 private final ListenerRegistrationNode listenerTree;
62 private final AtomicReference<DataAndMetadataSnapshot> snapshot;
64 private ModificationApplyOperation operationTree;
66 private SchemaContext schemaContext;
68 public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
69 this.name = Preconditions.checkNotNull(name);
70 this.executor = Preconditions.checkNotNull(executor);
71 this.listenerTree = ListenerRegistrationNode.createRoot();
72 this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
73 this.operationTree = new AlwaysFailOperation();
77 public final String getIdentifier() {
82 public DOMStoreReadTransaction newReadOnlyTransaction() {
83 return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
87 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
88 return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
92 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
93 return new SnaphostBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
97 public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
98 operationTree = SchemaAwareApplyOperationRoot.from(ctx);
103 public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
104 final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
105 LOG.debug("{}: Registering data change listener {} for {}",name,listener,path);
106 ListenerRegistrationNode listenerNode = listenerTree;
107 for(PathArgument arg : path.getPath()) {
108 listenerNode = listenerNode.ensureChild(arg);
112 * Make sure commit is not occurring right now. Listener has to be registered and its
113 * state capture enqueued at a consistent point.
115 * FIXME: improve this to read-write lock, such that multiple listener registrations
116 * can occur simultaneously
118 final DataChangeListenerRegistration<L> reg;
119 synchronized (this) {
120 reg = listenerNode.registerDataChangeListener(path, listener, scope);
122 Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
123 if (currentState.isPresent()) {
124 final NormalizedNode<?, ?> data = currentState.get().getData();
126 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder() //
128 .addCreated(path, data) //
130 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
137 private synchronized DOMStoreThreePhaseCommitCohort submit(
138 final SnaphostBackedWriteTransaction writeTx) {
139 LOG.debug("Tx: {} is submitted. Modifications: {}",writeTx.getIdentifier(),writeTx.getMutatedView());
140 return new ThreePhaseCommitImpl(writeTx);
143 private Object nextIdentifier() {
144 return name + "-" + txCounter.getAndIncrement();
147 private void commit(final DataAndMetadataSnapshot currentSnapshot,
148 final StoreMetadataNode newDataTree, final Iterable<ChangeListenerNotifyTask> listenerTasks) {
149 LOG.debug("Updating Store snaphot version: {} with version:{}",currentSnapshot.getMetadataTree().getSubtreeVersion(),newDataTree.getSubtreeVersion());
151 if(LOG.isTraceEnabled()) {
152 LOG.trace("Data Tree is {}",StoreUtils.toStringTree(newDataTree));
155 final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
156 .setMetadataTree(newDataTree) //
157 .setSchemaContext(schemaContext) //
161 * The commit has to occur atomically with regard to listener registrations.
163 synchronized (this) {
164 final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
165 checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
167 for (ChangeListenerNotifyTask task : listenerTasks) {
168 executor.submit(task);
173 private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
174 private final Object identifier;
176 protected AbstractDOMStoreTransaction(final Object identifier) {
177 this.identifier = identifier;
181 public final Object getIdentifier() {
186 public final String toString() {
187 return addToStringAttributes(Objects.toStringHelper(this)).toString();
191 * Add class-specific toString attributes.
193 * @param toStringHelper ToStringHelper instance
194 * @return ToStringHelper instance which was passed in
196 protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
197 return toStringHelper.add("id", identifier);
201 private static class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements DOMStoreReadTransaction {
202 private DataAndMetadataSnapshot stableSnapshot;
204 public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
206 this.stableSnapshot = Preconditions.checkNotNull(snapshot);
207 LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree().getSubtreeVersion());
211 public void close() {
212 LOG.debug("Store transaction: {} : Closed", getIdentifier());
213 stableSnapshot = null;
217 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
218 checkNotNull(path, "Path must not be null.");
219 checkState(stableSnapshot != null, "Transaction is closed");
220 return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
224 private static class SnaphostBackedWriteTransaction extends AbstractDOMStoreTransaction implements DOMStoreWriteTransaction {
225 private MutableDataTree mutableTree;
226 private InMemoryDOMDataStore store;
227 private boolean ready = false;
229 public SnaphostBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
230 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
232 mutableTree = MutableDataTree.from(snapshot, applyOper);
234 LOG.debug("Write Tx: {} allocated with snapshot {}",identifier,snapshot.getMetadataTree().getSubtreeVersion());
238 public void close() {
239 LOG.debug("Store transaction: {} : Closed", getIdentifier());
240 this.mutableTree = null;
245 public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
247 mutableTree.write(path, data);
251 public void delete(final InstanceIdentifier path) {
253 mutableTree.delete(path);
256 protected final boolean isReady() {
260 protected final void checkNotReady() {
261 checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
265 public synchronized DOMStoreThreePhaseCommitCohort ready() {
266 checkState(!ready, "Transaction %s is already ready.", getIdentifier());
269 LOG.debug("Store transaction: {} : Ready", getIdentifier());
271 return store.submit(this);
274 protected MutableDataTree getMutatedView() {
279 protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
280 return toStringHelper.add("ready", isReady());
284 private static class SnapshotBackedReadWriteTransaction extends SnaphostBackedWriteTransaction implements
285 DOMStoreReadWriteTransaction {
287 protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
288 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
289 super(identifier, snapshot, store, applyOper);
293 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
294 return Futures.immediateFuture(getMutatedView().read(path));
298 private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
300 private final SnaphostBackedWriteTransaction transaction;
301 private final NodeModification modification;
303 private DataAndMetadataSnapshot storeSnapshot;
304 private Optional<StoreMetadataNode> proposedSubtree;
305 private Iterable<ChangeListenerNotifyTask> listenerTasks;
307 public ThreePhaseCommitImpl(final SnaphostBackedWriteTransaction writeTransaction) {
308 this.transaction = writeTransaction;
309 this.modification = transaction.getMutatedView().getRootModification();
313 public ListenableFuture<Boolean> canCommit() {
314 final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
315 final ModificationApplyOperation snapshotOperation = operationTree;
317 return executor.submit(new Callable<Boolean>() {
320 public Boolean call() throws Exception {
321 boolean applicable = snapshotOperation.isApplicable(modification,
322 Optional.of(snapshotCapture.getMetadataTree()));
323 LOG.debug("Store Transcation: {} : canCommit : {}", transaction.getIdentifier(), applicable);
330 public ListenableFuture<Void> preCommit() {
331 storeSnapshot = snapshot.get();
332 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
333 return Futures.immediateFuture(null);
335 return executor.submit(new Callable<Void>() {
340 public Void call() throws Exception {
341 StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
343 proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
344 increase(metadataTree.getSubtreeVersion()));
346 listenerTasks = DataChangeEventResolver.create() //
347 .setRootPath(PUBLIC_ROOT_PATH) //
348 .setBeforeRoot(Optional.of(metadataTree)) //
349 .setAfterRoot(proposedSubtree) //
350 .setModificationRoot(modification) //
351 .setListenerRoot(listenerTree) //
360 public ListenableFuture<Void> abort() {
361 storeSnapshot = null;
362 proposedSubtree = null;
363 return Futures.<Void> immediateFuture(null);
367 public ListenableFuture<Void> commit() {
368 if(modification.getModificationType() == ModificationType.UNMODIFIED) {
369 return Futures.immediateFuture(null);
372 checkState(proposedSubtree != null,"Proposed subtree must be computed");
373 checkState(storeSnapshot != null,"Proposed subtree must be computed");
374 // return ImmediateFuture<>;
375 InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(),listenerTasks);
376 return Futures.<Void> immediateFuture(null);
381 private static final class AlwaysFailOperation implements ModificationApplyOperation {
384 public Optional<StoreMetadataNode> apply(final NodeModification modification,
385 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
386 throw new IllegalStateException("Schema Context is not available.");
390 public boolean isApplicable(final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
391 throw new IllegalStateException("Schema Context is not available.");
395 public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
396 throw new IllegalStateException("Schema Context is not available.");
400 public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
401 throw new IllegalStateException("Schema Context is not available.");