Promote ForeignShardThreePhaseCommitCohort to dom.spi
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InmemoryDOMDataTreeShardWriteTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.store.inmemory;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.ArrayList;
17 import java.util.Iterator;
18 import java.util.Map.Entry;
19 import java.util.Optional;
20 import java.util.concurrent.atomic.AtomicLong;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
23 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
24 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
25 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardThreePhaseCommitCohort;
26 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
27 import org.opendaylight.yangtools.concepts.Identifiable;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
29 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
30 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
31 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
32 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
37
38     private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
39
40     private enum SimpleCursorOperation {
41         MERGE {
42             @Override
43             void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
44                              final NormalizedNode<?, ?> data) {
45                 cur.merge(child, data);
46             }
47         },
48         DELETE {
49             @Override
50             void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
51                              final NormalizedNode<?, ?> data) {
52                 cur.delete(child);
53             }
54         },
55         WRITE {
56             @Override
57             void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
58                              final NormalizedNode<?, ?> data) {
59                 cur.write(child, data);
60             }
61         };
62
63         abstract void applyOnLeaf(DOMDataTreeWriteCursor cur, PathArgument child, NormalizedNode<?, ?> data);
64
65         void apply(final DOMDataTreeWriteCursor cur, final YangInstanceIdentifier path,
66                    final NormalizedNode<?, ?> data) {
67             int enterCount = 0;
68             final Iterator<PathArgument> it = path.getPathArguments().iterator();
69             if (it.hasNext()) {
70                 while (true) {
71                     final PathArgument currentArg = it.next();
72                     if (!it.hasNext()) {
73                         applyOnLeaf(cur, currentArg, data);
74                         break;
75                     }
76
77                     // We need to enter one level deeper, we are not at leaf (modified) node
78                     cur.enter(currentArg);
79                     enterCount++;
80                 }
81             }
82
83             cur.exit(enterCount);
84         }
85     }
86
87     private static final AtomicLong COUNTER = new AtomicLong();
88
89     private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
90     private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
91     private final InMemoryDOMDataTreeShardProducer producer;
92     private final ShardDataModification modification;
93     private final ListeningExecutorService executor;
94     private final DataTree rootShardDataTree;
95     private final String identifier;
96
97     private DataTreeModification rootModification = null;
98     private DOMDataTreeWriteCursor cursor;
99     private boolean finished = false;
100
101     InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
102                                              final ShardDataModification root,
103                                              final DataTree rootShardDataTree,
104                                              final InMemoryDOMDataTreeShardChangePublisher changePublisher,
105                                              final ListeningExecutorService executor) {
106         this.producer = producer;
107         this.modification = requireNonNull(root);
108         this.rootShardDataTree = requireNonNull(rootShardDataTree);
109         this.changePublisher = requireNonNull(changePublisher);
110         this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
111         LOG.debug("Shard transaction{} created", identifier);
112         this.executor = executor;
113     }
114
115     @Override
116     public String getIdentifier() {
117         return identifier;
118     }
119
120     private DOMDataTreeWriteCursor getCursor() {
121         if (cursor == null) {
122             cursor = new InMemoryShardDataModificationCursor(modification, this);
123         }
124         return cursor;
125     }
126
127     void delete(final YangInstanceIdentifier path) {
128         final YangInstanceIdentifier relativePath = toRelative(path);
129         checkArgument(!relativePath.isEmpty(), "Deletion of shard root is not allowed");
130         SimpleCursorOperation.DELETE.apply(getCursor(), relativePath, null);
131     }
132
133     void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
134         SimpleCursorOperation.MERGE.apply(getCursor(), toRelative(path), data);
135     }
136
137     void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
138         SimpleCursorOperation.WRITE.apply(getCursor(), toRelative(path), data);
139     }
140
141     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
142         final Optional<YangInstanceIdentifier> relative =
143                 path.relativeTo(modification.getPrefix().getRootIdentifier());
144         checkArgument(relative.isPresent());
145         return relative.get();
146     }
147
148     @Override
149     public void close() {
150         checkState(!finished, "Attempting to close an already finished transaction.");
151         modification.closeTransactions();
152         if (cursor != null) {
153             cursor.close();
154         }
155         producer.transactionAborted(this);
156         finished = true;
157     }
158
159     void cursorClosed() {
160         requireNonNull(cursor);
161         modification.closeCursor();
162         cursor = null;
163     }
164
165     public boolean isFinished() {
166         return finished;
167     }
168
169     @Override
170     public void ready() {
171         checkState(!finished, "Attempting to ready an already finished transaction.");
172         checkState(cursor == null, "Attempting to ready a transaction that has an open cursor.");
173         requireNonNull(modification, "Attempting to ready an empty transaction.");
174
175         LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
176         rootModification = modification.seal();
177
178         producer.transactionReady(this, rootModification);
179         cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
180                 rootShardDataTree, rootModification, changePublisher));
181         for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
182                 modification.getChildShards().entrySet()) {
183             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
184         }
185         finished = true;
186     }
187
188     @Override
189     public ListenableFuture<Void> submit() {
190         LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
191
192         requireNonNull(cohorts);
193         checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
194
195         return executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts, this));
196     }
197
198     @Override
199     public ListenableFuture<Boolean> validate() {
200         LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
201         return executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
202     }
203
204     @Override
205     public ListenableFuture<Void> prepare() {
206         LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
207         return executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
208     }
209
210     @Override
211     public ListenableFuture<Void> commit() {
212         LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
213         return executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts, this));
214     }
215
216     DataTreeModification getRootModification() {
217         requireNonNull(rootModification, "Transaction wasn't sealed yet");
218         return rootModification;
219     }
220
221     void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
222         producer.onTransactionCommited(tx);
223     }
224
225     @Override
226     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
227         checkState(!finished, "Transaction is finished/closed already.");
228         checkState(cursor == null, "Previous cursor wasn't closed");
229         final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
230         final DOMDataTreeWriteCursor ret = getCursor();
231         ret.enter(relativePath.getPathArguments());
232         return ret;
233     }
234 }