2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static org.hamcrest.CoreMatchers.both;
11 import static org.hamcrest.CoreMatchers.hasItem;
12 import static org.hamcrest.core.Is.isA;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.verify;
15 import static org.mockito.Mockito.when;
17 import akka.actor.ActorSystem;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestProbe;
20 import com.google.common.base.Ticker;
21 import com.google.common.primitives.UnsignedLong;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.Optional;
25 import java.util.function.BiFunction;
26 import java.util.function.Consumer;
27 import org.hamcrest.BaseMatcher;
28 import org.hamcrest.Description;
29 import org.junit.After;
30 import org.junit.Assert;
31 import org.junit.Before;
32 import org.junit.Test;
33 import org.mockito.Mock;
34 import org.mockito.MockitoAnnotations;
35 import org.opendaylight.controller.cluster.access.ABIVersion;
36 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
37 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
38 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
39 import org.opendaylight.controller.cluster.access.client.ConnectionEntry;
40 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
41 import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
42 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
43 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
44 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
45 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
46 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
47 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
48 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
49 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
50 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
51 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
52 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
53 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
54 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
55 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
56 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
57 import org.opendaylight.controller.cluster.access.concepts.Response;
58 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
59 import org.opendaylight.yangtools.yang.common.QName;
60 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
61 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification;
63 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
64 import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
66 public abstract class AbstractProxyTransactionTest<T extends AbstractProxyTransaction> {
67 protected static final TransactionIdentifier TRANSACTION_ID = TestUtils.TRANSACTION_ID;
68 private static final ClientIdentifier CLIENT_ID = TestUtils.CLIENT_ID;
69 private static final LocalHistoryIdentifier HISTORY_ID = TestUtils.HISTORY_ID;
71 protected static final YangInstanceIdentifier PATH_1 = YangInstanceIdentifier.builder()
72 .node(QName.create("ns-1", "node-1"))
74 protected static final YangInstanceIdentifier PATH_2 = YangInstanceIdentifier.builder()
75 .node(QName.create("ns-1", "node-2"))
77 protected static final YangInstanceIdentifier PATH_3 = YangInstanceIdentifier.builder()
78 .node(QName.create("ns-1", "node-3"))
80 protected static final ContainerNode DATA_1 = Builders.containerBuilder()
81 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_1.getLastPathArgument().getNodeType()))
83 protected static final ContainerNode DATA_2 = Builders.containerBuilder()
84 .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(PATH_2.getLastPathArgument().getNodeType()))
86 protected static final String PERSISTENCE_ID = "per-1";
89 private DataTreeSnapshot snapshot;
91 private AbstractClientHistory history;
92 private ActorSystem system;
93 private TestProbe backendProbe;
94 private TestProbe clientContextProbe;
95 private TransactionTester<T> tester;
96 protected T transaction;
99 public void setUp() throws Exception {
100 MockitoAnnotations.initMocks(this);
101 system = ActorSystem.apply();
102 clientContextProbe = new TestProbe(system, "clientContext");
103 backendProbe = new TestProbe(system, "backend");
104 final ClientActorContext context =
105 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
106 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
107 "default", UnsignedLong.ZERO, Optional.empty(), 3);
108 final AbstractClientConnection<ShardBackendInfo> connection =
109 AccessClientUtil.createConnectedConnection(context, 0L, backend);
110 final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
111 transaction = createTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
112 tester = new TransactionTester<>(transaction, connection, backendProbe);
115 protected abstract T createTransaction(ProxyHistory parent, TransactionIdentifier id, DataTreeSnapshot snapshot);
118 public void tearDown() throws Exception {
119 JavaTestKit.shutdownActorSystem(system);
123 public abstract void testExists() throws Exception;
126 public abstract void testRead() throws Exception;
129 public abstract void testWrite() throws Exception;
132 public abstract void testMerge() throws Exception;
135 public abstract void testDelete() throws Exception;
138 public abstract void testDirectCommit() throws Exception;
141 public abstract void testCanCommit() throws Exception;
144 public abstract void testPreCommit() throws Exception;
147 public abstract void testDoCommit() throws Exception;
150 public abstract void testForwardToRemoteAbort() throws Exception;
153 public abstract void testForwardToRemoteCommit() throws Exception;
156 public void testAbortVotingFuture() throws Exception {
157 testRequestResponse(f -> transaction.abort(f), TransactionAbortRequest.class, TransactionAbortSuccess::new);
161 public void testForwardToRemotePurge() throws Exception {
162 final TestProbe probe = new TestProbe(system);
163 final TransactionPurgeRequest request = new TransactionPurgeRequest(TRANSACTION_ID, 0L, probe.ref());
164 testForwardToRemote(request, TransactionPurgeRequest.class);
168 public void testReplayMessages() throws Exception {
169 final TestProbe probe = new TestProbe(system);
170 final List<ConnectionEntry> entries = new ArrayList<>();
171 final Consumer<Response<?, ?>> callback = createCallbackMock();
172 final ReadTransactionRequest request1 =
173 new ReadTransactionRequest(TRANSACTION_ID, 2L, probe.ref(), PATH_2, true);
174 final ExistsTransactionRequest request2 =
175 new ExistsTransactionRequest(TRANSACTION_ID, 3L, probe.ref(), PATH_3, true);
176 entries.add(AccessClientUtil.createConnectionEntry(request1, callback, 0L));
177 entries.add(AccessClientUtil.createConnectionEntry(request2, callback, 0L));
178 final TransactionTester<RemoteProxyTransaction> successor = createRemoteProxyTransactionTester();
179 final AbortLocalTransactionRequest successful1 = new AbortLocalTransactionRequest(TRANSACTION_ID, probe.ref());
180 transaction.recordSuccessfulRequest(successful1);
181 final ReadTransactionRequest successful2 =
182 new ReadTransactionRequest(TRANSACTION_ID, 1L, probe.ref(), PATH_1, true);
183 transaction.recordSuccessfulRequest(successful2);
184 transaction.startReconnect();
185 transaction.replayMessages(successor.getTransaction(), entries);
187 final ModifyTransactionRequest transformed = successor.expectTransactionRequest(ModifyTransactionRequest.class);
188 Assert.assertNotNull(transformed);
189 Assert.assertEquals(successful1.getSequence(), transformed.getSequence());
190 Assert.assertTrue(transformed.getPersistenceProtocol().isPresent());
191 Assert.assertEquals(PersistenceProtocol.ABORT, transformed.getPersistenceProtocol().get());
193 ReadTransactionRequest tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
194 Assert.assertNotNull(tmpRead);
195 Assert.assertEquals(successful2.getTarget(), tmpRead.getTarget());
196 Assert.assertEquals(successful2.getSequence(), tmpRead.getSequence());
197 Assert.assertEquals(successful2.getPath(), tmpRead.getPath());
198 Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
200 tmpRead = successor.expectTransactionRequest(ReadTransactionRequest.class);
201 Assert.assertNotNull(tmpRead);
202 Assert.assertEquals(request1.getTarget(), tmpRead.getTarget());
203 Assert.assertEquals(request1.getSequence(), tmpRead.getSequence());
204 Assert.assertEquals(request1.getPath(), tmpRead.getPath());
205 Assert.assertEquals(successor.localActor(), tmpRead.getReplyTo());
207 final ExistsTransactionRequest tmpExist = successor.expectTransactionRequest(ExistsTransactionRequest.class);
208 Assert.assertNotNull(tmpExist);
209 Assert.assertEquals(request2.getTarget(), tmpExist.getTarget());
210 Assert.assertEquals(request2.getSequence(), tmpExist.getSequence());
211 Assert.assertEquals(request2.getPath(), tmpExist.getPath());
212 Assert.assertEquals(successor.localActor(), tmpExist.getReplyTo());
215 protected void checkModifications(final ModifyTransactionRequest modifyRequest) {
216 final List<TransactionModification> modifications = modifyRequest.getModifications();
217 Assert.assertEquals(3, modifications.size());
218 Assert.assertThat(modifications, hasItem(both(isA(TransactionWrite.class)).and(hasPath(PATH_1))));
219 Assert.assertThat(modifications, hasItem(both(isA(TransactionMerge.class)).and(hasPath(PATH_2))));
220 Assert.assertThat(modifications, hasItem(both(isA(TransactionDelete.class)).and(hasPath(PATH_3))));
223 protected <R extends TransactionRequest<R>> void testRequestResponse(final Consumer<VotingFuture<Void>> consumer,
224 final Class<R> expectedRequest,
225 final BiFunction<TransactionIdentifier, Long, TransactionSuccess<?>> replySupplier) throws Exception {
226 final TransactionTester<T> tester = getTester();
227 final VotingFuture<Void> future = mock(VotingFuture.class);
229 consumer.accept(future);
230 final TransactionRequest<?> req = tester.expectTransactionRequest(expectedRequest);
231 tester.replySuccess(replySupplier.apply(TRANSACTION_ID, req.getSequence()));
232 verify(future).voteYes();
235 protected <R extends TransactionRequest<R>> R testHandleForwardedRemoteRequest(final R request) throws Exception {
236 transaction.handleReplayedRemoteRequest(request, createCallbackMock(), Ticker.systemTicker().read());
237 final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
238 final R received = (R) envelope.getMessage();
239 Assert.assertTrue(received.getClass().equals(request.getClass()));
240 Assert.assertEquals(TRANSACTION_ID, received.getTarget());
241 Assert.assertEquals(clientContextProbe.ref(), received.getReplyTo());
245 protected <R extends TransactionRequest<R>> R testForwardToRemote(final TransactionRequest<?> toForward,
246 final Class<R> expectedMessageClass) {
247 final Consumer<Response<?, ?>> callback = createCallbackMock();
248 final TransactionTester<RemoteProxyTransaction> transactionTester = createRemoteProxyTransactionTester();
249 final RemoteProxyTransaction successor = transactionTester.getTransaction();
250 transaction.forwardToRemote(successor, toForward, callback);
251 return transactionTester.expectTransactionRequest(expectedMessageClass);
254 protected TransactionTester<T> getTester() {
258 @SuppressWarnings("unchecked")
259 protected static <T> Consumer<T> createCallbackMock() {
260 return mock(Consumer.class);
263 protected static BaseMatcher<TransactionModification> hasPath(final YangInstanceIdentifier path) {
264 return new BaseMatcher<TransactionModification>() {
267 public boolean matches(final Object item) {
268 return path.equals(((TransactionModification) item).getPath());
272 public void describeTo(final Description description) {
273 description.appendValue(path);
277 public void describeMismatch(final Object item, final Description description) {
278 final TransactionModification modification = (TransactionModification) item;
279 description.appendText("was ").appendValue(modification.getPath());
284 protected TestProbe createProbe() {
285 return new TestProbe(system);
288 protected TransactionTester<LocalReadWriteProxyTransaction> createLocalProxy() {
289 final TestProbe backendProbe = new TestProbe(system, "backend2");
290 final TestProbe clientContextProbe = new TestProbe(system, "clientContext2");
291 final ClientActorContext context =
292 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
293 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
294 "default", UnsignedLong.ZERO, Optional.empty(), 3);
295 final AbstractClientConnection<ShardBackendInfo> connection =
296 AccessClientUtil.createConnectedConnection(context, 0L, backend);
297 final AbstractClientHistory history = mock(AbstractClientHistory.class);
298 final ProxyHistory parent = ProxyHistory.createClient(history, connection, HISTORY_ID);
299 final DataTreeSnapshot snapshot = mock(DataTreeSnapshot.class);
300 when(snapshot.newModification()).thenReturn(mock(CursorAwareDataTreeModification.class));
301 final LocalReadWriteProxyTransaction tx =
302 new LocalReadWriteProxyTransaction(parent, TestUtils.TRANSACTION_ID, snapshot);
303 return new TransactionTester<>(tx, connection, backendProbe);
306 protected TransactionTester<RemoteProxyTransaction> createRemoteProxyTransactionTester() {
307 final TestProbe clientContextProbe = new TestProbe(system, "remoteClientContext");
308 final TestProbe backendProbe = new TestProbe(system, "remoteBackend");
309 final AbstractClientHistory history = mock(AbstractClientHistory.class);
310 final ClientActorContext context =
311 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
312 final ShardBackendInfo backend = new ShardBackendInfo(backendProbe.ref(), 0L, ABIVersion.BORON,
313 "default", UnsignedLong.ZERO, Optional.empty(), 5);
314 final AbstractClientConnection<ShardBackendInfo> connection =
315 AccessClientUtil.createConnectedConnection(context, 0L, backend);
316 final ProxyHistory proxyHistory = ProxyHistory.createClient(history, connection, HISTORY_ID);
317 final RemoteProxyTransaction transaction =
318 new RemoteProxyTransaction(proxyHistory, TRANSACTION_ID, false, false);
319 return new TransactionTester<>(transaction, connection, backendProbe);