2 * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.cluster.sharding;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Map.Entry;
22 import java.util.stream.Collectors;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
27 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
28 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
29 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
36 * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
37 * {@link ClientTransaction} calls.
39 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
// Logger for this class; the visible methods use it for debug-level tracing only.
41 private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
// Identifier of the shard; in the visible code it is read only when formatting log messages.
43 private final DOMDataTreeIdentifier shardRoot;
// Prefixes that cursor requests are matched against in checkAvailable().
44 private final Collection<DOMDataTreeIdentifier> prefixes;
// Modification from which the cursor, the sealed cohort and the child-shard map are obtained.
45 private final DistributedShardModification modification;
// NOTE(review): no assignment to currentTx is visible in this listing; only close() reads it - confirm where it is set.
46 private ClientTransaction currentTx;
// Commit cohorts: filled when the transaction is readied, iterated by validate()/prepare()/commit()/close().
47 private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
// Cursor most recently built by getCursor(); starts out null.
49 private DOMDataTreeWriteCursor cursor = null;
/**
 * Builds a proxy transaction from its three collaborators. Every argument is passed through
 * {@code Preconditions.checkNotNull}, so a {@code null} argument fails with a
 * {@link NullPointerException}.
 *
 * @param shardRoot identifier stored as the shard root and used in log messages
 * @param prefixes prefixes against which cursor requests are checked
 * @param modification modification that the cursor, cohorts and child shards are taken from
 */
