Unit tests for ClientBackedDataStore class
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / sharding / ShardProxyTransaction.java
1 /*
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.cluster.sharding;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.util.concurrent.AsyncFunction;
14 import com.google.common.util.concurrent.FutureCallback;
15 import com.google.common.util.concurrent.Futures;
16 import com.google.common.util.concurrent.ListenableFuture;
17 import com.google.common.util.concurrent.SettableFuture;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.List;
21 import java.util.Map.Entry;
22 import java.util.stream.Collectors;
23 import javax.annotation.Nonnull;
24 import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
25 import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
26 import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
27 import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
28 import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
29 import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
30 import org.opendaylight.mdsal.dom.store.inmemory.ForeignShardThreePhaseCommitCohort;
31 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36  * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
37  * {@link ClientTransaction} calls.
38  */
39 class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
40
41     private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
42
43     private final DOMDataTreeIdentifier shardRoot;
44     private final Collection<DOMDataTreeIdentifier> prefixes;
45     private final DistributedShardModification modification;
46     private ClientTransaction currentTx;
47     private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
48
49     private DOMDataTreeWriteCursor cursor = null;
50
51     ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
52                           final Collection<DOMDataTreeIdentifier> prefixes,
53                           final DistributedShardModification modification) {
54         this.shardRoot = Preconditions.checkNotNull(shardRoot);
55         this.prefixes = Preconditions.checkNotNull(prefixes);
56         this.modification = Preconditions.checkNotNull(modification);
57     }
58
59     private DOMDataTreeWriteCursor getCursor() {
60         if (cursor == null) {
61             cursor = new DistributedShardModificationCursor(modification, this);
62         }
63         return cursor;
64     }
65
66     @Nonnull
67     @Override
68     public DOMDataTreeWriteCursor createCursor(@Nonnull final DOMDataTreeIdentifier prefix) {
69         checkAvailable(prefix);
70         final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
71         final DOMDataTreeWriteCursor ret = getCursor();
72         ret.enter(relativePath.getPathArguments());
73         return ret;
74     }
75
76     void cursorClosed() {
77         cursor = null;
78         modification.cursorClosed();
79     }
80
81     private void checkAvailable(final DOMDataTreeIdentifier prefix) {
82         for (final DOMDataTreeIdentifier p : prefixes) {
83             if (p.contains(prefix)) {
84                 return;
85             }
86         }
87         throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
88                 + "Available prefixes: " + prefixes);
89     }
90
91     private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
92         final Optional<YangInstanceIdentifier> relative =
93                 path.relativeTo(modification.getPrefix().getRootIdentifier());
94         Preconditions.checkArgument(relative.isPresent());
95         return relative.get();
96     }
97
98     @Override
99     public void ready() {
100         LOG.debug("Readying transaction for shard {}", shardRoot);
101
102         Preconditions.checkNotNull(modification, "Attempting to ready an empty transaction.");
103
104         cohorts.add(modification.seal());
105         for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
106                 : modification.getChildShards().entrySet()) {
107             cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
108         }
109     }
110
111     @Override
112     public void close() {
113         cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
114         cohorts.clear();
115
116         if (currentTx != null) {
117             currentTx.abort();
118             currentTx = null;
119         }
120     }
121
122     @Override
123     public ListenableFuture<Void> submit() {
124         LOG.debug("Submitting transaction for shard {}", shardRoot);
125
126         Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
127
128         final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
129         final AsyncFunction<Void, Void> prepareFunction = input -> commit();
130
131         // transform validate into prepare
132         final ListenableFuture<Void> prepareFuture = Futures.transform(validate(), validateFunction);
133         // transform prepare into commit and return as submit result
134         return Futures.transform(prepareFuture, prepareFunction);
135     }
136
137     @Override
138     public ListenableFuture<Boolean> validate() {
139         LOG.debug("Validating transaction for shard {}", shardRoot);
140
141         Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
142         final List<ListenableFuture<Boolean>> futures =
143                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
144         final SettableFuture<Boolean> ret = SettableFuture.create();
145
146         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
147             @Override
148             public void onSuccess(final List<Boolean> result) {
149                 ret.set(true);
150             }
151
152             @Override
153             public void onFailure(final Throwable throwable) {
154                 ret.setException(throwable);
155             }
156         });
157
158         return ret;
159     }
160
161     @Override
162     public ListenableFuture<Void> prepare() {
163         LOG.debug("Preparing transaction for shard {}", shardRoot);
164
165         Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
166         final List<ListenableFuture<Void>> futures =
167                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
168         final SettableFuture<Void> ret = SettableFuture.create();
169
170         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
171             @Override
172             public void onSuccess(final List<Void> result) {
173                 ret.set(null);
174             }
175
176             @Override
177             public void onFailure(final Throwable throwable) {
178                 ret.setException(throwable);
179             }
180         });
181
182         return ret;
183     }
184
185     @Override
186     public ListenableFuture<Void> commit() {
187         LOG.debug("Committing transaction for shard {}", shardRoot);
188
189         Preconditions.checkState(!cohorts.isEmpty(), "Transaction not readied yet");
190         final List<ListenableFuture<Void>> futures =
191                 cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
192         final SettableFuture<Void> ret = SettableFuture.create();
193
194         Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
195             @Override
196             public void onSuccess(final List<Void> result) {
197                 ret.set(null);
198             }
199
200             @Override
201             public void onFailure(final Throwable throwable) {
202                 ret.setException(throwable);
203             }
204         });
205
206         return ret;
207     }
208 }