2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.mdsal.dom.store.inmemory;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.ArrayList;
17 import java.util.Iterator;
18 import java.util.Map.Entry;
19 import java.util.Optional;
20 import java.util.concurrent.atomic.AtomicLong;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
23 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
24 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
25 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardThreePhaseCommitCohort;
26 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.yangtools.concepts.Identifiable;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
31 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
/**
 * Write transaction of an in-memory DOM data tree shard.
 *
 * <p>Changes are applied through a {@code DOMDataTreeWriteCursor} that operates on a
 * {@code ShardDataModification}. {@code ready()} seals that modification and assembles the
 * three-phase commit cohorts; {@code submit()}, {@code validate()}, {@code prepare()} and
 * {@code commit()} wrap those cohorts in coordination tasks and submit them to the executor
 * supplied at construction time.
 *
 * <p>Every instance carries an identifier of the form {@code INMEMORY-SHARD-TX-<n>}.
 */
36 class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
// Logger shared by all instances of this class.
38 private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
/**
 * Path-addressed cursor operations used by the enclosing transaction (referenced there as
 * {@code DELETE}, {@code MERGE} and {@code WRITE}).
 *
 * <p>Each constant supplies its own {@code applyOnLeaf}; {@code apply} walks a path with the cursor
 * and invokes {@code applyOnLeaf} for a path argument.
 *
 * <p>NOTE(review): several lines of this enum (constant headers, one constant body and the
 * loop/branch structure of {@code apply}) are not visible in this excerpt; the comments below only
 * describe the statements that are.
 */
