2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.md.sal.dom.store.impl;
10 import static com.google.common.base.Preconditions.checkNotNull;
11 import static com.google.common.base.Preconditions.checkState;
12 import static org.opendaylight.controller.md.sal.dom.store.impl.StoreUtils.increase;
14 import java.util.Collections;
15 import java.util.concurrent.Callable;
16 import java.util.concurrent.atomic.AtomicLong;
17 import java.util.concurrent.atomic.AtomicReference;
19 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
20 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
21 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree;
22 import org.opendaylight.controller.md.sal.dom.store.impl.tree.ModificationType;
23 import org.opendaylight.controller.md.sal.dom.store.impl.tree.NodeModification;
24 import org.opendaylight.controller.md.sal.dom.store.impl.tree.StoreMetadataNode;
25 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
26 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
27 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
28 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
29 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
30 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
31 import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
32 import org.opendaylight.yangtools.concepts.Identifiable;
33 import org.opendaylight.yangtools.concepts.ListenerRegistration;
34 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
35 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier.PathArgument;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.opendaylight.yangtools.yang.data.impl.schema.NormalizedNodeUtils;
38 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
39 import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
43 import com.google.common.base.Objects;
44 import com.google.common.base.Objects.ToStringHelper;
45 import com.google.common.base.Optional;
46 import com.google.common.base.Preconditions;
47 import com.google.common.primitives.UnsignedLong;
48 import com.google.common.util.concurrent.Futures;
49 import com.google.common.util.concurrent.ListenableFuture;
50 import com.google.common.util.concurrent.ListeningExecutorService;
52 public class InMemoryDOMDataStore implements DOMStore, Identifiable<String>, SchemaContextListener {
54 private static final Logger LOG = LoggerFactory.getLogger(InMemoryDOMDataStore.class);
55 private static final InstanceIdentifier PUBLIC_ROOT_PATH = InstanceIdentifier.builder().build();
57 private final ListeningExecutorService executor;
58 private final String name;
59 private final AtomicLong txCounter = new AtomicLong(0);
60 private final ListenerTree listenerTree;
61 private final AtomicReference<DataAndMetadataSnapshot> snapshot;
63 private ModificationApplyOperation operationTree;
65 private SchemaContext schemaContext;
/**
 * Creates a store that starts from an empty snapshot and an empty listener tree.
 *
 * Until a schema context is delivered the store uses {@link AlwaysFailOperation}, whose
 * methods all throw {@link IllegalStateException}.
 *
 * @param name
 *            store name, must not be null
 * @param executor
 *            executor used for asynchronous work, must not be null
 */
public InMemoryDOMDataStore(final String name, final ListeningExecutorService executor) {
    this.name = checkNotNull(name);
    this.executor = checkNotNull(executor);
    this.snapshot = new AtomicReference<DataAndMetadataSnapshot>(DataAndMetadataSnapshot.createEmpty());
    this.listenerTree = ListenerTree.create();
    this.operationTree = new AlwaysFailOperation();
}
76 public final String getIdentifier() {
81 public DOMStoreReadTransaction newReadOnlyTransaction() {
82 return new SnapshotBackedReadTransaction(nextIdentifier(), snapshot.get());
86 public DOMStoreReadWriteTransaction newReadWriteTransaction() {
87 return new SnapshotBackedReadWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
91 public DOMStoreWriteTransaction newWriteOnlyTransaction() {
92 return new SnapshotBackedWriteTransaction(nextIdentifier(), snapshot.get(), this, operationTree);
96 public synchronized void onGlobalContextUpdated(final SchemaContext ctx) {
97 operationTree = SchemaAwareApplyOperationRoot.from(ctx);
// Registers a data change listener for the given path and scope.
//
// Registration happens while holding the store monitor. Under that monitor the listener is added to
// the listener tree, the current snapshot is read at the given path and, when data is present there,
// an event built with scope BASE that reports that data as created at the path is submitted to the
// executor for this one registration only.
//
// The returned registration wraps the listener; its removeRegistration() also takes the store monitor.
// NOTE(review): several short lines of this method (one builder call between builder(...) and
// addCreated(...), the builder terminator, the statement inside removeRegistration's synchronized
// block, closing braces) are missing from the reviewed text, so the code below is left untouched.
102 public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
103 final InstanceIdentifier path, final L listener, final DataChangeScope scope) {
106 * Make sure commit is not occurring right now. Listener has to be
107 * registered and its state capture enqueued at a consistent point.
109 * FIXME: improve this to read-write lock, such that multiple listener
110 * registrations can occur simultaneously
112 final DataChangeListenerRegistration<L> reg;
113 synchronized (this) {
114 LOG.debug("{}: Registering data change listener {} for {}", name, listener, path);
// Add the listener to the tree first, then capture the state it should initially be told about.
116 reg = listenerTree.registerDataChangeListener(path, listener, scope);
118 Optional<StoreMetadataNode> currentState = snapshot.get().read(path);
119 if (currentState.isPresent()) {
120 final NormalizedNode<?, ?> data = currentState.get().getData();
// Initial notification: existing data is reported to the new listener as "created".
122 final DOMImmutableDataChangeEvent event = DOMImmutableDataChangeEvent.builder(DataChangeScope.BASE) //
124 .addCreated(path, data) //
// Delivery is asynchronous and targets only the registration created above.
126 executor.submit(new ChangeListenerNotifyTask(Collections.singletonList(reg), event));
// Hand out a wrapper registration; removal synchronizes on the enclosing store instance.
130 return new AbstractListenerRegistration<L>(listener) {
132 protected void removeRegistration() {
133 synchronized (InMemoryDOMDataStore.this) {
140 private synchronized DOMStoreThreePhaseCommitCohort submit(final SnapshotBackedWriteTransaction writeTx) {
141 LOG.debug("Tx: {} is submitted. Modifications: {}", writeTx.getIdentifier(), writeTx.getMutatedView());
142 return new ThreePhaseCommitImpl(writeTx);
145 private Object nextIdentifier() {
146 return name + "-" + txCounter.getAndIncrement();
149 private void commit(final DataAndMetadataSnapshot currentSnapshot, final StoreMetadataNode newDataTree,
150 final ResolveDataChangeEventsTask listenerResolver) {
151 LOG.debug("Updating Store snaphot version: {} with version:{}", currentSnapshot.getMetadataTree()
152 .getSubtreeVersion(), newDataTree.getSubtreeVersion());
154 if (LOG.isTraceEnabled()) {
155 LOG.trace("Data Tree is {}", StoreUtils.toStringTree(newDataTree.getData()));
158 final DataAndMetadataSnapshot newSnapshot = DataAndMetadataSnapshot.builder() //
159 .setMetadataTree(newDataTree) //
160 .setSchemaContext(schemaContext) //
164 * The commit has to occur atomically with regard to listener
167 synchronized (this) {
168 final boolean success = snapshot.compareAndSet(currentSnapshot, newSnapshot);
169 checkState(success, "Store snapshot and transaction snapshot differ. This should never happen.");
171 for (ChangeListenerNotifyTask task : listenerResolver.call()) {
172 LOG.trace("Scheduling invocation of listeners: {}", task);
173 executor.submit(task);
178 private static abstract class AbstractDOMStoreTransaction implements DOMStoreTransaction {
179 private final Object identifier;
181 protected AbstractDOMStoreTransaction(final Object identifier) {
182 this.identifier = identifier;
186 public final Object getIdentifier() {
191 public final String toString() {
192 return addToStringAttributes(Objects.toStringHelper(this)).toString();
196 * Add class-specific toString attributes.
198 * @param toStringHelper
199 * ToStringHelper instance
200 * @return ToStringHelper instance which was passed in
202 protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
203 return toStringHelper.add("id", identifier);
207 private static class SnapshotBackedReadTransaction extends AbstractDOMStoreTransaction implements
208 DOMStoreReadTransaction {
209 private DataAndMetadataSnapshot stableSnapshot;
211 public SnapshotBackedReadTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot) {
213 this.stableSnapshot = Preconditions.checkNotNull(snapshot);
214 LOG.debug("ReadOnly Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree()
215 .getSubtreeVersion());
219 public void close() {
220 LOG.debug("Store transaction: {} : Closed", getIdentifier());
221 stableSnapshot = null;
225 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
226 checkNotNull(path, "Path must not be null.");
227 checkState(stableSnapshot != null, "Transaction is closed");
228 return Futures.immediateFuture(NormalizedNodeUtils.findNode(stableSnapshot.getDataTree(), path));
232 private static class SnapshotBackedWriteTransaction extends AbstractDOMStoreTransaction implements
233 DOMStoreWriteTransaction {
234 private MutableDataTree mutableTree;
235 private InMemoryDOMDataStore store;
236 private boolean ready = false;
238 public SnapshotBackedWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
239 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
241 mutableTree = MutableDataTree.from(snapshot, applyOper);
243 LOG.debug("Write Tx: {} allocated with snapshot {}", identifier, snapshot.getMetadataTree()
244 .getSubtreeVersion());
248 public void close() {
249 LOG.debug("Store transaction: {} : Closed", getIdentifier());
250 this.mutableTree = null;
255 public void write(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
258 LOG.trace("Tx: {} Write: {}:{}", getIdentifier(), path, data);
259 mutableTree.write(path, data);
260 // FIXME: Add checked exception
261 } catch (Exception e) {
262 LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e);
267 public void merge(final InstanceIdentifier path, final NormalizedNode<?, ?> data) {
270 LOG.trace("Tx: {} Merge: {}:{}", getIdentifier(), path, data);
271 mutableTree.merge(path, data);
272 // FIXME: Add checked exception
273 } catch (Exception e) {
274 LOG.error("Tx: {}, failed to write {}:{} in {}", getIdentifier(), path, data, mutableTree, e);
279 public void delete(final InstanceIdentifier path) {
282 LOG.trace("Tx: {} Delete: {}", getIdentifier(), path);
283 mutableTree.delete(path);
284 // FIXME: Add checked exception
285 } catch (Exception e) {
286 LOG.error("Tx: {}, failed to delete {} in {}", getIdentifier(), path, mutableTree, e);
290 protected final boolean isReady() {
294 protected final void checkNotReady() {
295 checkState(!ready, "Transaction %s is ready. No further modifications allowed.", getIdentifier());
299 public synchronized DOMStoreThreePhaseCommitCohort ready() {
300 checkState(!ready, "Transaction %s is already ready.", getIdentifier());
303 LOG.debug("Store transaction: {} : Ready", getIdentifier());
305 return store.submit(this);
308 protected MutableDataTree getMutatedView() {
313 protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
314 return toStringHelper.add("ready", isReady());
318 private static class SnapshotBackedReadWriteTransaction extends SnapshotBackedWriteTransaction implements
319 DOMStoreReadWriteTransaction {
321 protected SnapshotBackedReadWriteTransaction(final Object identifier, final DataAndMetadataSnapshot snapshot,
322 final InMemoryDOMDataStore store, final ModificationApplyOperation applyOper) {
323 super(identifier, snapshot, store, applyOper);
327 public ListenableFuture<Optional<NormalizedNode<?, ?>>> read(final InstanceIdentifier path) {
328 LOG.trace("Tx: {} Read: {}", getIdentifier(), path);
330 return Futures.immediateFuture(getMutatedView().read(path));
331 } catch (Exception e) {
332 LOG.error("Tx: {} Failed Read of {}", getIdentifier(), path, e);
338 private class ThreePhaseCommitImpl implements DOMStoreThreePhaseCommitCohort {
340 private final SnapshotBackedWriteTransaction transaction;
341 private final NodeModification modification;
343 private DataAndMetadataSnapshot storeSnapshot;
344 private Optional<StoreMetadataNode> proposedSubtree;
345 private ResolveDataChangeEventsTask listenerResolver;
347 public ThreePhaseCommitImpl(final SnapshotBackedWriteTransaction writeTransaction) {
348 this.transaction = writeTransaction;
349 this.modification = transaction.getMutatedView().getRootModification();
353 public ListenableFuture<Boolean> canCommit() {
354 final DataAndMetadataSnapshot snapshotCapture = snapshot.get();
355 final ModificationApplyOperation snapshotOperation = operationTree;
357 return executor.submit(new Callable<Boolean>() {
360 public Boolean call() throws Exception {
361 Boolean applicable = false;
363 snapshotOperation.checkApplicable(PUBLIC_ROOT_PATH, modification,
364 Optional.of(snapshotCapture.getMetadataTree()));
366 } catch (DataPreconditionFailedException e) {
367 LOG.warn("Store Tx: {} Data Precondition failed for {}.",transaction.getIdentifier(),e.getPath(),e);
370 LOG.debug("Store Transaction: {} : canCommit : {}", transaction.getIdentifier(), applicable);
377 public ListenableFuture<Void> preCommit() {
378 storeSnapshot = snapshot.get();
379 if (modification.getModificationType() == ModificationType.UNMODIFIED) {
380 return Futures.immediateFuture(null);
382 return executor.submit(new Callable<Void>() {
385 public Void call() throws Exception {
386 StoreMetadataNode metadataTree = storeSnapshot.getMetadataTree();
388 proposedSubtree = operationTree.apply(modification, Optional.of(metadataTree),
389 increase(metadataTree.getSubtreeVersion()));
391 listenerResolver = ResolveDataChangeEventsTask.create() //
392 .setRootPath(PUBLIC_ROOT_PATH) //
393 .setBeforeRoot(Optional.of(metadataTree)) //
394 .setAfterRoot(proposedSubtree) //
395 .setModificationRoot(modification) //
396 .setListenerRoot(listenerTree);
404 public ListenableFuture<Void> abort() {
405 storeSnapshot = null;
406 proposedSubtree = null;
407 return Futures.<Void> immediateFuture(null);
411 public ListenableFuture<Void> commit() {
412 if (modification.getModificationType() == ModificationType.UNMODIFIED) {
413 return Futures.immediateFuture(null);
416 checkState(proposedSubtree != null, "Proposed subtree must be computed");
417 checkState(storeSnapshot != null, "Proposed subtree must be computed");
418 // return ImmediateFuture<>;
419 InMemoryDOMDataStore.this.commit(storeSnapshot, proposedSubtree.get(), listenerResolver);
420 return Futures.<Void> immediateFuture(null);
425 private static final class AlwaysFailOperation implements ModificationApplyOperation {
428 public Optional<StoreMetadataNode> apply(final NodeModification modification,
429 final Optional<StoreMetadataNode> storeMeta, final UnsignedLong subtreeVersion) {
430 throw new IllegalStateException("Schema Context is not available.");
434 public void checkApplicable(final InstanceIdentifier path,final NodeModification modification, final Optional<StoreMetadataNode> storeMetadata) {
435 throw new IllegalStateException("Schema Context is not available.");
439 public Optional<ModificationApplyOperation> getChild(final PathArgument child) {
440 throw new IllegalStateException("Schema Context is not available.");
444 public void verifyStructure(final NodeModification modification) throws IllegalArgumentException {
445 throw new IllegalStateException("Schema Context is not available.");