Bump to odlparent 2.0.0
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / ShardProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Map.Entry;
23 import java.util.stream.Collectors;
24 import javax.annotation.Nonnull;
25 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
28 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
29 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
30 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
31 import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37  * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
38  * {@link ClientTransaction} calls.
39  */
40 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
41
42     private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
43
44     private final DOMDataTreeIdentifier shardRoot;
45     private final Collection<DOMDataTreeIdentifier> prefixes;
46     private final DistributedShardModification modification;
47     private ClientTransaction currentTx;
48     private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
49
50     private DOMDataTreeWriteCursor cursor = null;
51
52     ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
53                           final Collection<DOMDataTreeIdentifier> prefixes,
54                           final DistributedShardModification modification) {
55         this.shardRoot = Preconditions.checkNotNull(shardRoot);
56         this.prefixes = Preconditions.checkNotNull(prefixes);
57         this.modification = Preconditions.checkNotNull(modification);
58     }
59
60     private DOMDataTreeWriteCursor getCursor() {
61         if (cursor == null) {
62             cursor = new DistributedShardModificationCursor(modification, this);
63         }
64         return cursor;
65     }
66
67     @Nonnull
68     @Override
69     public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) {
70         checkAvailable(prefix);
71         final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
72         final DOMDataTreeWriteCursor ret = getCursor();
73         ret.enter(relativePath.getPathArguments());
74         return ret;
75     }
76
77     void cursorClosed() {
78         cursor = null;
79         modification.cursorClosed();
80     }
81
82     private void checkAvailable(final DOMDataTreeIdentifier prefix) {
83         for (final DOMDataTreeIdentifier p : prefixes) {
84             if (p.contains(prefix)) {
85                 return;
86             }
87         }
88         throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
89                 + "Available prefixes: " + prefixes);
90     }
91
92     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
93         final Optional<YangInstanceIdentifier> relative =
94                 path.relativeTo(modification.getPrefix().getRootIdentifier());
95         Preconditions.checkArgument(relative.isPresent());
96         return relative.get();
97     }
98
99     @Override
100     public void ready() {
101         LOG.debug("Readying transaction for shard {}", shardRoot);
102
103         Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
104
105         cohorts.add(modification.seal());
106         for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
107                 : modification.getChildShards().entrySet()) {
108             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
109         }
110     }
111
112     @Override
113     public void close() {
114         cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
115         cohorts.clear();
116
117         if (currentTx != null) {
118             currentTx.abort();
119             currentTx = null;
120         }
121     }
122
123     @Override
124     public ListenableFuture<Void> submit() {
125         LOG.debug("Submitting transaction for shard {}", shardRoot);
126
127         Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
128
129         final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
130         final AsyncFunction<Void, Void> prepareFunction = input -> commit();
131
132         // transform validate into prepare
133         final ListenableFuture<Void> prepareFuture = Futures.transformAsync(validate(), validateFunction,
134             MoreExecutors.directExecutor());
135         // transform prepare into commit and return as submit result
136         return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
137     }
138
139     @Override
140     public ListenableFuture<Boolean> validate() {
141         LOG.debug("Validating transaction for shard {}", shardRoot);
142
143         Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
144         final List<ListenableFuture<Boolean>> futures =
145                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
146         final SettableFuture<Boolean> ret = SettableFuture.create();
147
148         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
149             @Override
150             public void onSuccess(final List<Boolean> result) {
151                 ret.set(true);
152             }
153
154             @Override
155             public void onFailure(final Throwable throwable) {
156                 ret.setException(throwable);
157             }
158         }, MoreExecutors.directExecutor());
159
160         return ret;
161     }
162
163     @Override
164     public ListenableFuture<Void> prepare() {
165         LOG.debug("Preparing transaction for shard {}", shardRoot);
166
167         Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
168         final List<ListenableFuture<Void>> futures =
169                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
170         final SettableFuture<Void> ret = SettableFuture.create();
171
172         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
173             @Override
174             public void onSuccess(final List<Void> result) {
175                 ret.set(null);
176             }
177
178             @Override
179             public void onFailure(final Throwable throwable) {
180                 ret.setException(throwable);
181             }
182         }, MoreExecutors.directExecutor());
183
184         return ret;
185     }
186
187     @Override
188     public ListenableFuture<Void> commit() {
189         LOG.debug("Committing transaction for shard {}", shardRoot);
190
191         Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
192         final List<ListenableFuture<Void>> futures =
193                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
194         final SettableFuture<Void> ret = SettableFuture.create();
195
196         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
197             @Override
198             public void onSuccess(final List<Void> result) {
199                 ret.set(null);
200             }
201
202             @Override
203             public void onFailure(final Throwable throwable) {
204                 ret.setException(throwable);
205             }
206         }, MoreExecutors.directExecutor());
207
208         return ret;
209     }
210 }