2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.sharding;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
14 import com.google.common.util.concurrent.AsyncFunction;
15 import com.google.common.util.concurrent.FutureCallback;
16 import com.google.common.util.concurrent.Futures;
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.MoreExecutors;
19 import com.google.common.util.concurrent.SettableFuture;
20 import java.util.ArrayList;
21 import java.util.Collection;
22 import java.util.List;
23 import java.util.Map.Entry;
24 import java.util.Optional;
25 import java.util.stream.Collectors;
26 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
28 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
29 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
30 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
31 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
32 import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort;
33 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
38 * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
39 * {@link ClientTransaction} calls.
41 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
43 private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
45 private final DOMDataTreeIdentifier shardRoot;
46 private final Collection<DOMDataTreeIdentifier> prefixes;
47 private final DistributedShardModification modification;
48 private ClientTransaction currentTx;
49 private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
51 private DOMDataTreeWriteCursor cursor = null;
53 ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
54 final Collection<DOMDataTreeIdentifier> prefixes,
55 final DistributedShardModification modification) {
56 this.shardRoot = requireNonNull(shardRoot);
57 this.prefixes = requireNonNull(prefixes);
58 this.modification = requireNonNull(modification);
61 private DOMDataTreeWriteCursor getCursor() {
63 cursor = new DistributedShardModificationCursor(modification, this);
69 public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
70 checkAvailable(prefix);
71 final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
72 final DOMDataTreeWriteCursor ret = getCursor();
73 ret.enter(relativePath.getPathArguments());
79 modification.cursorClosed();
82 private void checkAvailable(final DOMDataTreeIdentifier prefix) {
83 for (final DOMDataTreeIdentifier p : prefixes) {
84 if (p.contains(prefix)) {
88 throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
89 + "Available prefixes: " + prefixes);
92 private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
93 final Optional<YangInstanceIdentifier> relative =
94 path.relativeTo(modification.getPrefix().getRootIdentifier());
95 checkArgument(relative.isPresent());
96 return relative.get();
100 public void ready() {
101 LOG.debug("Readying transaction for shard {}", shardRoot);
103 requireNonNull(modification, "Attempting to ready an empty transaction.");
105 cohorts.add(modification.seal());
106 for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
107 : modification.getChildShards().entrySet()) {
108 cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
113 public void close() {
114 cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
117 if (currentTx != null) {
124 public ListenableFuture<Void> submit() {
125 LOG.debug("Submitting transaction for shard {}", shardRoot);
127 checkTransactionReadied();
129 final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
130 final AsyncFunction<Void, Void> prepareFunction = input -> commit();
132 // transform validate into prepare
133 final ListenableFuture<Void> prepareFuture = Futures.transformAsync(validate(), validateFunction,
134 MoreExecutors.directExecutor());
135 // transform prepare into commit and return as submit result
136 return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
139 private void checkTransactionReadied() {
140 checkState(!cohorts.isEmpty(), "Transaction not readied yet");
144 public ListenableFuture<Boolean> validate() {
145 LOG.debug("Validating transaction for shard {}", shardRoot);
147 checkTransactionReadied();
148 final List<ListenableFuture<Boolean>> futures =
149 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
150 final SettableFuture<Boolean> ret = SettableFuture.create();
152 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
154 public void onSuccess(final List<Boolean> result) {
159 public void onFailure(final Throwable throwable) {
160 ret.setException(throwable);
162 }, MoreExecutors.directExecutor());
168 public ListenableFuture<Void> prepare() {
169 LOG.debug("Preparing transaction for shard {}", shardRoot);
171 checkTransactionReadied();
172 final List<ListenableFuture<Void>> futures =
173 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
174 final SettableFuture<Void> ret = SettableFuture.create();
176 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
178 public void onSuccess(final List<Void> result) {
183 public void onFailure(final Throwable throwable) {
184 ret.setException(throwable);
186 }, MoreExecutors.directExecutor());
192 public ListenableFuture<Void> commit() {
193 LOG.debug("Committing transaction for shard {}", shardRoot);
195 checkTransactionReadied();
196 final List<ListenableFuture<Void>> futures =
197 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
198 final SettableFuture<Void> ret = SettableFuture.create();
200 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
202 public void onSuccess(final List<Void> result) {
207 public void onFailure(final Throwable throwable) {
208 ret.setException(throwable);
210 }, MoreExecutors.directExecutor());