2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import static org.hamcrest.CoreMatchers.everyItem;
11 import static org.hamcrest.MatcherAssert.assertThat;
12 import static org.junit.Assert.assertEquals;
13 import static org.junit.Assert.assertFalse;
14 import static org.junit.Assert.assertTrue;
15 import static org.mockito.Mockito.mock;
16 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
18 import akka.actor.ActorSystem;
19 import akka.testkit.TestProbe;
20 import akka.testkit.javadsl.TestKit;
21 import com.google.common.base.Ticker;
22 import java.util.Collection;
23 import java.util.Optional;
24 import java.util.function.Consumer;
25 import org.junit.After;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
29 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
30 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
31 import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
32 import org.opendaylight.controller.cluster.access.concepts.FrontendType;
33 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
34 import org.opendaylight.controller.cluster.access.concepts.MemberName;
35 import org.opendaylight.controller.cluster.access.concepts.Request;
36 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
37 import org.opendaylight.controller.cluster.access.concepts.Response;
38 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
39 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
41 public abstract class AbstractTransmitQueueTest<T extends TransmitQueue> {
43 private static final FrontendIdentifier FRONTEND =
44 FrontendIdentifier.create(MemberName.forName("test"), FrontendType.forName("type-1"));
45 private static final ClientIdentifier CLIENT = ClientIdentifier.create(FRONTEND, 0);
46 protected static final LocalHistoryIdentifier HISTORY = new LocalHistoryIdentifier(CLIENT, 0);
47 protected static final TransactionIdentifier TRANSACTION_IDENTIFIER = new TransactionIdentifier(HISTORY, 0);
49 protected ActorSystem system;
50 protected TestProbe probe;
52 protected abstract int getMaxInFlightMessages();
54 protected abstract T createQueue();
58 system = ActorSystem.apply();
59 probe = new TestProbe(system);
60 queue = createQueue();
64 public void tearDown() {
65 TestKit.shutdownActorSystem(system);
69 public abstract void testCanTransmitCount();
71 @Test(expected = UnsupportedOperationException.class)
72 public abstract void testTransmit();
75 public void testAsIterable() {
76 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
77 final Consumer<Response<?, ?>> callback = createConsumerMock();
78 final long now = Ticker.systemTicker().read();
79 final int sentMessages = getMaxInFlightMessages() + 1;
80 for (int i = 0; i < sentMessages; i++) {
81 queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
83 final Collection<ConnectionEntry> entries = queue.drain();
84 assertEquals(sentMessages, entries.size());
85 assertThat(entries, everyItem(entryWithRequest(request)));
89 public void testTicksStalling() {
90 final long now = Ticker.systemTicker().read();
91 assertEquals(0, queue.ticksStalling(now));
95 public void testCompleteReponseNotMatchingRequest() {
96 final long requestSequence = 0L;
97 final long txSequence = 0L;
98 final long sessionId = 0L;
99 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, requestSequence, probe.ref());
100 final Consumer<Response<?, ?>> callback = createConsumerMock();
101 final long now = Ticker.systemTicker().read();
102 queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
103 //different transaction id
104 final TransactionIdentifier anotherTxId = new TransactionIdentifier(HISTORY, 1L);
105 final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(anotherTxId, requestSequence);
106 final Optional<TransmittedConnectionEntry> completed1 =
107 queue.complete(new SuccessEnvelope(success1, sessionId, txSequence, 1L), now);
108 assertFalse(completed1.isPresent());
109 //different response sequence
110 final long differentResponseSequence = 1L;
111 final RequestSuccess<?, ?> success2 =
112 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, differentResponseSequence);
113 final Optional<TransmittedConnectionEntry> completed2 =
114 queue.complete(new SuccessEnvelope(success2, sessionId, txSequence, 1L), now);
115 assertFalse(completed2.isPresent());
116 //different tx sequence
117 final long differentTxSequence = 1L;
118 final RequestSuccess<?, ?> success3 =
119 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
120 final Optional<TransmittedConnectionEntry> completed3 =
121 queue.complete(new SuccessEnvelope(success3, sessionId, differentTxSequence, 1L), now);
122 assertFalse(completed3.isPresent());
123 //different session id
124 final long differentSessionId = 1L;
125 final RequestSuccess<?, ?> success4 =
126 new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, requestSequence);
127 final Optional<TransmittedConnectionEntry> completed4 =
128 queue.complete(new SuccessEnvelope(success4, differentSessionId, differentTxSequence, 1L), now);
129 assertFalse(completed4.isPresent());
133 public void testIsEmpty() {
134 assertTrue(queue.isEmpty());
135 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
136 final Consumer<Response<?, ?>> callback = createConsumerMock();
137 final long now = Ticker.systemTicker().read();
138 queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
139 assertFalse(queue.isEmpty());
143 public void testPeek() {
144 final Request<?, ?> request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
145 final Request<?, ?> request2 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 1L, probe.ref());
146 final Consumer<Response<?, ?>> callback = createConsumerMock();
147 final long now = Ticker.systemTicker().read();
148 final ConnectionEntry entry1 = new ConnectionEntry(request1, callback, now);
149 final ConnectionEntry entry2 = new ConnectionEntry(request2, callback, now);
150 queue.enqueueOrForward(entry1, now);
151 queue.enqueueOrForward(entry2, now);
152 assertEquals(entry1.getRequest(), queue.peek().getRequest());
156 public void testPoison() {
157 final Request<?, ?> request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
158 final Consumer<Response<?, ?>> callback = createConsumerMock();
159 final long now = Ticker.systemTicker().read();
160 queue.enqueueOrForward(new ConnectionEntry(request, callback, now), now);
161 assertEquals(1, queue.poison().size());
164 @SuppressWarnings("unchecked")
165 protected static Consumer<Response<?, ?>> createConsumerMock() {
166 return mock(Consumer.class);