import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
import java.io.IOException;
import java.util.Date;
import java.util.Set;
import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive;
import org.slf4j.LoggerFactory;
@VisibleForTesting
-public class BGPSessionImpl extends AbstractProtocolSession<Notification> implements BGPSession, BGPSessionStatistics {
+public class BGPSessionImpl extends SimpleChannelInboundHandler<Notification> implements BGPSession, BGPSessionStatistics, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class);
private static final int KA_TO_DEADTIMER_RATIO = 3;
+ static final String END_OF_INPUT = "End of input detected. Close the session.";
+
/**
* Internal session state.
*/
@Override
public synchronized void close() {
- LOG.info("Closing session: {}", this);
-
- if (this.state != State.IDLE) {
+ if (this.state != State.IDLE && this.channel.isActive()) {
this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode(
- BGPError.CEASE.getSubcode()).build());
- removePeerSession();
- this.channel.close();
- this.state = State.IDLE;
+ BGPError.CEASE.getSubcode()).build());
}
+ this.closeWithoutMessage();
}
/**
*
* @param msg incoming message
*/
- @Override
public synchronized void handleMessage(final Notification msg) {
// Update last reception time
this.lastMessageReceivedAt = System.nanoTime();
}
}
- @Override
public synchronized void endOfInput() {
if (this.state == State.UP) {
- this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
+ LOG.info(END_OF_INPUT);
+ this.listener.onSessionDown(this, new IOException(END_OF_INPUT));
}
}
@Override
public void operationComplete(final ChannelFuture f) {
if (!f.isSuccess()) {
- LOG.info("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
+ LOG.warn("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
} else {
LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
}
}
private synchronized void closeWithoutMessage() {
- LOG.debug("Closing session: {}", this);
+ LOG.info("Closing session: {}", this);
removePeerSession();
this.channel.close();
this.state = State.IDLE;
* Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
* modified, because he initiated the closing. (To prevent concurrent modification exception).
*
- * @param closeObject
+ * @param error
*/
private void terminate(final BGPError error) {
this.writeAndFlush(new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()).build());
return this.tableTypes;
}
- @Override
protected synchronized void sessionUp() {
this.sessionStats.startSessionStopwatch();
this.state = State.UP;
ChannelOutputLimiter getLimiter() {
return this.limiter;
}
+
+ @Override
+ public final void channelInactive(final ChannelHandlerContext ctx) {
+ LOG.debug("Channel {} inactive.", ctx.channel());
+ this.endOfInput();
+
+ try {
+ super.channelInactive(ctx);
+ } catch (final Exception e) {
+ throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+ }
+ }
+
+ @Override
+ protected final void channelRead0(final ChannelHandlerContext ctx, final Notification msg) {
+ LOG.debug("Message was received: {}", msg);
+ this.handleMessage(msg);
+ }
+
+ @Override
+ public final void handlerAdded(final ChannelHandlerContext ctx) {
+ this.sessionUp();
+ }
}