078299b3f3a6e677f8e93783617f490e9ba3f352
[mdsal.git] / dom / mdsal-dom-inmemory-datastore / src / main / java / org / opendaylight / mdsal / dom / store / inmemory / InmemoryDOMDataTreeShardWriteTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.store.inmemory;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.util.concurrent.ListenableFuture;
15 import com.google.common.util.concurrent.ListeningExecutorService;
16 import java.util.ArrayList;
17 import java.util.Iterator;
18 import java.util.Map.Entry;
19 import java.util.Optional;
20 import java.util.concurrent.atomic.AtomicLong;
21 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
22 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
23 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
24 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
25 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
26 import org.opendaylight.yangtools.concepts.Identifiable;
27 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
28 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
29 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
30 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
31 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 class InmemoryDOMDataTreeShardWriteTransaction implements DOMDataTreeShardWriteTransaction, Identifiable<String> {
36
37     private static final Logger LOG = LoggerFactory.getLogger(InmemoryDOMDataTreeShardWriteTransaction.class);
38
39     private enum SimpleCursorOperation {
40         MERGE {
41             @Override
42             void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
43                              final NormalizedNode<?, ?> data) {
44                 cur.merge(child, data);
45             }
46         },
47         DELETE {
48             @Override
49             void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
50                              final NormalizedNode<?, ?> data) {
51                 cur.delete(child);
52             }
53         },
54         WRITE {
55             @Override
56             void applyOnLeaf(final DOMDataTreeWriteCursor cur, final PathArgument child,
57                              final NormalizedNode<?, ?> data) {
58                 cur.write(child, data);
59             }
60         };
61
62         abstract void applyOnLeaf(DOMDataTreeWriteCursor cur, PathArgument child, NormalizedNode<?, ?> data);
63
64         void apply(final DOMDataTreeWriteCursor cur, final YangInstanceIdentifier path,
65                    final NormalizedNode<?, ?> data) {
66             int enterCount = 0;
67             final Iterator<PathArgument> it = path.getPathArguments().iterator();
68             if (it.hasNext()) {
69                 while (true) {
70                     final PathArgument currentArg = it.next();
71                     if (!it.hasNext()) {
72                         applyOnLeaf(cur, currentArg, data);
73                         break;
74                     }
75
76                     // We need to enter one level deeper, we are not at leaf (modified) node
77                     cur.enter(currentArg);
78                     enterCount++;
79                 }
80             }
81
82             cur.exit(enterCount);
83         }
84     }
85
86     private static final AtomicLong COUNTER = new AtomicLong();
87
88     private final ArrayList<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
89     private final InMemoryDOMDataTreeShardChangePublisher changePublisher;
90     private final InMemoryDOMDataTreeShardProducer producer;
91     private final ShardDataModification modification;
92     private final ListeningExecutorService executor;
93     private final DataTree rootShardDataTree;
94     private final String identifier;
95
96     private DataTreeModification rootModification = null;
97     private DOMDataTreeWriteCursor cursor;
98     private boolean finished = false;
99
100     InmemoryDOMDataTreeShardWriteTransaction(final InMemoryDOMDataTreeShardProducer producer,
101                                              final ShardDataModification root,
102                                              final DataTree rootShardDataTree,
103                                              final InMemoryDOMDataTreeShardChangePublisher changePublisher,
104                                              final ListeningExecutorService executor) {
105         this.producer = producer;
106         this.modification = requireNonNull(root);
107         this.rootShardDataTree = requireNonNull(rootShardDataTree);
108         this.changePublisher = requireNonNull(changePublisher);
109         this.identifier = "INMEMORY-SHARD-TX-" + COUNTER.getAndIncrement();
110         LOG.debug("Shard transaction{} created", identifier);
111         this.executor = executor;
112     }
113
114     @Override
115     public String getIdentifier() {
116         return identifier;
117     }
118
119     private DOMDataTreeWriteCursor getCursor() {
120         if (cursor == null) {
121             cursor = new InMemoryShardDataModificationCursor(modification, this);
122         }
123         return cursor;
124     }
125
126     void delete(final YangInstanceIdentifier path) {
127         final YangInstanceIdentifier relativePath = toRelative(path);
128         checkArgument(!relativePath.isEmpty(), "Deletion of shard root is not allowed");
129         SimpleCursorOperation.DELETE.apply(getCursor(), relativePath, null);
130     }
131
132     void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
133         SimpleCursorOperation.MERGE.apply(getCursor(), toRelative(path), data);
134     }
135
136     void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
137         SimpleCursorOperation.WRITE.apply(getCursor(), toRelative(path), data);
138     }
139
140     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
141         final Optional<YangInstanceIdentifier> relative =
142                 path.relativeTo(modification.getPrefix().getRootIdentifier());
143         checkArgument(relative.isPresent());
144         return relative.get();
145     }
146
147     @Override
148     public void close() {
149         checkState(!finished, "Attempting to close an already finished transaction.");
150         modification.closeTransactions();
151         if (cursor != null) {
152             cursor.close();
153         }
154         producer.transactionAborted(this);
155         finished = true;
156     }
157
158     void cursorClosed() {
159         requireNonNull(cursor);
160         modification.closeCursor();
161         cursor = null;
162     }
163
164     public boolean isFinished() {
165         return finished;
166     }
167
168     @Override
169     public void ready() {
170         checkState(!finished, "Attempting to ready an already finished transaction.");
171         checkState(cursor == null, "Attempting to ready a transaction that has an open cursor.");
172         requireNonNull(modification, "Attempting to ready an empty transaction.");
173
174         LOG.debug("Readying open transaction on shard {}", modification.getPrefix());
175         rootModification = modification.seal();
176
177         producer.transactionReady(this, rootModification);
178         cohorts.add(new InMemoryDOMDataTreeShardThreePhaseCommitCohort(
179                 rootShardDataTree, rootModification, changePublisher));
180         for (final Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry :
181                 modification.getChildShards().entrySet()) {
182             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
183         }
184         finished = true;
185     }
186
187     @Override
188     public ListenableFuture<Void> submit() {
189         LOG.debug("Submitting open transaction on shard {}", modification.getPrefix());
190
191         requireNonNull(cohorts);
192         checkState(!cohorts.isEmpty(), "Transaction was not readied yet.");
193
194         return executor.submit(new ShardSubmitCoordinationTask(modification.getPrefix(), cohorts, this));
195     }
196
197     @Override
198     public ListenableFuture<Boolean> validate() {
199         LOG.debug("CanCommit on open transaction on shard {}", modification.getPrefix());
200         return executor.submit(new ShardCanCommitCoordinationTask(modification.getPrefix(), cohorts));
201     }
202
203     @Override
204     public ListenableFuture<Void> prepare() {
205         LOG.debug("PreCommit on open transaction on shard {}", modification.getPrefix());
206         return executor.submit(new ShardPreCommitCoordinationTask(modification.getPrefix(), cohorts));
207     }
208
209     @Override
210     public ListenableFuture<Void> commit() {
211         LOG.debug("Commit open transaction on shard {}", modification.getPrefix());
212         return executor.submit(new ShardCommitCoordinationTask(modification.getPrefix(), cohorts, this));
213     }
214
215     DataTreeModification getRootModification() {
216         requireNonNull(rootModification, "Transaction wasn't sealed yet");
217         return rootModification;
218     }
219
220     void transactionCommited(final InmemoryDOMDataTreeShardWriteTransaction tx) {
221         producer.onTransactionCommited(tx);
222     }
223
224     @Override
225     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
226         checkState(!finished, "Transaction is finished/closed already.");
227         checkState(cursor == null, "Previous cursor wasn't closed");
228         final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
229         final DOMDataTreeWriteCursor ret = getCursor();
230         ret.enter(relativePath.getPathArguments());
231         return ret;
232     }
233 }