BUG-5243: Failed to send message Notify
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPSessionImpl.java
index c383f034b4dfa2a4aabb725401da01849565997d..cfc01e536d527ee0932af30004bb2003fd24b6fe 100644 (file)
@@ -16,6 +16,8 @@ import com.google.common.collect.Sets;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
 import java.io.IOException;
 import java.util.Date;
 import java.util.Set;
@@ -31,7 +33,6 @@ import org.opendaylight.protocol.bgp.rib.impl.spi.BGPSessionStatistics;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSession;
 import org.opendaylight.protocol.bgp.rib.spi.BGPSessionListener;
 import org.opendaylight.protocol.bgp.rib.spi.BGPTerminationReason;
-import org.opendaylight.protocol.framework.AbstractProtocolSession;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.AsNumber;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev100924.Ipv4Address;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130919.Keepalive;
@@ -52,7 +53,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @VisibleForTesting
-public class BGPSessionImpl extends AbstractProtocolSession<Notification> implements BGPSession, BGPSessionStatistics {
+public class BGPSessionImpl extends SimpleChannelInboundHandler<Notification> implements BGPSession, BGPSessionStatistics, AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(BGPSessionImpl.class);
 
@@ -60,6 +61,8 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
 
     private static final int KA_TO_DEADTIMER_RATIO = 3;
 
+    static final String END_OF_INPUT = "End of input detected. Close the session.";
+
     /**
      * Internal session state.
      */
@@ -174,15 +177,11 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
 
     @Override
     public synchronized void close() {
-        LOG.info("Closing session: {}", this);
-
-        if (this.state != State.IDLE) {
+        if (this.state != State.IDLE && this.channel.isActive()) {
             this.writeAndFlush(new NotifyBuilder().setErrorCode(BGPError.CEASE.getCode()).setErrorSubcode(
-                    BGPError.CEASE.getSubcode()).build());
-            removePeerSession();
-            this.channel.close();
-            this.state = State.IDLE;
+                BGPError.CEASE.getSubcode()).build());
         }
+        this.closeWithoutMessage();
     }
 
     /**
@@ -190,7 +189,6 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
      *
      * @param msg incoming message
      */
-    @Override
     public synchronized void handleMessage(final Notification msg) {
         // Update last reception time
         this.lastMessageReceivedAt = System.nanoTime();
@@ -223,10 +221,10 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
         }
     }
 
-    @Override
     public synchronized void endOfInput() {
         if (this.state == State.UP) {
-            this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
+            LOG.info(END_OF_INPUT);
+            this.listener.onSessionDown(this, new IOException(END_OF_INPUT));
         }
     }
 
@@ -237,7 +235,7 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
                 @Override
                 public void operationComplete(final ChannelFuture f) {
                     if (!f.isSuccess()) {
-                        LOG.info("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
+                        LOG.warn("Failed to send message {} to socket {}", msg, f.cause(), BGPSessionImpl.this.channel);
                     } else {
                         LOG.trace("Message {} sent to socket {}", msg, BGPSessionImpl.this.channel);
                     }
@@ -269,7 +267,7 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
     }
 
     private synchronized void closeWithoutMessage() {
-        LOG.debug("Closing session: {}", this);
+        LOG.info("Closing session: {}", this);
         removePeerSession();
         this.channel.close();
         this.state = State.IDLE;
@@ -279,7 +277,7 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
      * Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
      * modified, because he initiated the closing. (To prevent concurrent modification exception).
      *
-     * @param closeObject
+     * @param error
      */
     private void terminate(final BGPError error) {
         this.writeAndFlush(new NotifyBuilder().setErrorCode(error.getCode()).setErrorSubcode(error.getSubcode()).build());
@@ -364,7 +362,6 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
         return this.tableTypes;
     }
 
-    @Override
     protected synchronized void sessionUp() {
         this.sessionStats.startSessionStopwatch();
         this.state = State.UP;
@@ -412,4 +409,27 @@ public class BGPSessionImpl extends AbstractProtocolSession<Notification> implem
     ChannelOutputLimiter getLimiter() {
         return this.limiter;
     }
+
+    @Override
+    public final void channelInactive(final ChannelHandlerContext ctx) {
+        LOG.debug("Channel {} inactive.", ctx.channel());
+        this.endOfInput();
+
+        try {
+            super.channelInactive(ctx);
+        } catch (final Exception e) {
+            throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+        }
+    }
+
+    @Override
+    protected final void channelRead0(final ChannelHandlerContext ctx, final Notification msg) {
+        LOG.debug("Message was received: {}", msg);
+        this.handleMessage(msg);
+    }
+
+    @Override
+    public final void handlerAdded(final ChannelHandlerContext ctx) {
+        this.sessionUp();
+    }
 }