Switch to Objects.requireNonNull
[mdsal.git] / dom / mdsal-dom-broker / src / main / java / org / opendaylight / mdsal / dom / broker / ShardedDOMDataTreeWriteTransaction.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.mdsal.dom.broker;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableMap;
14 import com.google.common.util.concurrent.FluentFuture;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayDeque;
21 import java.util.Deque;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.concurrent.atomic.AtomicLong;
25 import java.util.function.BiConsumer;
26 import java.util.function.Consumer;
27 import java.util.stream.Collectors;
28 import javax.annotation.concurrent.GuardedBy;
29 import javax.annotation.concurrent.NotThreadSafe;
30 import org.eclipse.jdt.annotation.NonNull;
31 import org.opendaylight.mdsal.common.api.CommitInfo;
32 import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
33 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
34 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
35 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
36 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
37 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 @NotThreadSafe
42 final class ShardedDOMDataTreeWriteTransaction implements DOMDataTreeCursorAwareTransaction {
43     private static final Logger LOG = LoggerFactory.getLogger(ShardedDOMDataTreeWriteTransaction.class);
44     private static final AtomicLong COUNTER = new AtomicLong();
45
46     private final Map<DOMDataTreeIdentifier, DOMDataTreeShardWriteTransaction> transactions;
47     private final ShardedDOMDataTreeProducer producer;
48     private final ProducerLayout layout;
49     private final String identifier;
50
51     private final SettableFuture<CommitInfo> future = SettableFuture.create();
52
53     @GuardedBy("this")
54     private boolean closed =  false;
55
56     @GuardedBy("this")
57     private DOMDataTreeWriteCursor openCursor;
58
59     ShardedDOMDataTreeWriteTransaction(final ShardedDOMDataTreeProducer producer,
60         final Map<DOMDataTreeIdentifier, DOMDataTreeShardWriteTransaction> transactions, final ProducerLayout layout) {
61         this.producer = requireNonNull(producer);
62         this.transactions = ImmutableMap.copyOf(transactions);
63         this.layout = requireNonNull(layout);
64         this.identifier = "SHARDED-DOM-" + COUNTER.getAndIncrement();
65         LOG.debug("Created new transaction {}", identifier);
66     }
67
68     private DOMDataTreeShardWriteTransaction lookup(final DOMDataTreeIdentifier prefix) {
69         final DOMDataTreeShardWriteTransaction fast = transactions.get(prefix);
70         if (fast != null) {
71             return fast;
72         }
73
74         LOG.debug("Prefix {} not found in available subtrees {}, fallback to slow path", prefix, transactions.keySet());
75         for (final Entry<DOMDataTreeIdentifier, DOMDataTreeShardWriteTransaction> e : transactions.entrySet()) {
76             if (e.getKey().contains(prefix)) {
77                 return e.getValue();
78             }
79         }
80
81         return null;
82     }
83
84     @Override
85     public String getIdentifier() {
86         return identifier;
87     }
88
89     @Override
90     public synchronized boolean cancel() {
91         if (closed) {
92             return false;
93         }
94
95         LOG.debug("Cancelling transaction {}", identifier);
96         if (openCursor != null) {
97             openCursor.close();
98         }
99         for (final DOMDataTreeShardWriteTransaction tx : transactions.values()) {
100             tx.close();
101         }
102
103         closed = true;
104         producer.cancelTransaction(this);
105         return true;
106     }
107
108     @Override
109     public synchronized DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
110         Preconditions.checkState(!closed, "Transaction is closed already");
111         Preconditions.checkState(openCursor == null, "There is still a cursor open");
112         Preconditions.checkArgument(!producer.isDelegatedToChild(prefix), "Path %s is delegated to child producer.",
113             prefix);
114
115         final DOMDataTreeShardWriteTransaction lookup = lookup(prefix);
116         Preconditions.checkArgument(lookup != null, "Path %s is not accessible from transaction %s", prefix, this);
117
118         openCursor = new DelegatingCursor(lookup.createCursor(prefix), prefix);
119         return openCursor;
120     }
121
122     @Override
123     public synchronized FluentFuture<? extends @NonNull CommitInfo> commit() {
124         Preconditions.checkState(!closed, "Transaction %s is already closed", identifier);
125         Preconditions.checkState(openCursor == null, "Cannot submit transaction while there is a cursor open");
126
127         producer.transactionSubmitted(this);
128         return future;
129     }
130
131     void doSubmit(final Consumer<ShardedDOMDataTreeWriteTransaction> success,
132             final BiConsumer<ShardedDOMDataTreeWriteTransaction, Throwable> failure) {
133         LOG.debug("Readying tx {}", identifier);
134
135         final ListenableFuture<?> internalFuture;
136         switch (transactions.size()) {
137             case 0:
138                 success.accept(this);
139                 return;
140             case 1: {
141                 final DOMDataTreeShardWriteTransaction tx = transactions.values().iterator().next();
142                 tx.ready();
143                 internalFuture = tx.submit();
144                 break;
145             }
146             default:
147                 internalFuture = Futures.allAsList(transactions.values().stream().map(tx -> {
148                     tx.ready();
149                     return tx.submit();
150                 }).collect(Collectors.toList()));
151         }
152
153         Futures.addCallback(internalFuture, new FutureCallback<Object>() {
154             @Override
155             public void onSuccess(final Object result) {
156                 success.accept(ShardedDOMDataTreeWriteTransaction.this);
157             }
158
159             @Override
160             public void onFailure(final Throwable exp) {
161                 failure.accept(ShardedDOMDataTreeWriteTransaction.this, exp);
162             }
163         }, MoreExecutors.directExecutor());
164     }
165
166     void onTransactionSuccess(final CommitInfo commitInfo) {
167         future.set(commitInfo);
168     }
169
170     void onTransactionFailure(final Throwable throwable) {
171         future.setException(throwable);
172     }
173
174     synchronized void cursorClosed() {
175         openCursor = null;
176     }
177
178     private class DelegatingCursor implements DOMDataTreeWriteCursor {
179         private final Deque<PathArgument> currentArgs = new ArrayDeque<>();
180         private final DOMDataTreeWriteCursor delegate;
181         private final DOMDataTreeIdentifier rootPosition;
182
183         DelegatingCursor(final DOMDataTreeWriteCursor delegate, final DOMDataTreeIdentifier rootPosition) {
184             this.delegate = requireNonNull(delegate);
185             this.rootPosition = requireNonNull(rootPosition);
186             currentArgs.addAll(rootPosition.getRootIdentifier().getPathArguments());
187         }
188
189         @Override
190         public void enter(final PathArgument child) {
191             checkAvailable(child);
192             delegate.enter(child);
193             currentArgs.push(child);
194         }
195
196         @Override
197         public void enter(final PathArgument... path) {
198             for (final PathArgument pathArgument : path) {
199                 enter(pathArgument);
200             }
201         }
202
203         @Override
204         public void enter(final Iterable<PathArgument> path) {
205             for (final PathArgument pathArgument : path) {
206                 enter(pathArgument);
207             }
208         }
209
210         @Override
211         public void exit() {
212             delegate.exit();
213             currentArgs.pop();
214         }
215
216         @Override
217         public void exit(final int depth) {
218             delegate.exit(depth);
219             for (int i = 0; i < depth; i++) {
220                 currentArgs.pop();
221             }
222         }
223
224         @Override
225         public void close() {
226             int depthEntered = currentArgs.size() - rootPosition.getRootIdentifier().getPathArguments().size();
227             if (depthEntered > 0) {
228                 // clean up existing modification cursor in case this tx will be reused for batching
229                 delegate.exit(depthEntered);
230             }
231
232             delegate.close();
233             cursorClosed();
234         }
235
236         @Override
237         public void delete(final PathArgument child) {
238             checkAvailable(child);
239             delegate.delete(child);
240         }
241
242         @Override
243         public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
244             checkAvailable(child);
245             delegate.merge(child, data);
246         }
247
248         @Override
249         public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
250             checkAvailable(child);
251             delegate.write(child, data);
252         }
253
254         void checkAvailable(final PathArgument child) {
255             layout.checkAvailable(currentArgs, child);
256         }
257     }
258
259     ProducerLayout getLayout() {
260         return layout;
261     }
262 }