2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.sharding;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.util.concurrent.AsyncFunction;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.Map.Entry;
24 import java.util.Optional;
25 import java.util.stream.Collectors;
26 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
29 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
30 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
31 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardThreePhaseCommitCohort;
32 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
39 * {@link ClientTransaction} calls.
41 @Deprecated(forRemoval = true)
42 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
44 private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
46 private final DOMDataTreeIdentifier shardRoot;
47 private final Collection<DOMDataTreeIdentifier> prefixes;
48 private final DistributedShardModification modification;
49 private ClientTransaction currentTx;
50 private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
52 private DOMDataTreeWriteCursor cursor = null;
54 ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
55 final Collection<DOMDataTreeIdentifier> prefixes,
56 final DistributedShardModification modification) {
57 this.shardRoot = requireNonNull(shardRoot);
58 this.prefixes = requireNonNull(prefixes);
59 this.modification = requireNonNull(modification);
62 private DOMDataTreeWriteCursor getCursor() {
64 cursor = new DistributedShardModificationCursor(modification, this);
70 public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
71 checkAvailable(prefix);
72 final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
73 final DOMDataTreeWriteCursor ret = getCursor();
74 ret.enter(relativePath.getPathArguments());
80 modification.cursorClosed();
83 private void checkAvailable(final DOMDataTreeIdentifier prefix) {
84 for (final DOMDataTreeIdentifier p : prefixes) {
85 if (p.contains(prefix)) {
89 throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
90 + "Available prefixes: " + prefixes);
93 private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
94 final Optional<YangInstanceIdentifier> relative =
95 path.relativeTo(modification.getPrefix().getRootIdentifier());
96 checkArgument(relative.isPresent());
97 return relative.get();
101 public void ready() {
102 LOG.debug("Readying transaction for shard {}", shardRoot);
104 requireNonNull(modification, "Attempting to ready an empty transaction.");
106 cohorts.add(modification.seal());
107 for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
108 : modification.getChildShards().entrySet()) {
109 cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
114 public void close() {
115 cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
118 if (currentTx != null) {
125 public ListenableFuture<Void> submit() {
126 LOG.debug("Submitting transaction for shard {}", shardRoot);
128 checkTransactionReadied();
130 final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
131 final AsyncFunction<Void, Void> prepareFunction = input -> commit();
133 // transform validate into prepare
134 final ListenableFuture<Void> prepareFuture = Futures.transformAsync(validate(), validateFunction,
135 MoreExecutors.directExecutor());
136 // transform prepare into commit and return as submit result
137 return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
140 private void checkTransactionReadied() {
141 checkState(!cohorts.isEmpty(), "Transaction not readied yet");
145 public ListenableFuture<Boolean> validate() {
146 LOG.debug("Validating transaction for shard {}", shardRoot);
148 checkTransactionReadied();
149 final List<ListenableFuture<Boolean>> futures =
150 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
151 final SettableFuture<Boolean> ret = SettableFuture.create();
153 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
155 public void onSuccess(final List<Boolean> result) {
160 public void onFailure(final Throwable throwable) {
161 ret.setException(throwable);
163 }, MoreExecutors.directExecutor());
169 public ListenableFuture<Void> prepare() {
170 LOG.debug("Preparing transaction for shard {}", shardRoot);
172 checkTransactionReadied();
173 final List<ListenableFuture<Void>> futures =
174 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
175 final SettableFuture<Void> ret = SettableFuture.create();
177 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
179 public void onSuccess(final List<Void> result) {
184 public void onFailure(final Throwable throwable) {
185 ret.setException(throwable);
187 }, MoreExecutors.directExecutor());
193 public ListenableFuture<Void> commit() {
194 LOG.debug("Committing transaction for shard {}", shardRoot);
196 checkTransactionReadied();
197 final List<ListenableFuture<Void>> futures =
198 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
199 final SettableFuture<Void> ret = SettableFuture.create();
201 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
203 public void onSuccess(final List<Void> result) {
208 public void onFailure(final Throwable throwable) {
209 ret.setException(throwable);
211 }, MoreExecutors.directExecutor());