2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import static org.hamcrest.CoreMatchers.hasItems;
11 import static org.hamcrest.MatcherAssert.assertThat;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertSame;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.ArgumentMatchers.isA;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
21 import akka.actor.ActorRef;
22 import akka.actor.ActorSystem;
23 import akka.testkit.TestProbe;
24 import akka.testkit.javadsl.TestKit;
25 import com.google.common.collect.Iterables;
26 import java.util.OptionalLong;
27 import java.util.function.Consumer;
28 import org.junit.After;
29 import org.junit.Before;
30 import org.junit.Test;
31 import org.mockito.MockitoAnnotations;
32 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
33 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
34 import org.opendaylight.controller.cluster.access.commands.TransactionFailure;
35 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
36 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
37 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
38 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
39 import org.opendaylight.controller.cluster.access.concepts.MemberName;
40 import org.opendaylight.controller.cluster.access.concepts.Request;
41 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
42 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
43 import org.opendaylight.controller.cluster.access.concepts.Response;
44 import org.opendaylight.controller.cluster.access.concepts.ResponseEnvelope;
45 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
46 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
47 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
/**
 * Base class for tests of {@link AbstractClientConnection} implementations.
 *
 * <p>Subclasses provide the connection under test through {@link #createConnection()} and
 * implement {@link #testReconnectConnection()}; the remaining tests here are shared.
 *
 * @param <T> concrete connection type under test
 * @param <U> backend info type the connection is parameterized with
 */
