Fix InMemory shard transaction chaining.
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InmemoryDOMDataTreeShardWriteTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.mdsal.dom.store.inmemory;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.CheckedFuture;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.ArrayList;
17 import java.util.Iterator;
18 import java.util.Map.Entry;
19 import java.util.concurrent.atomic.AtomicLong;
20 import org.opendaylight.mdsal.common.api.ReadFailedException;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
23 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
24 import org.opendaylight.yangtools.concepts.Identifiable;
25 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
26 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
27 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
28 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
29 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
34
35     private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
36
37     private enum SimpleCursorOperation {
38         MERGE {
39             @Override
40             void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child,
41                     final NormalizedNode<?, ?> data) {
42                 cursor.merge(child, data);
43             }
44         },
45         DELETE {
46             @Override
47             void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child,
48                     final NormalizedNode<?, ?> data) {
49                 cursor.delete(child);
50             }
51         },
52         WRITE {
53             @Override
54             void applyOnLeaf(final DOMDataTreeWriteCursor cursor, final PathArgument child,
55                     final NormalizedNode<?, ?> data) {
56                 cursor.write(child, data);
57             }
58         };
59
60         abstract void applyOnLeaf(DOMDataTreeWriteCursor cursor, PathArgument child, NormalizedNode<?, ?> data);
61
62         void apply(final DOMDataTreeWriteCursor cursor, final YangInstanceIdentifier path,
63                 final NormalizedNode<?, ?> data) {
64             int enterCount = 0;
65             final Iterator<PathArgument> it = path.getPathArguments().iterator();
66             while (it.hasNext()) {
67                 final PathArgument currentArg = it.next();
68                 if (it.hasNext()) {
69                     // We need to enter one level deeper, we are not at leaf (modified) node
70                     cursor.enter(currentArg);
71                     enterCount++;
72                 } else {
73                     applyOnLeaf(cursor, currentArg, data);
74                 }
75             }
76             cursor.exit(enterCount);
77         }
78     }
79
80     private static final AtomicLong COUNTER = new AtomicLong();
81
82     private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
83     private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
84     private final InMemoryDOMDataTreeShardProducer producer;
85     private final ShardDataModification modification;
86     private final ListeningExecutorService executor;
87     private final DataTree rootShardDataTree;
88     private final String identifier;
89
90     private DataTreeModification rootModification = null;
91     private DOMDataTreeWriteCursor cursor;
92     private boolean finished = false;
93
94     InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
95                                              final ShardDataModification root,
96                                              final DataTree rootShardDataTree,
97                                              final InMemoryDOMDataTreeShardChangePublisher changePublisher,
98                                              final ListeningExecutorService executor) {
99         this.producer = producer;
100         this.modification = Preconditions.checkNotNull(root);
101         this.rootShardDataTree = Preconditions.checkNotNull(rootShardDataTree);
102         this.changePublisher = Preconditions.checkNotNull(changePublisher);
103         this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
104         LOG.debug("Shard transaction{} created", identifier);
105         this.executor = executor;
106     }
107
108     @Override
109     public String getIdentifier() {
110         return identifier;
111     }
112
113     private DOMDataTreeWriteCursor getCursor() {
114         if (cursor == null) {
115             cursor = new ShardDataModificationCursor(modification, this);
116         }
117         return cursor;
118     }
119
120     void delete(final YangInstanceIdentifier path) {
121         final YangInstanceIdentifier relativePath = toRelative(path);
122         Preconditions.checkArgument(!YangInstanceIdentifier.EMPTY.equals(relativePath),
123                 "Deletion of shard root is not allowed");
124         SimpleCursorOperation.DELETE.apply(getCursor(), relativePath , null);
125     }
126
127     void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
128         SimpleCursorOperation.MERGE.apply(getCursor(), toRelative(path), data);
129     }
130
131     void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
132         SimpleCursorOperation.DELETE.apply(getCursor(), toRelative(path), data);
133     }
134
135     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
136         final Optional<YangInstanceIdentifier> relative =
137                 path.relativeTo(modification.getPrefix().getRootIdentifier());
138         Preconditions.checkArgument(relative.isPresent());
139         return relative.get();
140     }
141
142     public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
143         throw new UnsupportedOperationException("Not implemented yet");
144     }
145
146     public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
147         throw new UnsupportedOperationException("Not implemented yet");
148     }
149
150     @Override
151     public void close() {
152         Preconditions.checkState(!finished, "Attempting to close an already finished transaction.");
153         modification.closeTransactions();
154         if (cursor != null) {
155             cursor.close();
156         }
157         producer.transactionAborted(this);
158         finished = true;
159     }
160
161     void cursorClosed() {
162         Preconditions.checkNotNull(cursor);
163         modification.closeCursor();
164         cursor = null;
165     }
166
167     public boolean isFinished() {
168         return finished;
169     }
170
171     @Override
172     public void ready() {
173         Preconditions.checkState(!finished, "Attempting to ready an already finished transaction.");
174         Preconditions.checkState(cursor == null, "Attempting to ready a transaction that has an open cursor.");
175         Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
176
177         LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
178         rootModification = modification.seal();
179
180         producer.transactionReady(this, rootModification);
181         cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
182                 rootShardDataTree, rootModification, changePublisher));
183         for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
184                 modification.getChildShards().entrySet()) {
185             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
186         }
187         finished = true;
188     }
189
190     @Override
191     public ListenableFuture<Void> submit() {
192         LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
193
194         Preconditions.checkNotNull(cohorts);
195         Preconditions.checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
196
197         final ListenableFuture<Void> submit = executor.submit(new ShardSubmitCoordinationTask(
198                 modification.getPrefix(), cohorts, this));
199
200         return submit;
201     }
202
203     @Override
204     public ListenableFuture<Boolean> validate() {
205         LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
206
207         final ListenableFuture<Boolean> submit = executor.submit(new ShardCanCommitCoordinationTask(
208                 modification.getPrefix(), cohorts));
209         return submit;
210     }
211
212     @Override
213     public ListenableFuture<Void> prepare() {
214         LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
215
216         final ListenableFuture<Void> submit = executor.submit(new ShardPreCommitCoordinationTask(
217                 modification.getPrefix(), cohorts));
218         return submit;
219     }
220
221     @Override
222     public ListenableFuture<Void> commit() {
223         LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
224
225         final ListenableFuture<Void> submit = executor.submit(new ShardCommitCoordinationTask(
226                 modification.getPrefix(), cohorts, this));
227         return submit;
228     }
229
230     DataTreeModification getRootModification() {
231         Preconditions.checkNotNull(rootModification, "Transaction wasn't sealed yet");
232         return rootModification;
233     }
234
235     void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
236         producer.onTransactionCommited(tx);
237     }
238
239     @Override
240     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
241         Preconditions.checkState(!finished, "Transaction is finished/closed already.");
242         Preconditions.checkState(cursor == null, "Previous cursor wasn't closed");
243         final DOMDataTreeWriteCursor ret = getCursor();
244         final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
245         ret.enter(relativePath.getPathArguments());
246         return ret;
247     }
248 }