Migrate nullness annotations
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / ShardProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.sharding;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Map.Entry;
23 import java.util.Optional;
24 import java.util.stream.Collectors;
25 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
28 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
29 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
30 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
31 import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
38  * {@link ClientTransaction} calls.
39  */
40 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
41
42     private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
43
44     private final DOMDataTreeIdentifier shardRoot;
45     private final Collection<DOMDataTreeIdentifier> prefixes;
46     private final DistributedShardModification modification;
47     private ClientTransaction currentTx;
48     private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
49
50     private DOMDataTreeWriteCursor cursor = null;
51
52     ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
53                           final Collection<DOMDataTreeIdentifier> prefixes,
54                           final DistributedShardModification modification) {
55         this.shardRoot = requireNonNull(shardRoot);
56         this.prefixes = requireNonNull(prefixes);
57         this.modification = requireNonNull(modification);
58     }
59
60     private DOMDataTreeWriteCursor getCursor() {
61         if (cursor == null) {
62             cursor = new DistributedShardModificationCursor(modification, this);
63         }
64         return cursor;
65     }
66
67     @Override
68     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
69         checkAvailable(prefix);
70         final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
71         final DOMDataTreeWriteCursor ret = getCursor();
72         ret.enter(relativePath.getPathArguments());
73         return ret;
74     }
75
76     void cursorClosed() {
77         cursor = null;
78         modification.cursorClosed();
79     }
80
81     private void checkAvailable(final DOMDataTreeIdentifier prefix) {
82         for (final DOMDataTreeIdentifier p : prefixes) {
83             if (p.contains(prefix)) {
84                 return;
85             }
86         }
87         throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
88                 + "Available prefixes: " + prefixes);
89     }
90
91     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
92         final Optional<YangInstanceIdentifier> relative =
93                 path.relativeTo(modification.getPrefix().getRootIdentifier());
94         Preconditions.checkArgument(relative.isPresent());
95         return relative.get();
96     }
97
98     @Override
99     public void ready() {
100         LOG.debug("Readying transaction for shard {}", shardRoot);
101
102         Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
103
104         cohorts.add(modification.seal());
105         for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
106                 : modification.getChildShards().entrySet()) {
107             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
108         }
109     }
110
111     @Override
112     public void close() {
113         cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
114         cohorts.clear();
115
116         if (currentTx != null) {
117             currentTx.abort();
118             currentTx = null;
119         }
120     }
121
122     @Override
123     public ListenableFuture<Void> submit() {
124         LOG.debug("Submitting transaction for shard {}", shardRoot);
125
126         checkTransactionReadied();
127
128         final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
129         final AsyncFunction<Void, Void> prepareFunction = input -> commit();
130
131         // transform validate into prepare
132         final ListenableFuture<Void> prepareFuture = Futures.transformAsync(validate(), validateFunction,
133             MoreExecutors.directExecutor());
134         // transform prepare into commit and return as submit result
135         return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
136     }
137
138     private void checkTransactionReadied() {
139         Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
140     }
141
142     @Override
143     public ListenableFuture<Boolean> validate() {
144         LOG.debug("Validating transaction for shard {}", shardRoot);
145
146         checkTransactionReadied();
147         final List<ListenableFuture<Boolean>> futures =
148                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
149         final SettableFuture<Boolean> ret = SettableFuture.create();
150
151         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
152             @Override
153             public void onSuccess(final List<Boolean> result) {
154                 ret.set(true);
155             }
156
157             @Override
158             public void onFailure(final Throwable throwable) {
159                 ret.setException(throwable);
160             }
161         }, MoreExecutors.directExecutor());
162
163         return ret;
164     }
165
166     @Override
167     public ListenableFuture<Void> prepare() {
168         LOG.debug("Preparing transaction for shard {}", shardRoot);
169
170         checkTransactionReadied();
171         final List<ListenableFuture<Void>> futures =
172                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
173         final SettableFuture<Void> ret = SettableFuture.create();
174
175         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
176             @Override
177             public void onSuccess(final List<Void> result) {
178                 ret.set(null);
179             }
180
181             @Override
182             public void onFailure(final Throwable throwable) {
183                 ret.setException(throwable);
184             }
185         }, MoreExecutors.directExecutor());
186
187         return ret;
188     }
189
190     @Override
191     public ListenableFuture<Void> commit() {
192         LOG.debug("Committing transaction for shard {}", shardRoot);
193
194         checkTransactionReadied();
195         final List<ListenableFuture<Void>> futures =
196                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
197         final SettableFuture<Void> ret = SettableFuture.create();
198
199         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
200             @Override
201             public void onSuccess(final List<Void> result) {
202                 ret.set(null);
203             }
204
205             @Override
206             public void onFailure(final Throwable throwable) {
207                 ret.setException(throwable);
208             }
209         }, MoreExecutors.directExecutor());
210
211         return ret;
212     }
213 }