49 public abstract class AbstractClientConnectionTest<T extends AbstractClientConnection<U>, U extends BackendInfo> {
// Identifiers shared by the fixture and by the requests the tests build.
51 protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
// NOTE(review): the frontend type is named after ClientActorContextTest rather than this class;
// looks like a carry-over from another test - confirm whether that is intentional.
52 protected static final FrontendType FRONTEND_TYPE =
53 FrontendType.forName(ClientActorContextTest.class.getSimpleName());
54 protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
55 protected static final ClientIdentifier CLIENT_ID = ClientIdentifier.create(FRONTEND_ID, 0);
// Persistence id handed to the ClientActorContext constructor during fixture setup.
56 protected static final String PERSISTENCE_ID = "per-1";
// Connection under test, obtained from createConnection().
58 protected T connection;
// Context built around contextProbe's actor reference.
59 protected ClientActorContext context;
// Actor system hosting the probes; shut down in tearDown().
60 protected ActorSystem system;
// Probe on which tests expect outgoing RequestEnvelope messages.
61 protected TestProbe backendProbe;
// Probe whose ref is passed to the ClientActorContext as its actor reference.
62 protected TestProbe contextProbe;
// Probe whose ref is used as the replyTo actor of requests created by the tests.
63 protected TestProbe replyToProbe;
// Fixture initialization (the enclosing method signature is not visible in this extract).
// Order: Mockito annotation processing, actor system, probes, context, and finally the
// connection, so that createConnection() runs once the other fixture fields are assigned.
// NOTE(review): initMocks is deprecated in newer Mockito releases in favor of openMocks -
// confirm against the Mockito version this project builds with.
67 MockitoAnnotations.initMocks(this);
68 system = ActorSystem.apply();
69 backendProbe = new TestProbe(system);
70 contextProbe = new TestProbe(system);
// The context uses contextProbe's ref as its actor reference; testLocalActor() relies on this.
71 context = new ClientActorContext(contextProbe.ref(), PERSISTENCE_ID, system,
72 CLIENT_ID, AccessClientUtil.newMockClientActorConfig());
73 replyToProbe = new TestProbe(system);
74 connection = createConnection();
/**
 * Creates the connection instance that the tests in this class exercise.
 *
 * <p>Invoked during fixture initialization, after the actor system, the probes and the
 * context have been assigned.
 *
 * @return the connection under test
 */
77 protected abstract T createConnection();
/**
 * Checks that {@code localActor()} of the connection equals the ref of {@code contextProbe},
 * i.e. the actor reference the context was constructed with.
 */
80 public void testLocalActor() {
81 assertEquals(contextProbe.ref(), connection.localActor());
/**
 * Reconnect test; has no shared implementation here and must be supplied by each subclass.
 */
85 public abstract void testReconnectConnection();
/**
 * Enqueues a single entry, poisons the connection and checks that the entry's callback
 * is invoked with a {@link TransactionFailure}.
 */
88 public void testPoison() {
89 final Consumer<Response<?, ?>> callback = mock(Consumer.class);
90 final Request<?, ?> request = createRequest(replyToProbe.ref());
91 final ConnectionEntry entry = new ConnectionEntry(request, callback, 0L);
92 connection.enqueueEntry(entry, 0L);
93 connection.poison(new RuntimeRequestException("fail", new RuntimeException("fail")));
// Waits up to 1000 ms for the callback rather than requiring it to have run already.
94 verify(callback, timeout(1000)).accept(isA(TransactionFailure.class));
/**
 * Sends a request, checks that it shows up on {@code backendProbe} wrapped in a
 * {@link RequestEnvelope}, then feeds a success response into the connection and checks
 * that the callback receives a {@link TransactionAbortSuccess}.
 */
98 public void testSendRequestReceiveResponse() {
99 final Consumer<Response<?, ?>> callback = mock(Consumer.class);
100 final Request<?, ?> request = createRequest(replyToProbe.ref());
101 connection.sendRequest(request, callback);
// Presumably createConnection() wires backendProbe as the backend actor - verify in subclasses.
102 final RequestEnvelope requestEnvelope = backendProbe.expectMsgClass(RequestEnvelope.class);
103 assertEquals(request, requestEnvelope.getMessage());
// Build the response by hand and deliver it directly via receiveResponse().
104 final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(CLIENT_ID, 0L);
105 final RequestSuccess<?, ?> message = new TransactionAbortSuccess(new TransactionIdentifier(historyId, 0L), 0L);
106 final ResponseEnvelope<?> envelope = new SuccessEnvelope(message, 0L, 0L, 0L);
107 connection.receiveResponse(envelope);
// Waits up to 1000 ms for the callback to be invoked.
108 verify(callback, timeout(1000)).accept(isA(TransactionAbortSuccess.class));
/**
 * Checks that {@code runTimer} returns the very same behavior instance it was given.
 */
112 public void testRun() {
113 final ClientActorBehavior<U> behavior = mock(ClientActorBehavior.class);
114 assertSame(behavior, connection.runTimer(behavior));
/**
 * Checks that {@code checkTimeout}, called with the context ticker's current reading and
 * without any request having been sent in this test, returns an empty {@link OptionalLong}.
 */
118 public void testCheckTimeoutEmptyQueue() {
119 assertEquals(OptionalLong.empty(), connection.checkTimeout(context.ticker().read()));
/**
 * Checks that {@code checkTimeout} returns a present value once a request has been sent.
 * Only presence is asserted; the returned value itself is not inspected.
 */
123 public void testCheckTimeout() {
124 final Consumer<Response<?, ?>> callback = mock(Consumer.class);
125 connection.sendRequest(createRequest(replyToProbe.ref()), callback);
126 final long now = context.ticker().read();
// Note: this local shadows the statically imported Mockito timeout(...) only as a name;
// the static method is not used in this test.
127 final OptionalLong timeout = connection.checkTimeout(now);
128 assertTrue(timeout.isPresent());
/**
 * Sends two requests, starts a replay and checks that the returned entries are exactly
 * those two requests; then drains the entries and finishes the replay with a mocked
 * forwarder. Nothing is asserted after {@code finishReplay}; the call is only exercised.
 */
132 public void testReplay() {
133 final Consumer<Response<?, ?>> callback = mock(Consumer.class);
134 final Request<?, ?> request1 = createRequest(replyToProbe.ref());
135 final Request<?, ?> request2 = createRequest(replyToProbe.ref());
136 connection.sendRequest(request1, callback);
137 connection.sendRequest(request2, callback);
138 final Iterable<ConnectionEntry> entries = connection.startReplay();
// hasItems proves both requests are present; the size check proves there is nothing else.
139 assertThat(entries, hasItems(entryWithRequest(request1), entryWithRequest(request2)));
140 assertEquals(2, Iterables.size(entries));
// Remove every entry from the iterable before finishing the replay.
141 Iterables.removeIf(entries, e -> true);
142 final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
143 connection.finishReplay(forwarder);
/**
 * Shuts down the actor system created for the test fixture.
 */
147 public void tearDown() {
148 TestKit.shutdownActorSystem(system);
/**
 * Builds the request used by the tests: an {@link AbortLocalTransactionRequest} whose
 * transaction identifier is derived from {@code CLIENT_ID} with {@code 0L} for both the
 * history and the transaction argument, so every call targets the same identifier.
 *
 * @param replyTo actor reference passed to the request constructor
 * @return a new request instance
 */
151 protected Request<?, ?> createRequest(final ActorRef replyTo) {
152 final TransactionIdentifier identifier =
153 new TransactionIdentifier(new LocalHistoryIdentifier(CLIENT_ID, 0L), 0L);
154 return new AbortLocalTransactionRequest(identifier, replyTo);