2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.databroker.actors.dds;
10 import static org.mockito.Matchers.any;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.verify;
13 import static org.mockito.Mockito.when;
15 import akka.actor.ActorRef;
16 import akka.actor.ActorSelection;
17 import akka.actor.ActorSystem;
18 import akka.testkit.JavaTestKit;
19 import akka.testkit.TestProbe;
20 import java.util.Collection;
21 import java.util.Collections;
22 import java.util.function.Function;
23 import org.junit.After;
24 import org.junit.Assert;
25 import org.junit.Before;
26 import org.junit.Test;
27 import org.mockito.Mock;
28 import org.mockito.MockitoAnnotations;
29 import org.opendaylight.controller.cluster.access.client.AbstractClientConnection;
30 import org.opendaylight.controller.cluster.access.client.AccessClientUtil;
31 import org.opendaylight.controller.cluster.access.client.ClientActorContext;
32 import org.opendaylight.controller.cluster.access.client.InternalCommand;
33 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
34 import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest;
35 import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess;
36 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
37 import org.opendaylight.controller.cluster.access.concepts.Envelope;
38 import org.opendaylight.controller.cluster.access.concepts.FailureEnvelope;
39 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
40 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
41 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.access.concepts.Request;
44 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
45 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
46 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
47 import org.opendaylight.controller.cluster.access.concepts.Response;
48 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
49 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
50 import org.opendaylight.controller.cluster.datastore.messages.PrimaryShardInfo;
51 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
54 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
55 import scala.concurrent.Promise;
57 public abstract class AbstractClientHandleTest<T extends AbstractClientHandle<AbstractProxyTransaction>> {
// Fixed identifiers shared by all tests. Each is built from the previous one:
// member name + frontend type -> frontend id -> client id -> history id -> transaction id.
59 private static final MemberName MEMBER_NAME = MemberName.forName("member-1");
60 private static final FrontendType FRONTEND_TYPE = FrontendType.forName("type-1");
61 private static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
62 private static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
63 private static final LocalHistoryIdentifier HISTORY_ID = new LocalHistoryIdentifier(CLIENT_ID, 0L);
// Persistence id passed to AccessClientUtil.createClientActorContext() in setUp().
64 private static final String PERSISTENCE_ID = "per-1";
// Path handed to ensureProxy() in testEnsureProxy().
65 private static final YangInstanceIdentifier PATH = YangInstanceIdentifier.EMPTY;
// Identifier the tests expect the handle and its abort requests to carry; protected, so visible to subclasses.
66 protected static final TransactionIdentifier TRANSACTION_ID = new TransactionIdentifier(HISTORY_ID, 0L);
// NOTE(review): setUp() calls MockitoAnnotations.initMocks(this) and stubs dataTree.takeSnapshot(), so
// dataTree and dataTreeSnapshot are presumably @Mock fields; the annotation lines are not visible here - confirm.
69 private DataTree dataTree;
71 private DataTreeSnapshot dataTreeSnapshot;
// Actor system created in setUp() and shut down in tearDown().
72 private ActorSystem system;
// Probe whose ref is given to the client as the backend actor; tests read the envelopes it receives.
73 private TestProbe backendProbe;
// SingleClientHistory created in setUp() and passed to createHandle().
74 private AbstractClientHistory parent;
// Client behavior created in setUp(); its connection 0 is used by backendRespondToRequest().
75 private AbstractDataStoreClientBehavior client;
// Builds the fixture: an actor system with three probes, a SimpleDataStoreClientBehavior connected to
// the backend probe, a SingleClientHistory parent and, last, the handle under test.
// NOTE(review): presumably annotated @Before (org.junit.Before is imported); the annotation line is not visible here.
79 public void setUp() throws Exception {
80 MockitoAnnotations.initMocks(this);
81 system = ActorSystem.apply();
82 final TestProbe contextProbe = new TestProbe(system, "context");
83 final TestProbe clientContextProbe = new TestProbe(system, "client-context");
84 backendProbe = new TestProbe(system, "backend");
85 //create handle dependencies
// The mocked ActorContext resolves every primary-shard lookup to contextProbe (see createActorContextMock).
86 final ActorContext actorContext = createActorContextMock(system, contextProbe.ref());
87 final ClientActorContext clientContext =
88 AccessClientUtil.createClientActorContext(system, clientContextProbe.ref(), CLIENT_ID, PERSISTENCE_ID);
89 client = new SimpleDataStoreClientBehavior(clientContext, actorContext, "shard");
90 client.createLocalHistory();
91 parent = new SingleClientHistory(client, HISTORY_ID);
// Request connection 0 and wait until the context probe has received the resulting ConnectClientRequest.
93 client.getConnection(0L);
94 contextProbe.expectMsgClass(ConnectClientRequest.class);
95 final long sequence = 0L;
// Answer with a success that names backendProbe as the backend actor and carries the dataTree field.
96 contextProbe.reply(new ConnectClientSuccess(CLIENT_ID, sequence, backendProbe.ref(),
97 Collections.emptyList(), dataTree, 3));
// The client-context probe receives an InternalCommand; the test executes it against the client by hand.
98 final InternalCommand command = clientContextProbe.expectMsgClass(InternalCommand.class);
99 command.execute(client);
// Stub the snapshot before the handle is created.
101 when(dataTree.takeSnapshot()).thenReturn(dataTreeSnapshot);
103 handle = createHandle(parent);
// Factory for the handle under test; setUp() calls it last, passing the SingleClientHistory parent.
106 protected abstract T createHandle(AbstractClientHistory parent);
109 * Do an operation with the handle.
110 * Used for testing whether a closed handle throws an exception when the operation is performed.
112 * @param handle handle
// Invoked at the start of the abort/close tests, and again from checkClosed(), where it is expected to
// throw IllegalStateException.
114 protected abstract void doHandleOperation(T handle);
// Shuts down the actor system created in setUp().
// NOTE(review): presumably annotated @After (org.junit.After is imported); the annotation line is not visible here.
117 public void tearDown() throws Exception {
118 JavaTestKit.shutdownActorSystem(system);
// The handle must report TRANSACTION_ID as its identifier.
122 public void testGetIdentifier() throws Exception {
123 Assert.assertEquals(TRANSACTION_ID, handle.getIdentifier());
// Performs one operation on the handle, then expects the backend probe to receive an envelope whose
// message is an AbortLocalTransactionRequest targeting TRANSACTION_ID.
// NOTE(review): the embedded numbering skips 129 and 133, so some statements of this test are not
// visible here (likely the abort call itself and a final closed-state check) - confirm against the full file.
127 public void testAbort() throws Exception {
128 doHandleOperation(handle);
130 final Envelope envelope = backendProbe.expectMsgClass(Envelope.class);
131 final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage();
132 Assert.assertEquals(TRANSACTION_ID, request.getTarget());
// Performs one operation, aborts the handle locally with a RuntimeException as the cause, and expects the
// backend probe to receive an AbortLocalTransactionRequest targeting TRANSACTION_ID.
// NOTE(review): the embedded numbering skips 143, so a trailing statement may be missing from this view.
137 public void testLocalAbort() throws Exception {
138 doHandleOperation(handle);
139 handle.localAbort(new RuntimeException("fail"));
140 final Envelope envelope = backendProbe.expectMsgClass(Envelope.class);
141 final AbortLocalTransactionRequest request = (AbortLocalTransactionRequest) envelope.getMessage();
142 Assert.assertEquals(TRANSACTION_ID, request.getTarget());
// After a single operation, ensureClosed() must return a non-null collection holding exactly one
// proxy transaction.
147 public void testEnsureClosed() throws Exception {
148 doHandleOperation(handle);
149 final Collection<AbstractProxyTransaction> transactions = handle.ensureClosed();
150 Assert.assertNotNull(transactions);
151 Assert.assertEquals(1, transactions.size());
// ensureProxy() must call the supplied factory function with argument 0L and return the proxy that
// the function produced.
155 public void testEnsureProxy() throws Exception {
// NOTE(review): mock(Function.class) yields a raw Function, so this assignment is an unchecked conversion.
156 final Function<Long, AbstractProxyTransaction> function = mock(Function.class);
157 final AbstractProxyTransaction expected = mock(AbstractProxyTransaction.class);
158 when(function.apply(0L)).thenReturn(expected);
159 final AbstractProxyTransaction proxy = handle.ensureProxy(PATH, function);
160 verify(function).apply(0L);
161 Assert.assertEquals(expected, proxy);
// The handle must report the history it was created with as its parent.
165 public void testParent() throws Exception {
166 Assert.assertEquals(parent, handle.parent());
// Asserts that performing the handle operation now fails with IllegalStateException.
// NOTE(review): relies on TestUtils.assertOperationThrowsException, which is defined outside this file.
169 protected void checkClosed() throws Exception {
170 TestUtils.assertOperationThrowsException(() -> doHandleOperation(handle), IllegalStateException.class);
174 * Checks whether the backend actor has received a request of the expected class wrapped in a RequestEnvelope.
175 * The given response is then wrapped in a response envelope (SuccessEnvelope or FailureEnvelope) and completed on the client connection.
177 * @param expectedRequestClass expected request class
178 * @param response response
179 * @param <R> expected request type
180 * @return request message
// Helper for subclasses: verifies the next message at the backend probe and feeds a response back to the
// client connection.
// NOTE(review): a response that is neither a RequestSuccess nor a RequestFailure (including null) is
// silently ignored - nothing is completed on the connection in that case.
182 protected <R extends Request> R backendRespondToRequest(final Class<R> expectedRequestClass,
183 final Response response) {
184 final RequestEnvelope envelope = backendProbe.expectMsgClass(RequestEnvelope.class);
// Exact class comparison, not instanceof: a subclass of the expected request class fails the assertion.
185 Assert.assertEquals(expectedRequestClass, envelope.getMessage().getClass());
186 final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(0L);
// Reuse the request's session id and tx sequence in the response envelope, presumably so the
// connection can match the response to the pending request.
187 final long sessionId = envelope.getSessionId();
188 final long txSequence = envelope.getTxSequence();
189 final long executionTime = 0L;
190 if (response instanceof RequestSuccess) {
191 final RequestSuccess<?, ?> success = (RequestSuccess<?, ?>) response;
192 final SuccessEnvelope responseEnvelope = new SuccessEnvelope(success, sessionId, txSequence, executionTime);
193 AccessClientUtil.completeRequest(connection, responseEnvelope);
194 } else if (response instanceof RequestFailure) {
195 final RequestFailure<?, ?> fail = (RequestFailure<?, ?>) response;
196 final FailureEnvelope responseEnvelope = new FailureEnvelope(fail, sessionId, txSequence, executionTime);
197 AccessClientUtil.completeRequest(connection, responseEnvelope);
// NOTE(review): unchecked cast; it cannot fail at runtime because of the exact-class assertion above.
// expectedRequestClass.cast(envelope.getMessage()) would avoid the compiler warning.
199 return (R) envelope.getMessage();
// Accessor for subclasses.
// NOTE(review): the body is not visible in this view; presumably it returns the handle created in setUp() - confirm.
202 protected T getHandle() {
// Gives subclasses the snapshot object that setUp() stubs dataTree.takeSnapshot() to return.
206 protected DataTreeSnapshot getDataTreeSnapshot() {
207 return dataTreeSnapshot;
// Builds a Mockito mock of ActorContext whose findPrimaryShardAsync() returns, for any argument, an
// already-completed future holding a PrimaryShardInfo that points at the given actor.
// NOTE(review): the end of this method (presumably returning the mock) lies past the visible lines.
210 private static ActorContext createActorContextMock(final ActorSystem system, final ActorRef actor) {
211 final ActorContext mock = mock(ActorContext.class);
212 final Promise<PrimaryShardInfo> promise = new scala.concurrent.impl.Promise.DefaultPromise<>();
// Address the actor through an ActorSelection on its path, the form PrimaryShardInfo's constructor takes here.
213 final ActorSelection selection = system.actorSelection(actor.path());
214 final PrimaryShardInfo shardInfo = new PrimaryShardInfo(selection, (short) 0);
// Complete the promise up front, so the future is already resolved when the stub returns it.
215 promise.success(shardInfo);
216 when(mock.findPrimaryShardAsync(any())).thenReturn(promise.future());