Bump odlparent/yangtools/mdsal
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / ShardProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.sharding;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13
14 import com.google.common.util.concurrent.AsyncFunction;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.Map.Entry;
24 import java.util.Optional;
25 import java.util.stream.Collectors;
26 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
29 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
30 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
31 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardThreePhaseCommitCohort;
32 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 /**
38  * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
39  * {@link ClientTransaction} calls.
40  */
41 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
42
43     private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
44
45     private final DOMDataTreeIdentifier shardRoot;
46     private final Collection<DOMDataTreeIdentifier> prefixes;
47     private final DistributedShardModification modification;
48     private ClientTransaction currentTx;
49     private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
50
51     private DOMDataTreeWriteCursor cursor = null;
52
53     ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
54                           final Collection<DOMDataTreeIdentifier> prefixes,
55                           final DistributedShardModification modification) {
56         this.shardRoot = requireNonNull(shardRoot);
57         this.prefixes = requireNonNull(prefixes);
58         this.modification = requireNonNull(modification);
59     }
60
61     private DOMDataTreeWriteCursor getCursor() {
62         if (cursor == null) {
63             cursor = new DistributedShardModificationCursor(modification, this);
64         }
65         return cursor;
66     }
67
68     @Override
69     public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
70         checkAvailable(prefix);
71         final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
72         final DOMDataTreeWriteCursor ret = getCursor();
73         ret.enter(relativePath.getPathArguments());
74         return ret;
75     }
76
77     void cursorClosed() {
78         cursor = null;
79         modification.cursorClosed();
80     }
81
82     private void checkAvailable(final DOMDataTreeIdentifier prefix) {
83         for (final DOMDataTreeIdentifier p : prefixes) {
84             if (p.contains(prefix)) {
85                 return;
86             }
87         }
88         throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
89                 + "Available prefixes: " + prefixes);
90     }
91
92     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
93         final Optional<YangInstanceIdentifier> relative =
94                 path.relativeTo(modification.getPrefix().getRootIdentifier());
95         checkArgument(relative.isPresent());
96         return relative.get();
97     }
98
99     @Override
100     public void ready() {
101         LOG.debug("Readying transaction for shard {}", shardRoot);
102
103         requireNonNull(modification, "Attempting to ready an empty transaction.");
104
105         cohorts.add(modification.seal());
106         for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
107                 : modification.getChildShards().entrySet()) {
108             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
109         }
110     }
111
112     @Override
113     public void close() {
114         cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
115         cohorts.clear();
116
117         if (currentTx != null) {
118             currentTx.abort();
119             currentTx = null;
120         }
121     }
122
123     @Override
124     public ListenableFuture<Void> submit() {
125         LOG.debug("Submitting transaction for shard {}", shardRoot);
126
127         checkTransactionReadied();
128
129         final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
130         final AsyncFunction<Void, Void> prepareFunction = input -> commit();
131
132         // transform validate into prepare
133         final ListenableFuture<Void> prepareFuture = Futures.transformAsync(validate(), validateFunction,
134             MoreExecutors.directExecutor());
135         // transform prepare into commit and return as submit result
136         return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
137     }
138
139     private void checkTransactionReadied() {
140         checkState(!cohorts.isEmpty(), "Transaction not readied yet");
141     }
142
143     @Override
144     public ListenableFuture<Boolean> validate() {
145         LOG.debug("Validating transaction for shard {}", shardRoot);
146
147         checkTransactionReadied();
148         final List<ListenableFuture<Boolean>> futures =
149                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
150         final SettableFuture<Boolean> ret = SettableFuture.create();
151
152         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
153             @Override
154             public void onSuccess(final List<Boolean> result) {
155                 ret.set(true);
156             }
157
158             @Override
159             public void onFailure(final Throwable throwable) {
160                 ret.setException(throwable);
161             }
162         }, MoreExecutors.directExecutor());
163
164         return ret;
165     }
166
167     @Override
168     public ListenableFuture<Void> prepare() {
169         LOG.debug("Preparing transaction for shard {}", shardRoot);
170
171         checkTransactionReadied();
172         final List<ListenableFuture<Void>> futures =
173                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
174         final SettableFuture<Void> ret = SettableFuture.create();
175
176         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
177             @Override
178             public void onSuccess(final List<Void> result) {
179                 ret.set(null);
180             }
181
182             @Override
183             public void onFailure(final Throwable throwable) {
184                 ret.setException(throwable);
185             }
186         }, MoreExecutors.directExecutor());
187
188         return ret;
189     }
190
191     @Override
192     public ListenableFuture<Void> commit() {
193         LOG.debug("Committing transaction for shard {}", shardRoot);
194
195         checkTransactionReadied();
196         final List<ListenableFuture<Void>> futures =
197                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
198         final SettableFuture<Void> ret = SettableFuture.create();
199
200         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
201             @Override
202             public void onSuccess(final List<Void> result) {
203                 ret.set(null);
204             }
205
206             @Override
207             public void onFailure(final Throwable throwable) {
208                 ret.setException(throwable);
209             }
210         }, MoreExecutors.directExecutor());
211
212         return ret;
213     }
214 }