2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.MoreExecutors;
18 import com.google.common.util.concurrent.SettableFuture;
19 import java.util.ArrayList;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Map.Entry;
23 import java.util.stream.Collectors;
24 import javax.annotation.Nonnull;
25 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
27 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
28 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
29 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
30 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
31 import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort;
32 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
37 * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
38 * {@link ClientTransaction} calls.
40 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
42 private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
44 private final DOMDataTreeIdentifier shardRoot;
45 private final Collection<DOMDataTreeIdentifier> prefixes;
46 private final DistributedShardModification modification;
47 private ClientTransaction currentTx;
48 private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
50 private DOMDataTreeWriteCursor cursor = null;
52 ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
53 final Collection<DOMDataTreeIdentifier> prefixes,
54 final DistributedShardModification modification) {
55 this.shardRoot = Preconditions.checkNotNull(shardRoot);
56 this.prefixes = Preconditions.checkNotNull(prefixes);
57 this.modification = Preconditions.checkNotNull(modification);
60 private DOMDataTreeWriteCursor getCursor() {
62 cursor = new DistributedShardModificationCursor(modification, this);
69 public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) {
70 checkAvailable(prefix);
71 final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
72 final DOMDataTreeWriteCursor ret = getCursor();
73 ret.enter(relativePath.getPathArguments());
79 modification.cursorClosed();
82 private void checkAvailable(final DOMDataTreeIdentifier prefix) {
83 for (final DOMDataTreeIdentifier p : prefixes) {
84 if (p.contains(prefix)) {
88 throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
89 + "Available prefixes: " + prefixes);
92 private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
93 final Optional<YangInstanceIdentifier> relative =
94 path.relativeTo(modification.getPrefix().getRootIdentifier());
95 Preconditions.checkArgument(relative.isPresent());
96 return relative.get();
100 public void ready() {
101 LOG.debug("Readying transaction for shard {}", shardRoot);
103 Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
105 cohorts.add(modification.seal());
106 for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
107 : modification.getChildShards().entrySet()) {
108 cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
113 public void close() {
114 cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
117 if (currentTx != null) {
124 public ListenableFuture<Void> submit() {
125 LOG.debug("Submitting transaction for shard {}", shardRoot);
127 Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
129 final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
130 final AsyncFunction<Void, Void> prepareFunction = input -> commit();
132 // transform validate into prepare
133 final ListenableFuture<Void> prepareFuture = Futures.transformAsync(validate(), validateFunction,
134 MoreExecutors.directExecutor());
135 // transform prepare into commit and return as submit result
136 return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
140 public ListenableFuture<Boolean> validate() {
141 LOG.debug("Validating transaction for shard {}", shardRoot);
143 Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
144 final List<ListenableFuture<Boolean>> futures =
145 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
146 final SettableFuture<Boolean> ret = SettableFuture.create();
148 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
150 public void onSuccess(final List<Boolean> result) {
155 public void onFailure(final Throwable throwable) {
156 ret.setException(throwable);
158 }, MoreExecutors.directExecutor());
164 public ListenableFuture<Void> prepare() {
165 LOG.debug("Preparing transaction for shard {}", shardRoot);
167 Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
168 final List<ListenableFuture<Void>> futures =
169 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
170 final SettableFuture<Void> ret = SettableFuture.create();
172 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
174 public void onSuccess(final List<Void> result) {
179 public void onFailure(final Throwable throwable) {
180 ret.setException(throwable);
182 }, MoreExecutors.directExecutor());
188 public ListenableFuture<Void> commit() {
189 LOG.debug("Committing transaction for shard {}", shardRoot);
191 Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
192 final List<ListenableFuture<Void>> futures =
193 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
194 final SettableFuture<Void> ret = SettableFuture.create();
196 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
198 public void onSuccess(final List<Void> result) {
203 public void onFailure(final Throwable throwable) {
204 ret.setException(throwable);
206 }, MoreExecutors.directExecutor());