Remove @(Not)ThreadSafe annotation
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / ShardedDOMDataTreeWriteTransaction.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayDeque;
21 import java.util.Deque;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.concurrent.atomic.AtomicLong;
25 import java.util.function.BiConsumer;
26 import java.util.function.Consumer;
27 import java.util.stream.Collectors;
28 import javax.annotation.concurrent.GuardedBy;
29 import org.eclipse.jdt.annotation.NonNull;
30 import org.opendaylight.mdsal.common.api.CommitInfo;
31 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
34 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
35 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
36 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAwareTransaction {
41     private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTreeWriteTransaction.class);
42     private static final AtomicLong COUNTER = new AtomicLong();
43
44     private final Map<DOMDataTreeIdentifier, DOMDataTreeShardWriteTransaction> transactions;
45     private final ShardedDOMDataTreeProducer producer;
46     private final ProducerLayout layout;
47     private final String identifier;
48
49     private final SettableFuture<CommitInfo> future = SettableFuture.create();
50
51     @GuardedBy("this")
52     private boolean closed =  false;
53
54     @GuardedBy("this")
55     private DOMDataTreeWriteCursor openCursor;
56
57     ShardedDOMDataTreeWriteTransaction(final ShardedDOMDataTreeProducer producer,
58         final Map<DOMDataTreeIdentifier, DOMDataTreeShardWriteTransaction> transactions, final ProducerLayout layout) {
59         this.producer = requireNonNull(producer);
60         this.transactions = ImmutableMap.copyOf(transactions);
61         this.layout = requireNonNull(layout);
62         this.identifier = "SHARDED-DOM-" + COUNTER.getAndIncrement();
63         LOG.debug("Created new transaction {}", identifier);
64     }
65
66     private DOMDataTreeShardWriteTransaction lookup(final DOMDataTreeIdentifier prefix) {
67         final DOMDataTreeShardWriteTransaction fast = transactions.get(prefix);
68         if (fast != null) {
69             return fast;
70         }
71
72         LOG.debug("Prefix {} not found in available subtrees {}, fallback to slow path", prefix, transactions.keySet());
73         for (final Entry<DOMDataTreeIdentifier, DOMDataTreeShardWriteTransaction> e : transactions.entrySet()) {
74             if (e.getKey().contains(prefix)) {
75                 return e.getValue();
76             }
77         }
78
79         return null;
80     }
81
82     @Override
83     public String getIdentifier() {
84         return identifier;
85     }
86
87     @Override
88     public synchronized boolean cancel() {
89         if (closed) {
90             return false;
91         }
92
93         LOG.debug("Cancelling transaction {}", identifier);
94         if (openCursor != null) {
95             openCursor.close();
96         }
97         for (final DOMDataTreeShardWriteTransaction tx : transactions.values()) {
98             tx.close();
99         }
100
101         closed = true;
102         producer.cancelTransaction(this);
103         return true;
104     }
105
106     @Override
107     public synchronized DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
108         Preconditions.checkState(!closed, "Transaction is closed already");
109         Preconditions.checkState(openCursor == null, "There is still a cursor open");
110         Preconditions.checkArgument(!producer.isDelegatedToChild(prefix), "Path %s is delegated to child producer.",
111             prefix);
112
113         final DOMDataTreeShardWriteTransaction lookup = lookup(prefix);
114         Preconditions.checkArgument(lookup != null, "Path %s is not accessible from transaction %s", prefix, this);
115
116         openCursor = new DelegatingCursor(lookup.createCursor(prefix), prefix);
117         return openCursor;
118     }
119
120     @Override
121     public synchronized FluentFuture<? extends @NonNull CommitInfo> commit() {
122         Preconditions.checkState(!closed, "Transaction %s is already closed", identifier);
123         Preconditions.checkState(openCursor == null, "Cannot submit transaction while there is a cursor open");
124
125         producer.transactionSubmitted(this);
126         return future;
127     }
128
129     void doSubmit(final Consumer<ShardedDOMDataTreeWriteTransaction> success,
130             final BiConsumer<ShardedDOMDataTreeWriteTransaction, Throwable> failure) {
131         LOG.debug("Readying tx {}", identifier);
132
133         final ListenableFuture<?> internalFuture;
134         switch (transactions.size()) {
135             case 0:
136                 success.accept(this);
137                 return;
138             case 1: {
139                 final DOMDataTreeShardWriteTransaction tx = transactions.values().iterator().next();
140                 tx.ready();
141                 internalFuture = tx.submit();
142                 break;
143             }
144             default:
145                 internalFuture = Futures.allAsList(transactions.values().stream().map(tx -> {
146                     tx.ready();
147                     return tx.submit();
148                 }).collect(Collectors.toList()));
149         }
150
151         Futures.addCallback(internalFuture, new FutureCallback<Object>() {
152             @Override
153             public void onSuccess(final Object result) {
154                 success.accept(ShardedDOMDataTreeWriteTransaction.this);
155             }
156
157             @Override
158             public void onFailure(final Throwable exp) {
159                 failure.accept(ShardedDOMDataTreeWriteTransaction.this, exp);
160             }
161         }, MoreExecutors.directExecutor());
162     }
163
164     void onTransactionSuccess(final CommitInfo commitInfo) {
165         future.set(commitInfo);
166     }
167
168     void onTransactionFailure(final Throwable throwable) {
169         future.setException(throwable);
170     }
171
172     synchronized void cursorClosed() {
173         openCursor = null;
174     }
175
176     private class DelegatingCursor implements DOMDataTreeWriteCursor {
177         private final Deque<PathArgument> currentArgs = new ArrayDeque<>();
178         private final DOMDataTreeWriteCursor delegate;
179         private final DOMDataTreeIdentifier rootPosition;
180
181         DelegatingCursor(final DOMDataTreeWriteCursor delegate, final DOMDataTreeIdentifier rootPosition) {
182             this.delegate = requireNonNull(delegate);
183             this.rootPosition = requireNonNull(rootPosition);
184             currentArgs.addAll(rootPosition.getRootIdentifier().getPathArguments());
185         }
186
187         @Override
188         public void enter(final PathArgument child) {
189             checkAvailable(child);
190             delegate.enter(child);
191             currentArgs.push(child);
192         }
193
194         @Override
195         public void enter(final PathArgument... path) {
196             for (final PathArgument pathArgument : path) {
197                 enter(pathArgument);
198             }
199         }
200
201         @Override
202         public void enter(final Iterable<PathArgument> path) {
203             for (final PathArgument pathArgument : path) {
204                 enter(pathArgument);
205             }
206         }
207
208         @Override
209         public void exit() {
210             delegate.exit();
211             currentArgs.pop();
212         }
213
214         @Override
215         public void exit(final int depth) {
216             delegate.exit(depth);
217             for (int i = 0; i < depth; i++) {
218                 currentArgs.pop();
219             }
220         }
221
222         @Override
223         public void close() {
224             int depthEntered = currentArgs.size() - rootPosition.getRootIdentifier().getPathArguments().size();
225             if (depthEntered > 0) {
226                 // clean up existing modification cursor in case this tx will be reused for batching
227                 delegate.exit(depthEntered);
228             }
229
230             delegate.close();
231             cursorClosed();
232         }
233
234         @Override
235         public void delete(final PathArgument child) {
236             checkAvailable(child);
237             delegate.delete(child);
238         }
239
240         @Override
241         public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
242             checkAvailable(child);
243             delegate.merge(child, data);
244         }
245
246         @Override
247         public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
248             checkAvailable(child);
249             delegate.write(child, data);
250         }
251
252         void checkAvailable(final PathArgument child) {
253             layout.checkAvailable(currentArgs, child);
254         }
255     }
256
257     ProducerLayout getLayout() {
258         return layout;
259     }
260 }