40 private enum SimpleCursorOperation {
// Variant that merges the supplied data into the child through the cursor.
43 void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
44 final NormalizedNode<?, ?> data) {
45 cur.merge(child, data);
// NOTE(review): the body of this variant is not visible in this excerpt.
50 void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
51 final NormalizedNode<?, ?> data) {
// Variant that writes the supplied data to the child through the cursor.
57 void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
58 final NormalizedNode<?, ?> data) {
59 cur.write(child, data);
/**
 * Performs the constant-specific operation on a single child of the cursor's current position.
 *
 * @param cur cursor used to perform the operation
 * @param child path argument identifying the child to operate on
 * @param data data handed to the operation; the enclosing class passes {@code null} for DELETE
 */
63 abstract void applyOnLeaf(DOMDataTreeWriteCursor cur, PathArgument child, NormalizedNode<?, ?> data);
/**
 * Applies this operation at {@code path}, iterating over its path arguments.
 *
 * @param cur cursor to drive
 * @param path path whose arguments are iterated
 * @param data data forwarded to {@code applyOnLeaf}
 */
65 void apply(final DOMDataTreeWriteCursor cur, final YangInstanceIdentifier path,
66 final NormalizedNode<?, ?> data) {
68 final Iterator<PathArgument> it = path.getPathArguments().iterator();
71 final PathArgument currentArg = it.next();
// NOTE(review): presumably reached only for the last path argument - the guarding condition is not visible.
73 applyOnLeaf(cur, currentArg, data);
77 // We need to enter one level deeper, we are not at leaf (modified) node
78 cur.enter(currentArg);
// Source of the numeric suffix of transaction identifiers; shared by all instances.
87 private static final AtomicLong COUNTER = new AtomicLong();
// Commit cohorts: populated by ready() and handed to the coordination tasks.
89 private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
// Passed to this shard's own commit cohort when the transaction is readied.
90 private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
// Notified when the transaction is aborted, readied or committed.
91 private final InMemoryDOMDataTreeShardProducer producer;
// Shard modification the cursor operates on; sealed by ready().
92 private final ShardDataModification modification;
// Executor the coordination tasks are submitted to.
93 private final ListeningExecutorService executor;
// Data tree passed to this shard's own commit cohort.
94 private final DataTree rootShardDataTree;
// "INMEMORY-SHARD-TX-" followed by a COUNTER value, assigned in the constructor.
95 private final String identifier;
// Result of modification.seal(); stays null until ready() has run.
97 private DataTreeModification rootModification = null;
// Cursor currently held by this transaction, created on demand by getCursor(); null when none is held.
98 private DOMDataTreeWriteCursor cursor;
// Checked by close(), ready() and createCursor(); NOTE(review): the statements that set it are not visible here.
99 private boolean finished = false;
/**
 * Creates a write transaction operating on the given shard modification.
 *
 * @param producer producer notified when this transaction is aborted, readied or committed
 * @param root shard modification this transaction operates on, must not be null
 * @param rootShardDataTree data tree handed to this shard's own commit cohort, must not be null
 * @param changePublisher change publisher handed to this shard's own commit cohort, must not be null
 * @param executor executor the commit coordination tasks are submitted to
 * @throws NullPointerException if {@code root}, {@code rootShardDataTree} or
 *         {@code changePublisher} is null
 */
InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
        final ShardDataModification root,
        final DataTree rootShardDataTree,
        final InMemoryDOMDataTreeShardChangePublisher changePublisher,
        final ListeningExecutorService executor) {
    this.producer = producer;
    this.modification = requireNonNull(root);
    this.rootShardDataTree = requireNonNull(rootShardDataTree);
    this.changePublisher = requireNonNull(changePublisher);
    this.executor = executor;
    this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
    // The placeholder needs a separating space, otherwise the identifier is glued to "transaction".
    LOG.debug("Shard transaction {} created", identifier);
}
/**
 * Returns the identifier of this transaction.
 *
 * <p>NOTE(review): the method body is not visible in this excerpt; presumably it returns the
 * {@code identifier} field assigned in the constructor - confirm.
 */
116 public String getIdentifier() {
/**
 * Provides the write cursor of this transaction, creating it when none is currently held.
 *
 * <p>NOTE(review): the closing lines, including the return statement, are not visible in this
 * excerpt.
 */
120 private DOMDataTreeWriteCursor getCursor() {
// Only build a new cursor when the transaction does not hold one already.
121 if (cursor == null) {
// The cursor receives this transaction as well as the shard modification.
122 cursor = new InMemoryShardDataModificationCursor(modification, this);
127 void delete(final YangInstanceIdentifier path) {
128 final YangInstanceIdentifier relativePath = toRelative(path);
129 checkArgument(!relativePath.isEmpty(), "Deletion of shard root is not allowed");
130 SimpleCursorOperation.DELETE.apply(getCursor(), relativePath, null);
133 void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
134 SimpleCursorOperation.MERGE.apply(getCursor(), toRelative(path), data);
137 void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
138 SimpleCursorOperation.WRITE.apply(getCursor(), toRelative(path), data);
141 private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
142 final Optional<YangInstanceIdentifier> relative =
143 path.relativeTo(modification.getPrefix().getRootIdentifier());
144 checkArgument(relative.isPresent());
145 return relative.get();
/**
 * Closes this transaction and reports it to the producer as aborted.
 *
 * <p>NOTE(review): the body of the cursor branch and the lines after the producer notification
 * are not visible in this excerpt.
 *
 * @throws IllegalStateException if the transaction is already finished
 */
149 public void close() {
150 checkState(!finished, "Attempting to close an already finished transaction.");
// Delegate to the shard modification to close its transactions.
151 modification.closeTransactions();
// Extra handling when a cursor is still held.
152 if (cursor != null) {
// Tell the producer that this transaction was aborted.
155 producer.transactionAborted(this);
/**
 * Handles the closing of this transaction's cursor: requires that a cursor is currently held and
 * tells the shard modification to close its cursor.
 *
 * <p>NOTE(review): presumably invoked by the cursor created in {@code getCursor()}, which is
 * handed this transaction - confirm against the cursor implementation. The trailing lines of this
 * method are not visible in this excerpt.
 *
 * @throws NullPointerException if no cursor is currently held
 */
159 void cursorClosed() {
160 requireNonNull(cursor);
161 modification.closeCursor();
/**
 * Reports whether this transaction is finished.
 *
 * <p>NOTE(review): the method body is not visible in this excerpt; presumably it returns the
 * {@code finished} field - confirm.
 */
165 public boolean isFinished() {
/**
 * Readies this transaction for commit.
 *
 * <p>Seals the shard modification, notifies the producer and assembles the cohort list: first the
 * cohort for this shard's own data tree, then one cohort for every child shard entry reported by
 * the modification.
 *
 * <p>NOTE(review): the lines following the child-shard loop are not visible in this excerpt.
 *
 * @throws IllegalStateException if the transaction is already finished or still has an open cursor
 */
170 public void ready() {
171 checkState(!finished, "Attempting to ready an already finished transaction.");
172 checkState(cursor == null, "Attempting to ready a transaction that has an open cursor.");
// modification is a final field that the constructor already null-checks.
173 requireNonNull(modification, "Attempting to ready an empty transaction.");
175 LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
// Sealing yields the root DataTreeModification used by the cohort below.
176 rootModification = modification.seal();
// The producer is notified before any cohort is registered.
178 producer.transactionReady(this, rootModification);
// Cohort covering this shard's own data tree.
179 cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
180 rootShardDataTree, rootModification, changePublisher));
// One additional cohort per child shard entry of the modification.
181 for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
182 modification.getChildShards().entrySet()) {
183 cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
189 public ListenableFuture<Void> submit() {
190 LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
192 requireNonNull(cohorts);
193 checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
195 return executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts, this));
199 public ListenableFuture<Boolean> validate() {
200 LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
201 return executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
205 public ListenableFuture<Void> prepare() {
206 LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
207 return executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
211 public ListenableFuture<Void> commit() {
212 LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
213 return executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts, this));
216 DataTreeModification getRootModification() {
217 requireNonNull(rootModification, "Transaction wasn't sealed yet");
218 return rootModification;
221 void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
222 producer.onTransactionCommited(tx);
226 public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
227 checkState(!finished, "Transaction is finished/closed already.");
228 checkState(cursor == null, "Previous cursor wasn't closed");
229 final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
230 final DOMDataTreeWriteCursor ret = getCursor();
231 ret.enter(relativePath.getPathArguments());