X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=pcep%2Fimpl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fprotocol%2Fpcep%2Fimpl%2FPCEPSessionImpl.java;h=db5c8b50b8f6417a9bcca86a788601a1da86a05f;hb=1f18c032706004ce9bf0fcc648090ec5211b945a;hp=55a9fe8a264038d7a3d1b6527a29d1c1de0e4113;hpb=6b4e7ad0a33415d92ad1c505ef7ec2b4cbf2eb24;p=bgpcep.git diff --git a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java index 55a9fe8a26..db5c8b50b8 100644 --- a/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java +++ b/pcep/impl/src/main/java/org/opendaylight/protocol/pcep/impl/PCEPSessionImpl.java @@ -7,13 +7,17 @@ */ package org.opendaylight.protocol.pcep.impl; +import static java.util.Objects.requireNonNull; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.MoreObjects.ToStringHelper; -import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.concurrent.Future; import java.io.IOException; import java.net.InetAddress; @@ -22,29 +26,30 @@ import java.util.Date; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.TimeUnit; -import org.opendaylight.protocol.framework.AbstractProtocolSession; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.protocol.pcep.PCEPCloseTermination; import org.opendaylight.protocol.pcep.PCEPSession; import org.opendaylight.protocol.pcep.PCEPSessionListener; import org.opendaylight.protocol.pcep.TerminationReason; +import org.opendaylight.protocol.pcep.impl.spi.Util; import org.opendaylight.protocol.pcep.spi.PCEPErrors; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.LocalPref; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.Messages; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.PeerPref; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs; -import org.opendaylight.yangtools.yang.binding.DataContainer; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.CloseBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.KeepaliveBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.CloseMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.KeepaliveMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.OpenMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.PcerrMessage; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.message.CCloseMessageBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.object.CCloseBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.keepalive.message.KeepaliveMessageBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,19 +57,20 @@ import org.slf4j.LoggerFactory; * Implementation of PCEPSession. (Not final for testing.) */ @VisibleForTesting -public class PCEPSessionImpl extends AbstractProtocolSession implements PCEPSession { +public class PCEPSessionImpl extends SimpleChannelInboundHandler implements PCEPSession { + private static final long MINUTE = TimeUnit.MINUTES.toNanos(1); + private static Ticker TICKER = Ticker.systemTicker(); /** * System.nanoTime value about when was sent the last message Protected to be updated also in tests. */ - @VisibleForTesting - protected volatile long lastMessageSentAt; + private volatile long lastMessageSentAt; /** * System.nanoTime value about when was received the last message */ private long lastMessageReceivedAt; - private final Queue unknownMessagesTimes = new LinkedList(); + private final Queue unknownMessagesTimes = new LinkedList<>(); private final PCEPSessionListener listener; @@ -83,7 +89,8 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements private int maxUnknownMessages; // True if the listener should not be notified about events - private boolean closed = false; + @GuardedBy("this") + private final AtomicBoolean closed = new AtomicBoolean(false); private final Channel channel; @@ -92,12 +99,12 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements private final PCEPSessionState sessionState; PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel, - final Open localOpen, final Open remoteOpen) { - this.listener = Preconditions.checkNotNull(listener); - this.channel = Preconditions.checkNotNull(channel); - this.localOpen = Preconditions.checkNotNull(localOpen); - this.remoteOpen = Preconditions.checkNotNull(remoteOpen); - this.lastMessageReceivedAt = System.nanoTime(); + final Open localOpen, final Open remoteOpen) { + this.listener = requireNonNull(listener); + this.channel = requireNonNull(channel); + this.localOpen = requireNonNull(localOpen); + this.remoteOpen = requireNonNull(remoteOpen); + this.lastMessageReceivedAt = TICKER.read(); if (maxUnknownMessages != 0) { this.maxUnknownMessages = maxUnknownMessages; @@ -105,25 +112,15 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements if (getDeadTimerValue() != 0) { - channel.eventLoop().schedule(new Runnable() { - @Override - public void run() { - handleDeadTimer(); - } - }, getDeadTimerValue(), TimeUnit.SECONDS); + channel.eventLoop().schedule(this::handleDeadTimer, getDeadTimerValue(), TimeUnit.SECONDS); } if (getKeepAliveTimerValue() != 0) { - channel.eventLoop().schedule(new Runnable() { - @Override - public void run() { - handleKeepaliveTimer(); - } - }, getKeepAliveTimerValue(), TimeUnit.SECONDS); + channel.eventLoop().schedule(this::handleKeepaliveTimer, getKeepAliveTimerValue(), TimeUnit.SECONDS); } LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(), - remoteOpen.getSessionId()); + remoteOpen.getSessionId()); this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel); } @@ -142,7 +139,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements * state will become IDLE), that rescheduling won't occur. */ private synchronized void handleDeadTimer() { - final long ct = System.nanoTime(); + final long ct = TICKER.read(); final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue()); @@ -151,12 +148,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements LOG.debug("DeadTimer expired. {}", new Date()); this.terminate(TerminationReason.EXP_DEADTIMER); } else { - this.channel.eventLoop().schedule(new Runnable() { - @Override - public void run() { - handleDeadTimer(); - } - }, nextDead - ct, TimeUnit.NANOSECONDS); + this.channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS); } } } @@ -167,8 +159,8 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method * starts to execute (the session state will become IDLE), that rescheduling won't occur. */ - private void handleKeepaliveTimer() { - final long ct = System.nanoTime(); + private void handleKeepaliveTimer() { + final long ct = TICKER.read(); long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue()); @@ -178,15 +170,20 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue()); } - this.channel.eventLoop().schedule(new Runnable() { - @Override - public void run() { - handleKeepaliveTimer(); - } - }, nextKeepalive - ct, TimeUnit.NANOSECONDS); + this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS); } } + /** + * Handle exception occurred in the PCEP session. The session in error state should be closed + * properly so that it can be restored later. + */ + @VisibleForTesting + void handleException(final Throwable cause) { + LOG.error("Exception captured for session {}, closing session.", this, cause); + terminate(TerminationReason.UNKNOWN); + } + /** * Sends message to serialization. * @@ -195,7 +192,7 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements @Override public Future sendMessage(final Message msg) { final ChannelFuture f = this.channel.writeAndFlush(msg); - this.lastMessageSentAt = System.nanoTime(); + this.lastMessageSentAt = TICKER.read(); this.sessionState.updateLastSentMsg(); if (!(msg instanceof KeepaliveMessage)) { LOG.debug("PCEP Message enqueued: {}", msg); @@ -204,27 +201,34 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements this.sessionState.setLastSentError(msg); } - f.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture arg) { - if (arg.isSuccess()) { - LOG.trace("Message sent to socket: {}", msg); - } else { - LOG.debug("Message not sent: {}", msg, arg.cause()); - } + f.addListener((ChannelFutureListener) arg -> { + if (arg.isSuccess()) { + LOG.trace("Message sent to socket: {}", msg); + } else { + LOG.debug("Message not sent: {}", msg, arg.cause()); } }); return f; } + @VisibleForTesting + ChannelFuture closeChannel() { + LOG.info("Closing PCEP session channel: {}", this.channel); + return this.channel.close(); + } + + @VisibleForTesting + public synchronized boolean isClosed() { + return this.closed.get(); + } + /** * Closes PCEP session without sending a Close message, as the channel is no longer active. */ @Override - public void close() { - LOG.info("Closing PCEP session: {}", this); - this.channel.close(); + public synchronized void close() { + close(null); } /** @@ -233,12 +237,20 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements * inside the session or from the listener, therefore the parent of this session should be informed. */ @Override - public synchronized void close(final TerminationReason reason) { - LOG.info("Closing PCEP session: {}", this); - this.closed = true; - this.sendMessage(new CloseBuilder().setCCloseMessage( - new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build()); - this.close(); + public void close(final TerminationReason reason) { + if (this.closed.getAndSet(true)) { + LOG.debug("Session is already closed."); + return; + } + // only send close message when the reason is provided + if (reason != null) { + LOG.info("Closing PCEP session with reason {}: {}", reason, this); + sendMessage(new CloseBuilder().setCCloseMessage( + new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build()); + } else { + LOG.info("Closing PCEP session: {}", this); + } + closeChannel(); } @Override @@ -252,19 +264,17 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements } private synchronized void terminate(final TerminationReason reason) { - LOG.info("Local PCEP session termination : {}", reason); + if (this.closed.get()) { + LOG.debug("Session {} is already closed.", this); + return; + } + close(reason); this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason)); - this.closed = true; - this.sendMessage(new CloseBuilder().setCCloseMessage( - new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build()); - this.close(); } - @Override - public synchronized void endOfInput() { - if (!this.closed) { + synchronized void endOfInput() { + if (!this.closed.getAndSet(true)) { this.listener.onSessionDown(this, new IOException("End of input detected. Close the session.")); - this.closed = true; } } @@ -291,13 +301,13 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements * @param error documented error in RFC5440 or draft */ @VisibleForTesting - public void handleMalformedMessage(final PCEPErrors error) { - final long ct = System.nanoTime(); + void handleMalformedMessage(final PCEPErrors error) { + final long ct = TICKER.read(); this.sendErrorMessage(error); if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) { this.unknownMessagesTimes.add(ct); - while (ct - this.unknownMessagesTimes.peek() > TimeUnit.MINUTES.toNanos(1)) { - this.unknownMessagesTimes.poll(); + while (ct - this.unknownMessagesTimes.peek() > MINUTE) { + final Long poll = this.unknownMessagesTimes.poll(); } if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) { this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS); @@ -311,10 +321,13 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements * * @param msg incoming message */ - @Override public synchronized void handleMessage(final Message msg) { + if (this.closed.get()) { + LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg); + return; + } // Update last reception time - this.lastMessageReceivedAt = System.nanoTime(); + this.lastMessageReceivedAt = TICKER.read(); this.sessionState.updateLastReceivedMsg(); if (!(msg instanceof KeepaliveMessage)) { LOG.debug("PCEP message {} received.", msg); @@ -330,7 +343,9 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements * exception is CLOSE message, which needs to be converted into a * session DOWN event. */ - this.close(); + close(); + this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason + .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason()))); } else { // This message needs to be handled by the user if (msg instanceof PcerrMessage) { @@ -345,21 +360,25 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements return addToStringAttributes(MoreObjects.toStringHelper(this)).toString(); } - protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { + private ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) { toStringHelper.add("channel", this.channel); toStringHelper.add("localOpen", this.localOpen); toStringHelper.add("remoteOpen", this.remoteOpen); return toStringHelper; } - @Override @VisibleForTesting - public void sessionUp() { - this.listener.onSessionUp(this); + void sessionUp() { + try { + this.listener.onSessionUp(this); + } catch (final Exception e) { + handleException(e); + throw e; + } } @VisibleForTesting - protected final Queue getUnknownMessagesTimes() { + final Queue getUnknownMessagesTimes() { return this.unknownMessagesTimes; } @@ -379,11 +398,45 @@ public class PCEPSessionImpl extends AbstractProtocolSession implements } @Override - public Class getImplementedInterface() { - throw new UnsupportedOperationException(); + public Open getLocalOpen() { + return this.sessionState.getLocalOpen(); + } + + @Override + public synchronized final void channelInactive(final ChannelHandlerContext ctx) { + LOG.debug("Channel {} inactive.", ctx.channel()); + endOfInput(); + + try { + super.channelInactive(ctx); + } catch (final Exception e) { + throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e); + } + } + + @Override + protected synchronized final void channelRead0(final ChannelHandlerContext ctx, final Message msg) { + LOG.debug("Message was received: {}", msg); + handleMessage(msg); } + + @Override + public synchronized final void handlerAdded(final ChannelHandlerContext ctx) { + this.sessionUp(); + } + + @Override + public synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { + handleException(cause); + } + @Override - public void resetStats() { - this.sessionState.reset(); + public Tlvs getLocalTlvs() { + return this.localOpen.getTlvs(); + } + + @VisibleForTesting + static void setTicker(final Ticker ticker) { + TICKER = ticker; } }