2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static org.hamcrest.CoreMatchers.both;
11 import static org.hamcrest.CoreMatchers.hasItem;
12 import static org.hamcrest.core.Is.isA;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.verify;
15 import static org.mockito.Mockito.when;
17 import akka.actor.ActorSystem;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestProbe;
20 import com.google.common.primitives.UnsignedLong;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Optional;
24 import java.util.function.BiFunction;
25 import java.util.function.Consumer;
26 import org.hamcrest.BaseMatcher;
27 import org.hamcrest.Description;
28 import org.junit.After;
29 import org.junit.Assert;
30 import org.junit.Before;
31 import org.junit.Test;
32 import org.mockito.Mock;
33 import org.mockito.MockitoAnnotations;
34 import org.opendaylight.controller.cluster.access.ABIVersion;
35 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
36 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
37 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
38 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
39 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
40 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
41 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
42 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
43 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
44 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
45 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
46 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
47 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
48 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
49 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
50 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
51 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
52 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
53 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
54 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
55 import org.opendaylight.controller.cluster.access.concepts.Response;
56 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
57 import org.opendaylight.yangtools.yang.common.QName;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
59 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
60 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
61 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
62 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
/**
 * Base class for tests of {@link AbstractProxyTransaction} implementations.
 *
 * <p>The fixture built in {@code setUp()} consists of an actor system, two test probes (one used as
 * the backend actor, one as the client context actor), a connected client connection and the
 * transaction under test, which concrete subclasses supply through {@code createTransaction}.
 *
 * @param <T> concrete proxy transaction type exercised by the subclass
 */
64 public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransaction> {
// Identifiers shared by the requests and histories built in this class; all come from TestUtils.
65 protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID;
66 private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID;
67 private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID;
// Three paths, each built from one node in namespace "ns-1" (node-1, node-2, node-3).
69 protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder()
70 .node(QName.create("ns-1", "node-1"))
72 protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder()
73 .node(QName.create("ns-1", "node-2"))
75 protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder()
76 .node(QName.create("ns-1", "node-3"))
// Container nodes whose node identifier is the node type of the last argument of PATH_1 / PATH_2.
78 protected static final ContainerNode DATA_1 = Builders.containerBuilder()
79 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_1.getLastPathArgument().getNodeType()))
81 protected static final ContainerNode DATA_2 = Builders.containerBuilder()
82 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_2.getLastPathArgument().getNodeType()))
// Persistence id passed to AccessClientUtil.createClientActorContext by every fixture in this class.
84 protected static final String PERSISTENCE_ID = "per-1";
// NOTE(review): snapshot and history are never assigned in the visible code; setUp() calls
// MockitoAnnotations.initMocks(this), so they are presumably Mockito-managed mocks - confirm.
87 private DataTreeSnapshot snapshot;
89 private AbstractClientHistory history;
// Actor system and probes created in setUp(); the system is shut down in tearDown().
90 private ActorSystem system;
91 private TestProbe backendProbe;
92 private TestProbe clientContextProbe;
// Tester wrapping the transaction under test, its connection and the backend probe.
93 private TransactionTester<T> tester;
// Transaction under test, produced by createTransaction() in setUp().
94 protected T transaction;
/**
 * Builds the test fixture: initialises Mockito for this instance, starts an actor system with the
 * "clientContext" and "backend" probes, creates a connected client connection whose backend actor
 * is the backend probe, and finally creates the transaction under test and its tester.
 *
 * @throws Exception declared by the signature; no checked exception is thrown explicitly here
 */