51 ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
52 final Collection<DOMDataTreeIdentifier> prefixes,
53 final DistributedShardModification modification) {
54 this.shardRoot = Preconditions.checkNotNull(shardRoot);
55 this.prefixes = Preconditions.checkNotNull(prefixes);
56 this.modification = Preconditions.checkNotNull(modification);
// Supplies the write cursor that createCursor() hands out.
// NOTE(review): the embedded numbering skips between the header and the assignment and again
// after it, so any guard around the assignment and the return statement are not visible here -
// presumably the cursor is created lazily and then returned; confirm against the full file.
59 private DOMDataTreeWriteCursor getCursor() {
// Builds a cursor over the modification, passing this transaction along, and stores it in the field.
61 cursor = new DistributedShardModificationCursor(modification, this);
/**
 * Opens a write cursor positioned at the given prefix.
 *
 * <p>The prefix is first checked with {@code checkAvailable}, then its root identifier is made
 * relative via {@code toRelative}, and the cursor from {@code getCursor()} is moved into that
 * relative path with {@code enter}.
 *
 * @param prefix prefix at which the cursor should be positioned
 * @throws IllegalArgumentException raised by {@code checkAvailable} for an unavailable prefix, or by
 *         {@code toRelative} when the prefix root has no path relative to the modification prefix
 */
// NOTE(review): the return statement is not part of this listing; presumably "ret" is returned - confirm.
68 public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) {
69 checkAvailable(prefix);
70 final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
71 final DOMDataTreeWriteCursor ret = getCursor();
// Step the cursor down to the requested prefix, one path argument at a time.
72 ret.enter(relativePath.getPathArguments());
// Forwards the cursor-closed notification to the modification.
// NOTE(review): the header of the method containing this statement, and any statements before it,
// are missing from this listing - confirm the enclosing method against the full file.
78 modification.cursorClosed();
/**
 * Verifies that the requested prefix falls under one of the prefixes given to this transaction.
 *
 * @param prefix prefix a caller wants to open a cursor on
 * @throws IllegalArgumentException when the final throw statement is reached; the message names the
 *         requested prefix and lists the available ones
 */
81 private void checkAvailable(final DOMDataTreeIdentifier prefix) {
82 for (final DOMDataTreeIdentifier p : prefixes) {
// NOTE(review): the body of this branch is missing from the listing; presumably it returns on the
// first containing prefix so that the throw below is reached only when nothing matches - confirm.
83 if (p.contains(prefix)) {
87 throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
88 + "Available prefixes: " + prefixes);
/**
 * Expresses {@code path} relative to the root identifier of the modification's prefix.
 *
 * @param path identifier to convert
 * @return the relative identifier produced by {@code relativeTo}
 * @throws IllegalArgumentException if {@code relativeTo} yields an absent result
 */
91 private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
92 final Optional<YangInstanceIdentifier> relative =
93 path.relativeTo(modification.getPrefix().getRootIdentifier());
// checkArgument guarantees the Optional is populated before get() is called.
94 Preconditions.checkArgument(relative.isPresent());
95 return relative.get();
// Readying step: seals the modification and collects every cohort taking part in the commit.
// NOTE(review): the method header for these statements is missing from this listing; the log
// message suggests this is the ready() implementation - confirm.
100 LOG.debug("Readying transaction for shard {}", shardRoot);
// The field is final and already null-checked in the constructor, so this check is purely defensive.
102 Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
// The cohort returned by seal() is added first, followed by one cohort per child shard.
104 cohorts.add(modification.seal());
105 for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
106 : modification.getChildShards().entrySet()) {
107 cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
/**
 * Closes the transaction by calling {@code abort()} on every cohort collected so far.
 */
112 public void close() {
113 cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
// NOTE(review): the listing skips a line before this check, and the body of the branch is missing;
// presumably the cohort list and the client transaction are cleaned up there - confirm.
116 if (currentTx != null) {
/**
 * Runs the whole commit sequence: {@code validate()}, then {@code prepare()}, then {@code commit()},
 * each stage started from the previous stage's future via {@code Futures.transform}.
 *
 * @return the future produced by the last transform, i.e. the outcome of the commit stage
 * @throws IllegalStateException if no cohorts have been collected ("Transaction not readied yet")
 */
123 public ListenableFuture<Void> submit() {
124 LOG.debug("Submitting transaction for shard {}", shardRoot);
126 Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
// Note: the Boolean delivered by validate() is not inspected; prepare() is started regardless of its value.
128 final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
129 final AsyncFunction<Void, Void> prepareFunction = input -> commit();
131 // transform validate into prepare
132 final ListenableFuture<Void> prepareFuture = Futures.transform(validate(), validateFunction);
133 // transform prepare into commit and return as submit result
134 return Futures.transform(prepareFuture, prepareFunction);
/**
 * Validation stage: asks every cohort whether it can commit and aggregates the answers.
 *
 * <p>{@code canCommit()} is invoked on each cohort, the resulting futures are combined with
 * {@code Futures.allAsList}, and a callback relays the combined outcome to a {@link SettableFuture}.
 *
 * @throws IllegalStateException if no cohorts have been collected ("Transaction not readied yet")
 */
// NOTE(review): the onSuccess body and the return statement are missing from this listing;
// presumably "ret" is completed on success and then returned - confirm.
138 public ListenableFuture<Boolean> validate() {
139 LOG.debug("Validating transaction for shard {}", shardRoot);
141 Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
142 final List<ListenableFuture<Boolean>> futures =
143 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
144 final SettableFuture<Boolean> ret = SettableFuture.create();
146 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
148 public void onSuccess(final List<Boolean> result) {
153 public void onFailure(final Throwable throwable) {
// A failure of the combined future is propagated unchanged to the caller's future.
154 ret.setException(throwable);
/**
 * Prepare stage: invokes {@code preCommit()} on every cohort and aggregates the outcomes.
 *
 * <p>The per-cohort futures are combined with {@code Futures.allAsList}, and a callback relays the
 * combined outcome to a {@link SettableFuture}.
 *
 * @throws IllegalStateException if no cohorts have been collected ("Transaction not readied yet")
 */
// NOTE(review): the onSuccess body and the return statement are missing from this listing;
// presumably "ret" is completed on success and then returned - confirm.
162 public ListenableFuture<Void> prepare() {
163 LOG.debug("Preparing transaction for shard {}", shardRoot);
165 Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
166 final List<ListenableFuture<Void>> futures =
167 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
168 final SettableFuture<Void> ret = SettableFuture.create();
170 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
172 public void onSuccess(final List<Void> result) {
177 public void onFailure(final Throwable throwable) {
// A failure of the combined future is propagated unchanged to the caller's future.
178 ret.setException(throwable);
/**
 * Commit stage: invokes {@code commit()} on every cohort and aggregates the outcomes.
 *
 * <p>The per-cohort futures are combined with {@code Futures.allAsList}, and a callback relays the
 * combined outcome to a {@link SettableFuture}.
 *
 * @throws IllegalStateException if no cohorts have been collected ("Transaction not readied yet")
 */
// NOTE(review): the listing ends inside this method; the onSuccess body and the return statement
// are not visible - presumably "ret" is completed on success and then returned; confirm.
186 public ListenableFuture<Void> commit() {
187 LOG.debug("Committing transaction for shard {}", shardRoot);
189 Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
190 final List<ListenableFuture<Void>> futures =
191 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
192 final SettableFuture<Void> ret = SettableFuture.create();
194 Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
196 public void onSuccess(final List<Void> result) {
201 public void onFailure(final Throwable throwable) {
// A failure of the combined future is propagated unchanged to the caller's future.
202 ret.setException(throwable);