2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.mdsal.dom.store.inmemory;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import com.google.common.util.concurrent.MoreExecutors;
17 import java.util.ArrayList;
18 import java.util.Iterator;
19 import java.util.Map.Entry;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.Executors;
22 import org.opendaylight.mdsal.common.api.ReadFailedException;
23 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
24 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
25 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
28 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
29 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
30 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
34 class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction {
36 private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
38 private enum SimpleCursorOperation {
41 void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child,
42 final NormalizedNode<?, ?> data) {
43 cursor.merge(child, data);
48 void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child,
49 final NormalizedNode<?, ?> data) {
55 void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child,
56 final NormalizedNode<?, ?> data) {
57 cursor.write(child, data);
61 abstract void applyOnLeaf(DOMDataTreeWriteCursor cursor, PathArgument child, NormalizedNode<?, ?> data);
63 void apply(final DOMDataTreeWriteCursor cursor, final YangInstanceIdentifier path,
64 final NormalizedNode<?, ?> data) {
66 Iterator<PathArgument> it = path.getPathArguments().iterator();
67 while (it.hasNext()) {
68 PathArgument currentArg = it.next();
70 // We need to enter one level deeper, we are not at leaf (modified) node
71 cursor.enter(currentArg);
74 applyOnLeaf(cursor, currentArg, data);
77 cursor.exit(enterCount);
81 private final ShardDataModification modification;
82 private DOMDataTreeWriteCursor cursor;
83 private DataTree rootShardDataTree;
84 private DataTreeModification rootModification = null;
86 private ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
87 private InMemoryDOMDataTreeShardChangePublisher changePublisher;
89 // FIXME inject into shard?
90 private ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
92 InmemoryDOMDataTreeShardWriteTransaction(final ShardDataModification root,
93 final DataTree rootShardDataTree,
94 final InMemoryDOMDataTreeShardChangePublisher changePublisher) {
95 this.modification = Preconditions.checkNotNull(root);
96 this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
97 this.changePublisher = Preconditions.checkNotNull(changePublisher);
100 private DOMDataTreeWriteCursor getCursor() {
101 if (cursor == null) {
102 cursor = new ShardDataModificationCursor(modification);
107 void delete(final YangInstanceIdentifier path) {
108 YangInstanceIdentifier relativePath = toRelative(path);
109 Preconditions.checkArgument(!YangInstanceIdentifier.EMPTY.equals(relativePath),
110 "Deletion of shard root is not allowed");
111 SimpleCursorOperation.DELETE.apply(getCursor(), relativePath , null);
114 void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
115 SimpleCursorOperation.MERGE.apply(getCursor(), toRelative(path), data);
118 void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
119 SimpleCursorOperation.DELETE.apply(getCursor(), toRelative(path), data);
122 private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
123 Optional<YangInstanceIdentifier> relative =
124 path.relativeTo(modification.getPrefix().getRootIdentifier());
125 Preconditions.checkArgument(relative.isPresent());
126 return relative.get();
129 public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
130 // FIXME: Implement this
134 public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
135 // TODO Auto-generated method stub
140 public Object getIdentifier() {
141 // TODO Auto-generated method stub
145 public void close() {
146 // TODO Auto-generated method stub
150 public void ready() {
152 LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
153 rootModification = modification.seal();
155 cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(rootShardDataTree, rootModification, changePublisher));
156 for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry : modification.getChildShards().entrySet()) {
157 cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
162 public ListenableFuture<Void> submit() {
163 LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
165 Preconditions.checkNotNull(cohorts);
166 Preconditions.checkState(!cohorts.isEmpty(), "Submitting an empty transaction");
168 final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts));
174 public ListenableFuture<Boolean> validate() {
175 LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
177 final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
182 public ListenableFuture<Void> prepare() {
183 LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
185 final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
190 public ListenableFuture<Void> commit() {
191 LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
193 final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts));
197 public void followUp() {
202 public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
203 DOMDataTreeWriteCursor ret = getCursor();
204 YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
205 ret.enter(relativePath.getPathArguments());