97 public void setUp() throws Exception {
98 MockitoAnnotations.initMocks(this);
99 system = ActorSystem.apply();
100 clientContextProbe = new TestProbe(system, "clientContext");
101 backendProbe = new TestProbe(system, "backend");
// The client actor context is bound to the client-context probe, the backend info to the backend probe.
102 final ClientActorContext context =
103 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
104 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
105 "default", UnsignedLong.ZERO, Optional.empty(), 3);
106 final AbstractClientConnection<ShardBackendInfo> connection =
107 AccessClientUtil.createConnectedConnection(context, 0L, backend);
// The proxy history is the parent handed to the subclass-provided transaction factory.
108 final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
109 transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
110 tester = new TransactionTester<>(transaction, connection, backendProbe);
/**
 * Creates the transaction under test; invoked from {@code setUp()}.
 *
 * @param parent proxy history created in {@code setUp()} via {@code ProxyHistory.createClient}
 * @param id transaction identifier; {@code setUp()} passes {@code TestUtils.TRANSACTION_ID}
 * @param snapshot data tree snapshot; {@code setUp()} passes the {@code snapshot} field
 * @return the transaction instance stored in {@code transaction}
 */
113 protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
/**
 * Shuts down the actor system created in {@code setUp()}.
 *
 * @throws Exception declared by the signature
 */
116 public void tearDown() throws Exception {
117 JavaTestKit.shutdownActorSystem(system);
// Test cases that every concrete subclass has to implement. Their bodies are not part of this
// class; judging by the names, each one presumably covers the corresponding operation of the
// transaction under test (exists/read/write/merge/delete, the commit phases, and forwarding of
// abort/commit requests to a remote successor) - confirm against the subclasses.
121 public abstract void testExists() throws Exception;
124 public abstract void testRead() throws Exception;
127 public abstract void testWrite() throws Exception;
130 public abstract void testMerge() throws Exception;
133 public abstract void testDelete() throws Exception;
136 public abstract void testDirectCommit() throws Exception;
139 public abstract void testCanCommit() throws Exception;
142 public abstract void testPreCommit() throws Exception;
145 public abstract void testDoCommit() throws Exception;
148 public abstract void testForwardToRemoteAbort() throws Exception;
151 public abstract void testForwardToRemoteCommit() throws Exception;
/**
 * Checks the abort path that takes a {@code VotingFuture}: delegates to
 * {@code testRequestResponse}, with {@code transaction.abort(f)} as the action, expecting a
 * {@code TransactionAbortRequest} and answering it with a {@code TransactionAbortSuccess}.
 *
 * @throws Exception declared by the signature
 */
154 public void testAbortVotingFuture() throws Exception {
155 testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new);
/**
 * Forwards a {@code TransactionPurgeRequest} to a remote successor through
 * {@code testForwardToRemote} and expects a request of the same class to reach the successor.
 *
 * @throws Exception declared by the signature
 */
159 public void testForwardToRemotePurge() throws Exception {
// A fresh probe only supplies the reply-to reference of the request.
160 final TestProbe probe = new TestProbe(system);
161 final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
162 testForwardToRemote(request, TransactionPurgeRequest.class);
/**
 * Exercises {@code replayMessages}: two requests are recorded on the transaction as successful,
 * two more are supplied as connection entries, and after {@code startReconnect()} everything is
 * replayed to a remote successor. The successor is then expected to receive the two recorded
 * requests followed by the two entry requests.
 *
 * @throws Exception declared by the signature
 */
166 public void testReplayMessages() throws Exception {
167 final TestProbe probe = new TestProbe(system);
// Entries handed to replayMessages: a read of PATH_2 and an exists check of PATH_3.
168 final List<ConnectionEntry> entries = new ArrayList<>();
169 final Consumer<Response<?, ?>> callback = createCallbackMock();
170 final ReadTransactionRequest request1 =
171 new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_2, true);
172 final ExistsTransactionRequest request2 =
173 new ExistsTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_3, true);
174 entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
175 entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
176 final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
// Requests recorded as already successful on the transaction under test before the reconnect.
177 final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
178 transaction.recordSuccessfulRequest(successful1);
179 final ReadTransactionRequest successful2 =
180 new ReadTransactionRequest(TRANSACTION_ID, 0L, probe.ref(), PATH_1, true);
181 transaction.recordSuccessfulRequest(successful2);
182 transaction.startReconnect();
183 transaction.replayMessages(successor.getTransaction(), entries);
// Expectations are checked one after another: recorded requests first, then the entry requests.
184 Assert.assertEquals(successful1, successor.expectTransactionRequest(AbortLocalTransactionRequest.class));
185 Assert.assertEquals(successful2, successor.expectTransactionRequest(ReadTransactionRequest.class));
186 Assert.assertEquals(request1, successor.expectTransactionRequest(ReadTransactionRequest.class));
187 Assert.assertEquals(request2, successor.expectTransactionRequest(ExistsTransactionRequest.class));
/**
 * Asserts that the request carries exactly three modifications and that among them there is a
 * write at {@code PATH_1}, a merge at {@code PATH_2} and a delete at {@code PATH_3}. The
 * {@code hasItem} checks do not depend on the position of each modification in the list.
 *
 * @param modifyRequest request whose modifications are inspected
 */
190 protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
191 final List<TransactionModification> modifications = modifyRequest.getModifications();
192 Assert.assertEquals(3, modifications.size());
193 Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and((hasPath(PATH_1)))));
194 Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and((hasPath(PATH_2)))));
195 Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and((hasPath(PATH_3)))));
/**
 * Generic request/response check: runs an action with a mocked {@code VotingFuture}, expects the
 * resulting request on the tester, replies with a success built for that request's sequence and
 * verifies that {@code voteYes()} was called on the future.
 *
 * @param consumer action under test; receives the mocked future
 * @param expectedRequest class of the request the tester is expected to observe
 * @param replySupplier builds the success reply from {@code TRANSACTION_ID} and the sequence of
 *        the observed request
 */
