2 * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.access.client;
10 import static org.hamcrest.CoreMatchers.everyItem;
11 import static org.mockito.Matchers.eq;
12 import static org.mockito.Matchers.isA;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.verify;
15 import static org.opendaylight.controller.cluster.access.client.ConnectionEntryMatcher.entryWithRequest;
17 import com.google.common.base.Ticker;
18 import com.google.common.collect.Iterables;
19 import com.google.common.testing.FakeTicker;
20 import java.util.Optional;
21 import java.util.concurrent.TimeUnit;
22 import java.util.function.Consumer;
23 import org.junit.Assert;
24 import org.junit.Test;
25 import org.opendaylight.controller.cluster.access.ABIVersion;
26 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
27 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
28 import org.opendaylight.controller.cluster.access.concepts.Request;
29 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
30 import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
31 import org.opendaylight.controller.cluster.access.concepts.Response;
32 import org.opendaylight.controller.cluster.access.concepts.SuccessEnvelope;
33 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
35 public class TransmittingTransmitQueueTest extends AbstractTransmitQueueTest<TransmitQueue.Transmitting> {
37 private BackendInfo backendInfo;
40 protected int getMaxInFlightMessages() {
41 return backendInfo.getMaxMessages();
45 protected TransmitQueue.Transmitting createQueue() {
46 backendInfo = new BackendInfo(probe.ref(), 0L, ABIVersion.BORON, 3);
47 return new TransmitQueue.Transmitting(0, backendInfo);
51 public void testComplete() throws Exception {
52 final long sequence1 = 0L;
53 final long sequence2 = 1L;
54 final Request request1 = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, sequence1, probe.ref());
55 final TransactionIdentifier transactionIdentifier2 = new TransactionIdentifier(HISTORY, 1L);
56 final Request request2 = new TransactionPurgeRequest(transactionIdentifier2, sequence2, probe.ref());
57 final Consumer<Response<?, ?>> callback1 = createConsumerMock();
58 final Consumer<Response<?, ?>> callback2 = createConsumerMock();
59 final long now1 = Ticker.systemTicker().read();
60 final long now2 = Ticker.systemTicker().read();
62 queue.enqueue(new ConnectionEntry(request1, callback1, now1), now1);
63 queue.enqueue(new ConnectionEntry(request2, callback2, now2), now2);
64 final RequestSuccess<?, ?> success1 = new TransactionPurgeResponse(TRANSACTION_IDENTIFIER, sequence1);
65 final RequestSuccess<?, ?> success2 = new TransactionPurgeResponse(transactionIdentifier2, sequence2);
66 //complete entries in different order
67 final Optional<TransmittedConnectionEntry> completed2 =
68 queue.complete(new SuccessEnvelope(success2, 0L, sequence2, 1L), now2);
69 final Optional<TransmittedConnectionEntry> completed1 =
70 queue.complete(new SuccessEnvelope(success1, 0L, sequence1, 1L), now1);
72 final TransmittedConnectionEntry transmittedEntry1 = completed1.orElseThrow(AssertionError::new);
73 Assert.assertEquals(transmittedEntry1.getRequest(), request1);
74 Assert.assertEquals(transmittedEntry1.getTxSequence(), sequence1);
75 Assert.assertEquals(transmittedEntry1.getCallback(), callback1);
77 final TransmittedConnectionEntry transmittedEntry2 = completed2.orElseThrow(AssertionError::new);
78 Assert.assertEquals(transmittedEntry2.getRequest(), request2);
79 Assert.assertEquals(transmittedEntry2.getTxSequence(), sequence2);
80 Assert.assertEquals(transmittedEntry2.getCallback(), callback2);
84 public void testEnqueueCanTransmit() throws Exception {
85 final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
86 final Consumer<Response<?, ?>> callback = createConsumerMock();
87 final long now = Ticker.systemTicker().read();
88 queue.enqueue(new ConnectionEntry(request, callback, now), now);
89 final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
90 Assert.assertEquals(request, requestEnvelope.getMessage());
94 public void testEnqueueBackendFull() throws Exception {
95 final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
96 final Consumer<Response<?, ?>> callback = createConsumerMock();
97 final long now = Ticker.systemTicker().read();
98 final int sentMessages = getMaxInFlightMessages() + 1;
99 for (int i = 0; i < sentMessages; i++) {
100 queue.enqueue(new ConnectionEntry(request, callback, now), now);
102 for (int i = 0; i < getMaxInFlightMessages(); i++) {
103 probe.expectMsgClass(RequestEnvelope.class);
106 final Iterable<ConnectionEntry> entries = queue.asIterable();
107 Assert.assertEquals(sentMessages, Iterables.size(entries));
108 Assert.assertThat(entries, everyItem(entryWithRequest(request)));
113 public void testCanTransmitCount() throws Exception {
114 Assert.assertTrue(queue.canTransmitCount(getMaxInFlightMessages() - 1) > 0);
115 Assert.assertFalse(queue.canTransmitCount(getMaxInFlightMessages()) > 0);
120 public void testTransmit() throws Exception {
121 final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
122 final Consumer<Response<?, ?>> callback = createConsumerMock();
123 final long now = Ticker.systemTicker().read();
124 final ConnectionEntry entry = new ConnectionEntry(request, callback, now);
125 queue.transmit(entry, now);
126 final RequestEnvelope requestEnvelope = probe.expectMsgClass(RequestEnvelope.class);
127 Assert.assertEquals(request, requestEnvelope.getMessage());
131 public void testSetForwarder() throws Exception {
132 final FakeTicker ticker = new FakeTicker();
133 ticker.setAutoIncrementStep(1, TimeUnit.MICROSECONDS);
134 final Request request = new TransactionPurgeRequest(TRANSACTION_IDENTIFIER, 0L, probe.ref());
135 final Consumer<Response<?, ?>> callback = createConsumerMock();
136 final ConnectionEntry entry = new ConnectionEntry(request, callback, ticker.read());
137 queue.enqueue(entry, ticker.read());
138 final ReconnectForwarder forwarder = mock(ReconnectForwarder.class);
139 final long setForwarderNow = ticker.read();
140 queue.setForwarder(forwarder, setForwarderNow);
141 verify(forwarder).forwardEntry(isA(TransmittedConnectionEntry.class), eq(setForwarderNow));
142 final long secondEnqueueNow = ticker.read();
143 queue.enqueue(entry, secondEnqueueNow);
144 verify(forwarder).forwardEntry(entry, secondEnqueueNow);