2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.store.impl;
10 import com.google.common.base.Optional;
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.FutureCallback;
13 import com.google.common.util.concurrent.Futures;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
17 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
18 import org.opendaylight.controller.md.sal.common.api.data.OptimisticLockFailedException;
19 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
20 import org.opendaylight.controller.md.sal.dom.store.impl.SnapshotBackedWriteTransaction.TransactionReadyPrototype;
21 import org.opendaylight.yangtools.yang.data.api.schema.tree.ConflictingModificationAppliedException;
22 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
23 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
24 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
25 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
26 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
27 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
28 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
31 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
32 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
34 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
35 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
36 import org.opendaylight.yangtools.concepts.Identifiable;
37 import org.opendaylight.yangtools.concepts.ListenerRegistration;
38 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
39 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
40 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
41 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import javax.annotation.concurrent.GuardedBy;
46 import java.util.Collections;
47 import java.util.concurrent.Callable;
48 import java.util.concurrent.atomic.AtomicLong;
50 import static com.google.common.base.Preconditions.checkState;
53 * In-memory DOM Data Store
55 * Implementation of {@link DOMStore} which uses {@link DataTree} and other
56 * classes such as {@link SnapshotBackedWriteTransaction}.
57 * {@link SnapshotBackedReadTransaction} and {@link ResolveDataChangeEventsTask}
58 * to implement {@link DOMStore} contract.
61 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener,
62 TransactionReadyPrototype,AutoCloseable {
63 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
64 private final DataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
65 private final ListenerTree listenerTree = ListenerTree.create();
66 private final AtomicLong txCounter = new AtomicLong(0);
67 private final ListeningExecutorService executor;
69 private final String name;
71 public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
72 this.name = Preconditions.checkNotNull(name);
73 this.executor = Preconditions.checkNotNull(executor);
77 public final String getIdentifier() {
82 public DOMStoreReadTransaction newReadOnlyTransaction() {
83 return new SnapshotBackedReadTransaction(nextIdentifier(), dataTree.takeSnapshot());
87 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
88 return new SnapshotBackedReadWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
92 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
93 return new SnapshotBackedWriteTransaction(nextIdentifier(), dataTree.takeSnapshot(), this);
97 public DOMStoreTransactionChain createTransactionChain() {
98 return new DOMStoreTransactionChainImpl();
102 public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
103 dataTree.setSchemaContext(ctx);
108 executor.shutdownNow();
111 public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
112 final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
115 * Make sure commit is not occurring right now. Listener has to be
116 * registered and its state capture enqueued at a consistent point.
118 * FIXME: improve this to read-write lock, such that multiple listener
119 * registrations can occur simultaneously
121 final DataChangeListenerRegistration<L> reg;
122 synchronized (this) {
123 LOG.debug("{}: Registering data change listener {} for {}", name, listener, path);
125 reg = listenerTree.registerDataChangeListener(path, listener, scope);
127 Optional<NormalizedNode<?, ?>> currentState = dataTree.takeSnapshot().readNode(path);
128 if (currentState.isPresent()) {
129 final NormalizedNode<?, ?> data = currentState.get();
131 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
133 .addCreated(path, data) //
135 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
139 return new AbstractListenerRegistration<L>(listener) {
141 protected void removeRegistration() {
142 synchronized (InMemoryDOMDataStore.this) {
150 public synchronized DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction writeTx) {
151 LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
152 return new ThreePhaseCommitImpl(writeTx);
155 private Object nextIdentifier() {
156 return name + "-" + txCounter.getAndIncrement();
159 private class DOMStoreTransactionChainImpl implements DOMStoreTransactionChain, TransactionReadyPrototype {
162 private SnapshotBackedWriteTransaction latestOutstandingTx;
164 private boolean chainFailed = false;
166 private void checkFailed() {
167 Preconditions.checkState(!chainFailed, "Transaction chain is failed.");
171 public synchronized DOMStoreReadTransaction newReadOnlyTransaction() {
172 final DataTreeSnapshot snapshot;
174 if (latestOutstandingTx != null) {
175 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
176 snapshot = latestOutstandingTx.getMutatedView();
178 snapshot = dataTree.takeSnapshot();
180 return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot);
184 public synchronized DOMStoreReadWriteTransaction newReadWriteTransaction() {
185 final DataTreeSnapshot snapshot;
187 if (latestOutstandingTx != null) {
188 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
189 snapshot = latestOutstandingTx.getMutatedView();
191 snapshot = dataTree.takeSnapshot();
193 final SnapshotBackedReadWriteTransaction ret = new SnapshotBackedReadWriteTransaction(nextIdentifier(),
195 latestOutstandingTx = ret;
200 public synchronized DOMStoreWriteTransaction newWriteOnlyTransaction() {
201 final DataTreeSnapshot snapshot;
203 if (latestOutstandingTx != null) {
204 checkState(latestOutstandingTx.isReady(), "Previous transaction in chain must be ready.");
205 snapshot = latestOutstandingTx.getMutatedView();
207 snapshot = dataTree.takeSnapshot();
209 final SnapshotBackedWriteTransaction ret = new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot,
211 latestOutstandingTx = ret;
216 public DOMStoreThreePhaseCommitCohort ready(final SnapshotBackedWriteTransaction tx) {
217 DOMStoreThreePhaseCommitCohort storeCohort = InMemoryDOMDataStore.this.ready(tx);
218 return new ChainedTransactionCommitImpl(tx, storeCohort, this);
222 public void close() {
224 executor.shutdownNow();
228 protected synchronized void onTransactionFailed(final SnapshotBackedWriteTransaction transaction,
234 public synchronized void onTransactionCommited(final SnapshotBackedWriteTransaction transaction) {
235 // If commited transaction is latestOutstandingTx we clear
236 // latestOutstandingTx
237 // field in order to base new transactions on Datastore Data Tree
239 if (transaction.equals(latestOutstandingTx)) {
240 latestOutstandingTx = null;
246 private static class ChainedTransactionCommitImpl implements DOMStoreThreePhaseCommitCohort {
248 private final SnapshotBackedWriteTransaction transaction;
249 private final DOMStoreThreePhaseCommitCohort delegate;
251 private final DOMStoreTransactionChainImpl txChain;
253 protected ChainedTransactionCommitImpl(final SnapshotBackedWriteTransaction transaction,
254 final DOMStoreThreePhaseCommitCohort delegate, final DOMStoreTransactionChainImpl txChain) {
256 this.transaction = transaction;
257 this.delegate = delegate;
258 this.txChain = txChain;
262 public ListenableFuture<Boolean> canCommit() {
263 return delegate.canCommit();
267 public ListenableFuture<Void> preCommit() {
268 return delegate.preCommit();
272 public ListenableFuture<Void> abort() {
273 return delegate.abort();
277 public ListenableFuture<Void> commit() {
278 ListenableFuture<Void> commitFuture = delegate.commit();
279 Futures.addCallback(commitFuture, new FutureCallback<Void>() {
281 public void onFailure(final Throwable t) {
282 txChain.onTransactionFailed(transaction, t);
286 public void onSuccess(final Void result) {
287 txChain.onTransactionCommited(transaction);
296 private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
298 private final SnapshotBackedWriteTransaction transaction;
299 private final DataTreeModification modification;
301 private ResolveDataChangeEventsTask listenerResolver;
302 private DataTreeCandidate candidate;
304 public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
305 this.transaction = writeTransaction;
306 this.modification = transaction.getMutatedView();
310 public ListenableFuture<Boolean> canCommit() {
311 return executor.submit(new Callable<Boolean>() {
313 public Boolean call() throws TransactionCommitFailedException {
315 dataTree.validate(modification);
316 LOG.debug("Store Transaction: {} can be committed", transaction.getIdentifier());
318 } catch (ConflictingModificationAppliedException e) {
319 LOG.warn("Store Tx: {} Conflicting modification for {}.", transaction.getIdentifier(),
321 throw new OptimisticLockFailedException("Optimistic lock failed.",e);
322 } catch (DataValidationFailedException e) {
323 LOG.warn("Store Tx: {} Data Precondition failed for {}.", transaction.getIdentifier(),
325 throw new TransactionCommitFailedException("Data did not pass validation.",e);
332 public ListenableFuture<Void> preCommit() {
333 return executor.submit(new Callable<Void>() {
336 candidate = dataTree.prepare(modification);
337 listenerResolver = ResolveDataChangeEventsTask.create(candidate, listenerTree);
344 public ListenableFuture<Void> abort() {
346 return Futures.immediateFuture(null);
350 public ListenableFuture<Void> commit() {
351 checkState(candidate != null, "Proposed subtree must be computed");
354 * The commit has to occur atomically with regard to listener
357 synchronized (this) {
358 dataTree.commit(candidate);
360 for (ChangeListenerNotifyTask task : listenerResolver.call()) {
361 LOG.trace("Scheduling invocation of listeners: {}", task);
362 executor.submit(task);
366 return Futures.immediateFuture(null);