Bump versions to 4.0.0-SNAPSHOT
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / ShardProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.sharding;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.util.concurrent.AsyncFunction;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.Map.Entry;
24 import java.util.Optional;
25 import java.util.stream.Collectors;
26 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
29 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
30 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
31 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardThreePhaseCommitCohort;
32 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
39  * {@link ClientTransaction} calls.
40  */
41 @Deprecated(forRemoval = true)
42 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
43
44     private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
45
46     private final DOMDataTreeIdentifier shardRoot;
47     private final Collection<DOMDataTreeIdentifier> prefixes;
48     private final DistributedShardModification modification;
49     private ClientTransaction currentTx;
50     private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
51
52     private DOMDataTreeWriteCursor cursor = null;
53
54     ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
55                           final Collection<DOMDataTreeIdentifier> prefixes,
56                           final DistributedShardModification modification) {
57         this.shardRoot = requireNonNull(shardRoot);
58         this.prefixes = requireNonNull(prefixes);
59         this.modification = requireNonNull(modification);
60     }
61
62     private DOMDataTreeWriteCursor getCursor() {
63         if (cursor == null) {
64             cursor = new DistributedShardModificationCursor(modification, this);
65         }
66         return cursor;
67     }
68
69     @Override
70     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
71         checkAvailable(prefix);
72         final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
73         final DOMDataTreeWriteCursor ret = getCursor();
74         ret.enter(relativePath.getPathArguments());
75         return ret;
76     }
77
78     void cursorClosed() {
79         cursor = null;
80         modification.cursorClosed();
81     }
82
83     private void checkAvailable(final DOMDataTreeIdentifier prefix) {
84         for (final DOMDataTreeIdentifier p : prefixes) {
85             if (p.contains(prefix)) {
86                 return;
87             }
88         }
89         throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
90                 + "Available prefixes: " + prefixes);
91     }
92
93     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
94         final Optional<YangInstanceIdentifier> relative =
95                 path.relativeTo(modification.getPrefix().getRootIdentifier());
96         checkArgument(relative.isPresent());
97         return relative.get();
98     }
99
100     @Override
101     public void ready() {
102         LOG.debug("Readying transaction for shard {}", shardRoot);
103
104         requireNonNull(modification, "Attempting to ready an empty transaction.");
105
106         cohorts.add(modification.seal());
107         for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
108                 : modification.getChildShards().entrySet()) {
109             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
110         }
111     }
112
113     @Override
114     public void close() {
115         cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
116         cohorts.clear();
117
118         if (currentTx != null) {
119             currentTx.abort();
120             currentTx = null;
121         }
122     }
123
124     @Override
125     public ListenableFuture<Void> submit() {
126         LOG.debug("Submitting transaction for shard {}", shardRoot);
127
128         checkTransactionReadied();
129
130         final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
131         final AsyncFunction<Void, Void> prepareFunction = input -> commit();
132
133         // transform validate into prepare
134         final ListenableFuture<Void> prepareFuture = Futures.transformAsync(validate(), validateFunction,
135             MoreExecutors.directExecutor());
136         // transform prepare into commit and return as submit result
137         return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
138     }
139
140     private void checkTransactionReadied() {
141         checkState(!cohorts.isEmpty(), "Transaction not readied yet");
142     }
143
144     @Override
145     public ListenableFuture<Boolean> validate() {
146         LOG.debug("Validating transaction for shard {}", shardRoot);
147
148         checkTransactionReadied();
149         final List<ListenableFuture<Boolean>> futures =
150                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
151         final SettableFuture<Boolean> ret = SettableFuture.create();
152
153         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
154             @Override
155             public void onSuccess(final List<Boolean> result) {
156                 ret.set(true);
157             }
158
159             @Override
160             public void onFailure(final Throwable throwable) {
161                 ret.setException(throwable);
162             }
163         }, MoreExecutors.directExecutor());
164
165         return ret;
166     }
167
168     @Override
169     public ListenableFuture<Void> prepare() {
170         LOG.debug("Preparing transaction for shard {}", shardRoot);
171
172         checkTransactionReadied();
173         final List<ListenableFuture<Void>> futures =
174                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
175         final SettableFuture<Void> ret = SettableFuture.create();
176
177         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
178             @Override
179             public void onSuccess(final List<Void> result) {
180                 ret.set(null);
181             }
182
183             @Override
184             public void onFailure(final Throwable throwable) {
185                 ret.setException(throwable);
186             }
187         }, MoreExecutors.directExecutor());
188
189         return ret;
190     }
191
192     @Override
193     public ListenableFuture<Void> commit() {
194         LOG.debug("Committing transaction for shard {}", shardRoot);
195
196         checkTransactionReadied();
197         final List<ListenableFuture<Void>> futures =
198                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
199         final SettableFuture<Void> ret = SettableFuture.create();
200
201         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
202             @Override
203             public void onSuccess(final List<Void> result) {
204                 ret.set(null);
205             }
206
207             @Override
208             public void onFailure(final Throwable throwable) {
209                 ret.setException(throwable);
210             }
211         }, MoreExecutors.directExecutor());
212
213         return ret;
214     }
215 }