198 protected void testRequestResponse(final Consumer<VotingFuture> consumer,
199 final Class<? extends TransactionRequest> expectedRequest,
200 final BiFunction<TransactionIdentifier, Long, TransactionSuccess> replySupplier)
// The local variable below hides the tester field; it holds whatever getTester() returns.
202 final TransactionTester<T> tester = getTester();
203 final VotingFuture future = mock(VotingFuture.class);
205 consumer.accept(future);
206 final TransactionRequest req = tester.expectTransactionRequest(expectedRequest);
// The reply reuses the sequence number of the request that was just observed.
207 tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
208 verify(future).voteYes();
/**
 * Passes a request to {@code handleForwardedRemoteRequest} on the transaction under test and
 * checks the envelope that arrives at the backend probe: its message must have the same class as
 * the input, target {@code TRANSACTION_ID} and reply to the client-context probe's reference.
 *
 * <p>Note that the method-level type parameter {@code T} hides the class-level {@code T}.
 *
 * @param request request to hand to the transaction
 * @param <T> request type
 * @return NOTE(review): the return statement is not visible in this view; presumably the
 *         request received by the backend probe - confirm
 * @throws Exception declared by the signature
 */
211 protected <T extends TransactionRequest> T testHandleForwardedRemoteRequest(final T request) throws Exception {
212 transaction.handleForwardedRemoteRequest(request, createCallbackMock());
213 final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
// Unchecked cast; the class equality assertion on the next line checks the runtime type.
214 final T received = (T) envelope.getMessage();
215 Assert.assertTrue(received.getClass().equals(request.getClass()));
216 Assert.assertEquals(TRANSACTION_ID, received.getTarget());
217 Assert.assertEquals(clientContextProbe.ref(), received.getReplyTo());
/**
 * Forwards a request from the transaction under test to a freshly created remote successor and
 * returns the request of the expected class observed by the successor's tester.
 *
 * <p>Note that the method-level type parameter {@code T} hides the class-level {@code T}.
 *
 * @param toForward request passed to {@code forwardToRemote}
 * @param expectedMessageClass class of the request expected at the successor
 * @param <T> expected request type
 * @return the value of {@code expectTransactionRequest(expectedMessageClass)} on the successor's tester
 */
221 protected <T extends TransactionRequest> T testForwardToRemote(final TransactionRequest toForward,
222 final Class<T> expectedMessageClass) {
223 final Consumer<Response<?, ?>> callback = createCallbackMock();
224 final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
225 final RemoteProxyTransaction successor = transactionTester.getTransaction();
226 transaction.forwardToRemote(successor, toForward, callback);
227 return transactionTester.expectTransactionRequest(expectedMessageClass);
/**
 * Accessor for the tester of the transaction under test.
 *
 * @return NOTE(review): the body is not visible in this view; presumably the {@code tester}
 *         field assigned in {@code setUp()} - confirm
 */
230 protected TransactionTester<T> getTester() {
/**
 * Creates a Mockito mock of {@code Consumer} typed for the caller. The cast from the raw mock is
 * unchecked, hence the suppressed warning.
 *
 * <p>Note that the method-level type parameter {@code T} hides the class-level {@code T}.
 *
 * @param <T> type accepted by the returned consumer
 * @return a mocked consumer
 */
234 @SuppressWarnings("unchecked")
235 protected <T> Consumer<T> createCallbackMock() {
236 return (Consumer<T>) mock(Consumer.class);
/**
 * Creates a matcher that accepts a {@code TransactionModification} whose path equals the given
 * one. The matched item is cast to {@code TransactionModification} without a type check.
 *
 * @param path expected path
 * @return matcher comparing {@code getPath()} of the item with {@code path}
 */
239 protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
240 return new BaseMatcher<TransactionModification>() {
// Matches when the item's path equals the expected path.
243 public boolean matches(final Object item) {
244 return path.equals(((TransactionModification) item).getPath());
// The expectation is described by the expected path value.
248 public void describeTo(final Description description) {
249 description.appendValue(path);
// The mismatch text is "was " followed by the path of the actual item.
253 public void describeMismatch(final Object item, final Description description) {
254 final TransactionModification modification = (TransactionModification) item;
255 description.appendText("was ").appendValue(modification.getPath());
/**
 * Creates a new test probe in the actor system of this test.
 *
 * @return a new {@code TestProbe} bound to {@code system}
 */
260 protected TestProbe createProbe() {
261 return new TestProbe(system);
/**
 * Builds a separate {@code LocalReadWriteProxyTransaction} together with its own probes,
 * connection and proxy history, and wraps it in a tester. The history and snapshot are fresh
 * mocks; the snapshot's {@code newModification()} is stubbed to return a mocked
 * {@code CursorAwareDataTreeModification}.
 *
 * <p>The locals {@code backendProbe}, {@code clientContextProbe}, {@code history} and
 * {@code snapshot} hide the fields of the same names, so the fixture from {@code setUp()} is not
 * touched.
 *
 * @return tester for the newly created local transaction
 */
264 protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
265 final TestProbe backendProbe = new TestProbe(system, "backend2");
266 final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
267 final ClientActorContext context =
268 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
269 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
270 "default", UnsignedLong.ZERO, Optional.empty(), 3);
271 final AbstractClientConnection<ShardBackendInfo> connection =
272 AccessClientUtil.createConnectedConnection(context, 0L, backend);
273 final AbstractClientHistory history = mock(AbstractClientHistory.class);
274 final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
// The snapshot mock must hand out a modification, which the stub below provides.
275 final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
276 when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class));
277 final LocalReadWriteProxyTransaction tx =
278 new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
279 return new TransactionTester<>(tx, connection, backendProbe);
/**
 * Builds a {@code RemoteProxyTransaction} with its own "remoteClientContext" and "remoteBackend"
 * probes, a mocked client history and a dedicated connection, and wraps it in a tester. Tests in
 * this class use the result as the successor for forwarding and replay.
 *
 * <p>The locals {@code clientContextProbe}, {@code backendProbe}, {@code history} and
 * {@code transaction} hide the fields of the same names.
 *
 * @return tester for the newly created remote transaction
 */
282 protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
283 final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
284 final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
285 final AbstractClientHistory history = mock(AbstractClientHistory.class);
286 final ClientActorContext context =
287 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
// Same backend parameters as in setUp() except for the last constructor argument (5 instead of 3).
288 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
289 "default", UnsignedLong.ZERO, Optional.empty(), 5);
290 final AbstractClientConnection<ShardBackendInfo> connection =
291 AccessClientUtil.createConnectedConnection(context, 0L, backend);
292 final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
293 final RemoteProxyTransaction transaction =
294 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
295 return new TransactionTester<>(transaction, connection, backendProbe);