import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
-import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Ticker;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
*/
@VisibleForTesting
public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implements PCEPSession {
- private static final long MINUTE = TimeUnit.MINUTES.toNanos(1);
- private static Ticker TICKER = Ticker.systemTicker();
+ private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
+ private static final long MINUTE_NANOS = TimeUnit.MINUTES.toNanos(1);
+ private static final Keepalive KEEPALIVE = new KeepaliveBuilder()
+ .setKeepaliveMessage(new KeepaliveMessageBuilder().build())
+ .build();
+
/**
* System.nanoTime value about when was sent the last message Protected to be updated also in tests.
*/
*/
private final Open remoteOpen;
- private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
-
private int maxUnknownMessages;
// True if the listener should not be notified about events
private final Channel channel;
- private final Keepalive kaMessage =
- new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();
private final PCEPSessionState sessionState;
+ private final Ticker ticker;
+
PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
final Open localOpen, final Open remoteOpen) {
+ this(listener, maxUnknownMessages, channel, localOpen, remoteOpen, Ticker.systemTicker());
+ }
+
+ @VisibleForTesting
+ PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
+ final Open localOpen, final Open remoteOpen, final Ticker ticker) {
this.listener = requireNonNull(listener);
this.channel = requireNonNull(channel);
this.localOpen = requireNonNull(localOpen);
this.remoteOpen = requireNonNull(remoteOpen);
- this.lastMessageReceivedAt = TICKER.read();
+ this.ticker = requireNonNull(ticker);
+ lastMessageReceivedAt = ticker.read();
if (maxUnknownMessages != 0) {
this.maxUnknownMessages = maxUnknownMessages;
}
-
if (getDeadTimerValue() != 0) {
channel.eventLoop().schedule(this::handleDeadTimer, getDeadTimerValue(), TimeUnit.SECONDS);
}
LOG.info("Session {}[{}] <-> {}[{}] started",
channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(), remoteOpen.getSessionId());
- this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
+ sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
}
public final Integer getKeepAliveTimerValue() {
- return this.localOpen.getKeepalive().intValue();
+ return localOpen.getKeepalive().intValue();
}
public final Integer getDeadTimerValue() {
- return this.remoteOpen.getDeadTimer().intValue();
+ return remoteOpen.getDeadTimer().intValue();
}
/**
* state will become IDLE), that rescheduling won't occur.
*/
private synchronized void handleDeadTimer() {
- final long ct = TICKER.read();
+ final long ct = ticker.read();
- final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
+ final long nextDead = lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
- if (this.channel.isActive()) {
+ if (channel.isActive()) {
if (ct >= nextDead) {
LOG.debug("DeadTimer expired. {}", new Date());
- this.terminate(TerminationReason.EXP_DEADTIMER);
+ terminate(TerminationReason.EXP_DEADTIMER);
} else {
- this.channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS);
+ channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS);
}
}
}
* starts to execute (the session state will become IDLE), that rescheduling won't occur.
*/
private void handleKeepaliveTimer() {
- final long ct = TICKER.read();
+ final long ct = ticker.read();
- long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
+ long nextKeepalive = lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
- if (this.channel.isActive()) {
+ if (channel.isActive()) {
if (ct >= nextKeepalive) {
- this.sendMessage(this.kaMessage);
- nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
+ sendMessage(KEEPALIVE);
+ nextKeepalive = lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
}
- this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
+ channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
}
}
*/
@Override
public Future<Void> sendMessage(final Message msg) {
- final ChannelFuture f = this.channel.writeAndFlush(msg);
- this.lastMessageSentAt = TICKER.read();
- this.sessionState.updateLastSentMsg();
+ final ChannelFuture f = channel.writeAndFlush(msg);
+ lastMessageSentAt = ticker.read();
+ sessionState.updateLastSentMsg();
if (!(msg instanceof KeepaliveMessage)) {
LOG.debug("PCEP Message enqueued: {}", msg);
}
if (msg instanceof PcerrMessage) {
- this.sessionState.setLastSentError(msg);
+ sessionState.setLastSentError(msg);
}
f.addListener((ChannelFutureListener) arg -> {
}
@VisibleForTesting
- ChannelFuture closeChannel() {
- LOG.info("Closing PCEP session channel: {}", this.channel);
- return this.channel.close();
+ Future<Void> closeChannel() {
+ LOG.info("Closing PCEP session channel: {}", channel);
+ return channel.close();
}
@VisibleForTesting
public synchronized boolean isClosed() {
- return this.closed.get();
+ return closed.get();
}
/**
*/
@Override
public void close(final TerminationReason reason) {
- if (this.closed.getAndSet(true)) {
+ if (closed.getAndSet(true)) {
LOG.debug("Session is already closed.");
return;
}
@Override
public Tlvs getRemoteTlvs() {
- return this.remoteOpen.getTlvs();
+ return remoteOpen.getTlvs();
}
@Override
public InetAddress getRemoteAddress() {
- return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
+ return ((InetSocketAddress) channel.remoteAddress()).getAddress();
}
private synchronized void terminate(final TerminationReason reason) {
- if (this.closed.get()) {
+ if (closed.get()) {
LOG.debug("Session {} is already closed.", this);
return;
}
close(reason);
- this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
+ listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
}
synchronized void endOfInput() {
- if (!this.closed.getAndSet(true)) {
- this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
+ if (!closed.getAndSet(true)) {
+ listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
}
}
* @param open Open Object
*/
private void sendErrorMessage(final PCEPErrors value, final Open open) {
- this.sendMessage(Util.createErrorMessage(value, open));
+ sendMessage(Util.createErrorMessage(value, open));
}
/**
*/
@VisibleForTesting
void handleMalformedMessage(final PCEPErrors error) {
- final long ct = TICKER.read();
+ final long ct = ticker.read();
this.sendErrorMessage(error);
if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
- this.unknownMessagesTimes.add(ct);
- while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
- this.unknownMessagesTimes.remove();
+ unknownMessagesTimes.add(ct);
+ while (ct - unknownMessagesTimes.peek() > MINUTE_NANOS) {
+ unknownMessagesTimes.remove();
}
- if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
- this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
+ if (unknownMessagesTimes.size() > maxUnknownMessages) {
+ terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
}
}
}
* @param msg incoming message
*/
public synchronized void handleMessage(final Message msg) {
- if (this.closed.get()) {
+ if (closed.get()) {
LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
return;
}
// Update last reception time
- this.lastMessageReceivedAt = TICKER.read();
- this.sessionState.updateLastReceivedMsg();
+ lastMessageReceivedAt = ticker.read();
+ sessionState.updateLastReceivedMsg();
if (!(msg instanceof KeepaliveMessage)) {
LOG.debug("PCEP message {} received.", msg);
}
* session DOWN event.
*/
close();
- this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
+ listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
.forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
} else {
// This message needs to be handled by the user
if (msg instanceof PcerrMessage) {
- this.sessionState.setLastReceivedError(msg);
+ sessionState.setLastReceivedError(msg);
}
- this.listener.onMessage(this, msg);
+ listener.onMessage(this, msg);
}
}
@Override
public final String toString() {
- return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
- }
-
- private ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
- toStringHelper.add("channel", this.channel);
- toStringHelper.add("localOpen", this.localOpen);
- toStringHelper.add("remoteOpen", this.remoteOpen);
- return toStringHelper;
+ return MoreObjects.toStringHelper(this)
+ .add("channel", channel)
+ .add("localOpen", localOpen)
+ .add("remoteOpen", remoteOpen)
+ .toString();
}
@VisibleForTesting
@SuppressWarnings("checkstyle:IllegalCatch")
void sessionUp() {
try {
- this.listener.onSessionUp(this);
+ listener.onSessionUp(this);
} catch (final RuntimeException e) {
handleException(e);
throw e;
@VisibleForTesting
final Queue<Long> getUnknownMessagesTimes() {
- return this.unknownMessagesTimes;
+ return unknownMessagesTimes;
}
@Override
public Messages getMessages() {
- return this.sessionState.getMessages(this.unknownMessagesTimes.size());
+ return sessionState.getMessages(unknownMessagesTimes.size());
}
@Override
public LocalPref getLocalPref() {
- return this.sessionState.getLocalPref();
+ return sessionState.getLocalPref();
}
@Override
public PeerPref getPeerPref() {
- return this.sessionState.getPeerPref();
+ return sessionState.getPeerPref();
}
@Override
public Open getLocalOpen() {
- return this.sessionState.getLocalOpen();
+ return sessionState.getLocalOpen();
}
@Override
@Override
public final synchronized void handlerAdded(final ChannelHandlerContext ctx) {
- this.sessionUp();
+ sessionUp();
}
@Override
@Override
public Tlvs getLocalTlvs() {
- return this.localOpen.getTlvs();
- }
-
- @VisibleForTesting
- static void setTicker(final Ticker ticker) {
- TICKER = ticker;
+ return localOpen.getTlvs();
}
}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.opendaylight.protocol.util.CheckTestUtil.checkEquals;
import com.google.common.base.Ticker;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
-import java.util.Queue;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.opendaylight.protocol.pcep.PCEPSessionListener;
+import org.opendaylight.protocol.pcep.PCEPTerminationReason;
import org.opendaylight.protocol.pcep.impl.spi.Util;
import org.opendaylight.protocol.pcep.spi.PCEPErrors;
-import org.opendaylight.protocol.util.CheckTestUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.app.config.rev160707.pcep.dispatcher.config.TlsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Open;
import org.opendaylight.yangtools.yang.common.Uint8;
public class FiniteStateMachineTest extends AbstractPCEPSessionTest {
-
private DefaultPCEPSessionNegotiator serverSession;
private DefaultPCEPSessionNegotiator tlsSessionNegotiator;
- private final TestTicker ticker = new TestTicker();
@Before
public void setup() {
}
@Test
- public void testUnknownMessage() throws Exception {
- final SimpleSessionListener client = new SimpleSessionListener();
- final PCEPSessionImpl session = new PCEPSessionImpl(client, 5, channel,
- openMsg.getOpenMessage().getOpen(), openMsg.getOpenMessage().getOpen());
- PCEPSessionImpl.setTicker(ticker);
- session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
- final Queue<Long> qeue = session.getUnknownMessagesTimes();
- CheckTestUtil.checkEquals(() -> assertEquals(1, qeue.size()));
+ public void testUnknownMessage() {
+ final Ticker ticker = mock(Ticker.class);
+ doReturn(
+ // session create
+ 0L,
+ // first four receive/send
+ 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L,
+ // a minute has passed since second receive/send
+ 60000000004L, 60000000005L,
+ // a minute has passed since third receive/send and the the time gets stuck :)
+ 60000000006L, 60000000007L).when(ticker).read();
+
+ final var listener = mock(PCEPSessionListener.class);
+ final var session = new PCEPSessionImpl(listener, 5, channel, openMsg.getOpenMessage().getOpen(),
+ openMsg.getOpenMessage().getOpen(), ticker);
+
+ final var qeue = session.getUnknownMessagesTimes();
session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
- CheckTestUtil.checkEquals(() -> assertEquals(2, qeue.size()));
+ assertEquals(1, qeue.size());
session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
- CheckTestUtil.checkEquals(() -> assertEquals(3, qeue.size()));
+ assertEquals(2, qeue.size());
session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
- CheckTestUtil.checkEquals(() -> assertEquals(4, qeue.size()));
+ assertEquals(3, qeue.size());
session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
- CheckTestUtil.checkEquals(() -> assertEquals(3, qeue.size()));
+ assertEquals(4, qeue.size());
session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
- CheckTestUtil.checkEquals(() -> assertEquals(3, qeue.size()));
+ assertEquals(3, qeue.size());
session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
- CheckTestUtil.checkEquals(() -> assertEquals(4, qeue.size()));
+ assertEquals(3, qeue.size());
session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
- CheckTestUtil.checkEquals(() -> assertEquals(5, qeue.size()));
+ assertEquals(4, qeue.size());
session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
- synchronized (client) {
- while (client.up) {
- client.wait();
- }
- }
- CheckTestUtil.checkEquals(() -> assertTrue(!client.up));
- }
+ assertEquals(5, qeue.size());
- private static final class TestTicker extends Ticker {
- private long counter = 0L;
-
- TestTicker() {
- }
-
- @Override
- public long read() {
- if (counter == 8) {
- counter++;
- return 60000000003L;
- } else if (counter == 10) {
- counter++;
- return 60000000006L;
- }
- return counter++;
- }
+ final var captor = ArgumentCaptor.forClass(PCEPTerminationReason.class);
+ doNothing().when(listener).onSessionTerminated(eq(session), captor.capture());
+ session.handleMalformedMessage(PCEPErrors.CAPABILITY_NOT_SUPPORTED);
+ verify(ticker, times(20)).read();
+ assertEquals("TOO_MANY_UNKNOWN_MSGS", captor.getValue().getErrorMessage());
}
}