Add new revision for pcep types model
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPSessionImpl.java
index f867462a1e0a12d2a5aef2388e2d2f27fa8454eb..db5c8b50b8f6417a9bcca86a788601a1da86a05f 100644 (file)
  */
 package org.opendaylight.protocol.pcep.impl;
 
-import org.opendaylight.protocol.framework.DeserializerException;
-import org.opendaylight.protocol.framework.DocumentedException;
-import org.opendaylight.protocol.framework.ProtocolMessage;
-import org.opendaylight.protocol.framework.ProtocolMessageFactory;
-import org.opendaylight.protocol.framework.ProtocolOutputStream;
-import org.opendaylight.protocol.framework.ProtocolSession;
-import org.opendaylight.protocol.framework.SessionParent;
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
+import com.google.common.base.Ticker;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.concurrent.Future;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
-import org.opendaylight.protocol.pcep.PCEPConnection;
-import org.opendaylight.protocol.pcep.PCEPDeserializerException;
-import org.opendaylight.protocol.pcep.PCEPErrorTermination;
-import org.opendaylight.protocol.pcep.PCEPErrors;
-import org.opendaylight.protocol.pcep.PCEPMessage;
 import org.opendaylight.protocol.pcep.PCEPSession;
 import org.opendaylight.protocol.pcep.PCEPSessionListener;
-import org.opendaylight.protocol.pcep.PCEPSessionPreferences;
-import org.opendaylight.protocol.pcep.PCEPSessionProposalChecker;
-import org.opendaylight.protocol.pcep.PCEPTlv;
-import org.opendaylight.protocol.pcep.impl.message.PCEPRawMessage;
-import org.opendaylight.protocol.pcep.message.PCEPCloseMessage;
-import org.opendaylight.protocol.pcep.message.PCEPErrorMessage;
-import org.opendaylight.protocol.pcep.message.PCEPKeepAliveMessage;
-import org.opendaylight.protocol.pcep.message.PCEPOpenMessage;
-import org.opendaylight.protocol.pcep.object.PCEPCloseObject;
-import org.opendaylight.protocol.pcep.object.PCEPCloseObject.Reason;
-import org.opendaylight.protocol.pcep.object.PCEPErrorObject;
-import org.opendaylight.protocol.pcep.object.PCEPOpenObject;
-import org.opendaylight.protocol.pcep.tlv.NodeIdentifierTlv;
+import org.opendaylight.protocol.pcep.TerminationReason;
+import org.opendaylight.protocol.pcep.impl.spi.Util;
+import org.opendaylight.protocol.pcep.spi.PCEPErrors;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.CloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.KeepaliveBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.CloseMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.KeepaliveMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.OpenMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.PcerrMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.message.CCloseMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.object.CCloseBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.keepalive.message.KeepaliveMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Timer;
-import java.util.TimerTask;
-
 /**
  * Implementation of PCEPSession. (Not final for testing.)
  */
-class PCEPSessionImpl implements PCEPSession, ProtocolSession, PCEPSessionRuntimeMXBean {
-
-       /**
-        * KeepAlive Timer is to be scheduled periodically, each time it starts, it sends KeepAlive Message.
-        */
-       private class KeepAliveTimer extends TimerTask {
-               private final PCEPSessionImpl parent;
-
-               public KeepAliveTimer(final PCEPSessionImpl parent) {
-                       this.parent = parent;
-               }
-
-               @Override
-               public void run() {
-                       this.parent.handleKeepaliveTimer();
-               }
-       }
-
-       /**
-        * DeadTimer is to be scheduled periodically, when it expires, it closes PCEP session.
-        */
-       private class DeadTimer extends TimerTask {
-               private final PCEPSessionImpl parent;
-
-               public DeadTimer(final PCEPSessionImpl parent) {
-                       this.parent = parent;
-               }
-
-               @Override
-               public void run() {
-                       this.parent.handleDeadtimer();
-               }
-       }
-
-       /**
-        * OpenWaitTimer runs just once, but can be rescheduled or canceled before expiration. When it expires, it sends an
-        * error message (1, 2)
-        */
-       private class OpenWaitTimer extends TimerTask {
-
-               private final PCEPSessionImpl parent;
-
-               public OpenWaitTimer(final PCEPSessionImpl parent) {
-                       this.parent = parent;
-               }
-
-               @Override
-               public void run() {
-                       this.parent.handleOpenWait();
-               }
-       }
-
-       /**
-        * KeepWaitTimer runs just once, but can be rescheduled or canceled before expiration. When it expires, it sends an
-        * error message (1, 7)
-        */
-       private class KeepWaitTimer extends TimerTask {
-
-               private final PCEPSessionImpl parent;
-
-               public KeepWaitTimer(final PCEPSessionImpl parent) {
-                       this.parent = parent;
-               }
-
-               @Override
-               public void run() {
-                       this.parent.handleKeepWait();
-               }
-       }
-
-       /**
-        * Possible states for Finite State Machine
-        */
-       private enum State {
-               IDLE, OPEN_WAIT, KEEP_WAIT, UP
-       }
-
-       /**
-        * In seconds.
-        */
-       public static final int OPEN_WAIT_TIMER_VALUE = 60;
-
-       public static final int KEEP_WAIT_TIMER_VALUE = 60;
-
-       public int KEEP_ALIVE_TIMER_VALUE = 3;
-
-       public int DEAD_TIMER_VALUE = 4 * this.KEEP_ALIVE_TIMER_VALUE;
-
-       /**
-        * Actual state of the FSM.
-        */
-       private State state;
-
-       private OpenWaitTimer openWaitTimer;
-
-       private KeepWaitTimer keepWaitTimer;
-
-       /**
-        * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
-        */
-       protected long lastMessageSentAt;
-
-       /**
-        * System.nanoTime value about when was received the last message
-        */
-       private long lastMessageReceivedAt;
-
-       private boolean localOK = false;
-
-       private boolean remoteOK = false;
-
-       private boolean openRetry = false;
-
-       private final int sessionId;
-
-       /**
-        * Protected for testing.
-        */
-       protected int maxUnknownMessages = 5;
-
-       protected final Queue<Long> unknownMessagesTimes = new LinkedList<Long>();
-
-       private final PCEPSessionListener listener;
-
-       private PCEPSessionProposalChecker checker = null;
-
-       /**
-        * Open Object with session characteristics that were accepted by another PCE (sent from this session).
-        */
-       private PCEPOpenObject localOpen = null;
-
-       /**
-        * Open Object with session characteristics for this session (sent from another PCE).
-        */
-       private PCEPOpenObject remoteOpen = null;
-
-       private final ProtocolOutputStream outputStream;
-
-       private static final Logger logger = LoggerFactory.getLogger(PCEPSessionImpl.class);
-
-       /**
-        * Timer object grouping FSM Timers
-        */
-       private final Timer stateTimer;
-
-       private final SessionParent parent;
-
-       private final PCEPMessageFactory factory;
-
-       private int sentMsgCount = 0;
-
-       private int receivedMsgCount = 0;
-
-       private final String peerAddress;
-
-       PCEPSessionImpl(final SessionParent parent, final Timer timer, final PCEPConnection connection, final PCEPMessageFactory factory,
-                       final int maxUnknownMessages, final int sessionId) {
-               this.state = State.IDLE;
-               this.listener = connection.getListener();
-               this.checker = connection.getProposalChecker();
-               this.sessionId = sessionId;
-               this.localOpen = connection.getProposal().getOpenObject();
-               this.outputStream = new ProtocolOutputStream();
-               this.peerAddress = connection.getPeerAddress().getHostString();
-               this.stateTimer = timer;
-               this.parent = parent;
-               this.factory = factory;
-               if (this.maxUnknownMessages != 0)
-                       this.maxUnknownMessages = maxUnknownMessages;
-       }
-
-       @Override
-       public void startSession() {
-               logger.debug("Session started.");
-               this.sendMessage(new PCEPOpenMessage(this.localOpen));
-               this.restartOpenWait();
-               this.changeState(State.OPEN_WAIT);
-       }
-
-       /**
-        * OpenWait timer can be canceled or rescheduled before its expiration. When it expires, it sends particular
-        * PCEPErrorMessage and closes PCEP session.
-        */
-       private synchronized void handleOpenWait() {
-               if (this.state != State.IDLE) {
-                       this.terminate(PCEPErrors.NO_OPEN_BEFORE_EXP_OPENWAIT); // 1, 1
-               }
-       }
-
-       /**
-        * KeepWait timer can be canceled or rescheduled before its expiration. When it expires, it sends particular
-        * PCEPErrorMessage and closes PCEP session.
-        */
-       private synchronized void handleKeepWait() {
-               if (this.state != State.IDLE) {
-                       this.terminate(PCEPErrors.NO_MSG_BEFORE_EXP_KEEPWAIT); // 1, 7
-               }
-       }
-
-       /**
-        * If DeadTimer expires, the session ends. If a message (whichever) was received during this period, the DeadTimer
-        * will be rescheduled by DEAD_TIMER_VALUE + the time that has passed from the start of the DeadTimer to the time at
-        * which the message was received. If the session was closed by the time this method starts to execute (the session
-        * state will become IDLE), that rescheduling won't occur.
-        */
-       private synchronized void handleDeadtimer() {
-               final long ct = System.nanoTime();
-
-               final long nextDead = (long) (this.lastMessageReceivedAt + (this.DEAD_TIMER_VALUE * 1E9));
-
-               if (this.state != State.IDLE) {
-                       if (ct >= nextDead) {
-                               logger.debug("DeadTimer expired. " + new Date());
-                               this.terminate(Reason.EXP_DEADTIMER);
-                               return;
-                       }
-
-                       this.stateTimer.schedule(new DeadTimer(this), (long) ((nextDead - ct) / 1E6));
-               }
-       }
-
-       /**
-        * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
-        * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
-        * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
-        * starts to execute (the session state will become IDLE), that rescheduling won't occur.
-        */
-       private synchronized void handleKeepaliveTimer() {
-               final long ct = System.nanoTime();
-
-               long nextKeepalive = (long) (this.lastMessageSentAt + (this.KEEP_ALIVE_TIMER_VALUE * 1E9));
-
-               if (this.state != State.IDLE) {
-                       if (ct >= nextKeepalive) {
-                               this.sendMessage(new PCEPKeepAliveMessage());
-                               nextKeepalive = (long) (this.lastMessageSentAt + (this.KEEP_ALIVE_TIMER_VALUE * 1E9));
-                       }
-
-                       this.stateTimer.schedule(new KeepAliveTimer(this), (long) ((nextKeepalive - ct) / 1E6));
-               }
-       }
-
-       private void changeState(final State finalState) {
-               switch (finalState) {
-               case IDLE:
-                       logger.debug("Changed to state: " + State.IDLE);
-                       this.state = State.IDLE;
-                       return;
-               case OPEN_WAIT:
-                       logger.debug("Changed to state: " + State.OPEN_WAIT);
-                       if (this.state == State.UP) {
-                               throw new IllegalArgumentException("Cannot change state from " + this.state + " to " + State.OPEN_WAIT);
-                       }
-                       this.state = State.OPEN_WAIT;
-                       return;
-               case KEEP_WAIT:
-                       logger.debug("Changed to state: " + State.KEEP_WAIT);
-                       if (this.state == State.UP || this.state == State.IDLE) {
-                               throw new IllegalArgumentException("Cannot change state from " + this.state + " to " + State.KEEP_WAIT);
-                       }
-                       this.state = State.KEEP_WAIT;
-                       return;
-               case UP:
-                       logger.debug("Changed to state: " + State.UP);
-                       if (this.state == State.IDLE || this.state == State.UP) {
-                               throw new IllegalArgumentException("Cannot change state from " + this.state + " to " + State.UP);
-                       }
-                       this.state = State.UP;
-                       return;
-               }
-       }
-
-       private void restartOpenWait() {
-               if (this.state == State.OPEN_WAIT && this.openWaitTimer != null) {
-                       this.openWaitTimer.cancel();
-               }
-               this.openWaitTimer = new OpenWaitTimer(this);
-               this.stateTimer.schedule(this.openWaitTimer, OPEN_WAIT_TIMER_VALUE * 1000);
-       }
-
-       private void restartKeepWaitTimer() {
-               if (this.state == State.KEEP_WAIT && this.keepWaitTimer != null) {
-                       this.keepWaitTimer.cancel();
-               }
-               this.keepWaitTimer = new KeepWaitTimer(this);
-               this.stateTimer.schedule(this.keepWaitTimer, KEEP_WAIT_TIMER_VALUE * 1000);
-       }
-
-       /**
-        * Makes a callback to check if the session characteristics that FSM received, are acceptable.
-        *
-        * @param keepAliveTimerValue
-        * @param deadTimerValue
-        * @param tlvs
-        * @return
-        */
-       private boolean checkSessionCharacteristics(final PCEPOpenObject openObj) {
-               return this.checker.checkSessionCharacteristics(new PCEPSessionPreferences(openObj));
-       }
-
-       private PCEPOpenObject getNewProposal() {
-               return this.checker.getNewProposal(new PCEPSessionPreferences(this.localOpen)).getOpenObject();
-       }
-
-       /**
-        * Sends message to serialization.
-        *
-        * @param msg to be sent
-        */
-       @Override
-       public void sendMessage(final PCEPMessage msg) {
-               this.outputStream.putMessage(msg, this.factory);
-               this.lastMessageSentAt = System.nanoTime();
-               if (!(msg instanceof PCEPKeepAliveMessage))
-                       logger.debug("Sent message: " + msg);
-               this.parent.checkOutputBuffer(this);
-               this.sentMsgCount++;
-       }
-
-       @Override
-       public ProtocolOutputStream getStream() {
-               return this.outputStream;
-       }
-
-       private void commonClose() {
-               this.changeState(State.IDLE);
-               this.parent.onSessionClosed(this);
-       }
-
-       /**
-        * Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
-        * modified, because he initiated the closing. (To prevent concurrent modification exception).
-        *
-        * @param closeObject
-        */
-       void closeWithoutMessage() {
-               logger.debug("Closing session: {}", this);
-               commonClose();
-       }
-
-       /**
-        * Closes PCEP session, cancels all timers, returns to state Idle WITHOUT sending the Close Message. KeepAlive and
-        * DeadTimer are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP
-        * session from inside the session or from the listener, therefore the parent of this session should be informed.
-        * The only closing reason is UNKNOWN.
-        */
-       @Override
-       public synchronized void close() {
-               logger.debug("Closing session: {}", this);
-               this.sendMessage(new PCEPCloseMessage(new PCEPCloseObject(Reason.UNKNOWN)));
-               commonClose();
-       }
-
-       private void terminate(final PCEPCloseObject.Reason reason) {
-               this.sendMessage(new PCEPCloseMessage(new PCEPCloseObject(reason)));
-               this.closeWithoutMessage();
-               this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
-       }
-
-       private void terminate(final PCEPErrors error) {
-               this.sendErrorMessage(error);
-               this.closeWithoutMessage();
-               this.listener.onSessionTerminated(this, new PCEPErrorTermination(error));
-       }
-
-       @Override
-       public void endOfInput() {
-               if (this.state != State.IDLE) {
-                       this.listener.onSessionDown(this, null, new IOException("End of input detected. Close the session."));
-               }
-       }
-
-       @Override
-       public int maximumMessageSize() {
-               return 65535;
-       }
-
-       private void sendErrorMessage(final PCEPErrors value) {
-               this.sendErrorMessage(value, null);
-       }
-
-       /**
-        * Sends PCEP Error Message with one PCEPError and Open Object.
-        *
-        * @param value
-        * @param open
-        */
-       private void sendErrorMessage(final PCEPErrors value, final PCEPOpenObject open) {
-               final PCEPErrorObject error = new PCEPErrorObject(value);
-               final List<PCEPErrorObject> errors = new ArrayList<PCEPErrorObject>();
-               errors.add(error);
-               this.sendMessage(new PCEPErrorMessage(open, errors, null));
-       }
-
-       @Override
-       public void handleMalformedMessage(final DeserializerException e) {
-               // FIXME rewrite
-
-       }
-
-       @Override
-       public void handleMalformedMessage(final DocumentedException e) {
-               // FIXME rewrite
-
-       }
-
-       /**
-        * The fact, that a message is malformed, comes from parser. In case of unrecognized message a particular error is
-        * sent (CAPABILITY_NOT_SUPPORTED) and the method checks if the MAX_UNKNOWN_MSG per minute wasn't overstepped.
-        * Second, any other error occurred that is specified by rfc. In this case, the an error message is generated and
-        * sent.
-        *
-        * @param error documented error in RFC5440 or draft
-        */
-       public void handleMalformedMessage(final PCEPErrors error) {
-               final long ct = System.nanoTime();
-               this.sendErrorMessage(error);
-               if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
-                       this.unknownMessagesTimes.add(ct);
-                       while (ct - this.unknownMessagesTimes.peek() > 60 * 1E9) {
-                               this.unknownMessagesTimes.poll();
-                       }
-                       if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
-                               this.terminate(Reason.TOO_MANY_UNKNOWN_MSG);
-                       }
-               }
-       }
-
-       /**
-        * In case of syntactic error or some parsing error, the session needs to be closed with the Reason: malformed
-        * message. The user needs to be notified about this error.
-        *
-        * @param e exception that was thrown from parser
-        */
-       public void handleMalformedMessage(final Exception e) {
-               logger.warn("PCEP byte stream corruption detected", e);
-               this.terminate(Reason.MALFORMED_MSG);
-       }
-
-       /**
-        * Open message should be handled only if the FSM is in OPEN_WAIT state.
-        *
-        * @param msg
-        */
-       private void handleOpenMessage(final PCEPOpenMessage msg) {
-               this.remoteOpen = msg.getOpenObject();
-               logger.debug("Received message: " + msg.toString());
-               if (this.state != State.OPEN_WAIT) {
-                       this.sendErrorMessage(PCEPErrors.ATTEMPT_2ND_SESSION);
-                       return;
-               }
-               final Boolean result = this.checkSessionCharacteristics(this.remoteOpen);
-               if (result == null) {
-                       this.terminate(PCEPErrors.NON_ACC_NON_NEG_SESSION_CHAR); // 1, 3
-                       return;
-               } else if (result) {
-                       this.DEAD_TIMER_VALUE = this.remoteOpen.getDeadTimerValue();
-                       this.KEEP_ALIVE_TIMER_VALUE = this.remoteOpen.getKeepAliveTimerValue();
-                       logger.debug("Session chars are acceptable. Overwriting: deadtimer: " + this.DEAD_TIMER_VALUE + "keepalive: "
-                                       + this.KEEP_ALIVE_TIMER_VALUE);
-                       this.remoteOK = true;
-                       this.openWaitTimer.cancel();
-                       this.sendMessage(new PCEPKeepAliveMessage());
-                       // if the timer is not disabled
-                       if (this.KEEP_ALIVE_TIMER_VALUE != 0) {
-                               this.stateTimer.schedule(new KeepAliveTimer(this), this.KEEP_ALIVE_TIMER_VALUE * 1000);
-                       }
-                       if (this.localOK) {
-                               // if the timer is not disabled
-                               if (this.DEAD_TIMER_VALUE != 0) {
-                                       this.stateTimer.schedule(new DeadTimer(this), this.DEAD_TIMER_VALUE * 1000);
-                               }
-                               this.changeState(State.UP);
-                               this.listener.onSessionUp(this, this.localOpen, this.remoteOpen);
-                       } else {
-                               this.restartKeepWaitTimer();
-                               this.changeState(State.KEEP_WAIT);
-                       }
-                       return;
-               } else if (!result) {
-                       this.localOpen = this.getNewProposal();
-                       if (this.openRetry) {
-                               this.terminate(PCEPErrors.SECOND_OPEN_MSG); // 1, 5
-                       } else {
-                               this.openRetry = true;
-                               this.sendErrorMessage(PCEPErrors.NON_ACC_NEG_SESSION_CHAR, this.localOpen); // 1, 4
-                               if (this.localOK) {
-                                       this.restartOpenWait();
-                                       this.changeState(State.OPEN_WAIT);
-                               } else {
-                                       this.restartKeepWaitTimer();
-                                       this.changeState(State.KEEP_WAIT);
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Error message should be handled in FSM if its state is KEEP_WAIT, otherwise it is just passed to session listener
-        * for handling.
-        *
-        * @param msg
-        */
-       private void handleErrorMessage(final PCEPErrorMessage msg) {
-               this.remoteOpen = msg.getOpenObject();
-               final Boolean result = this.checkSessionCharacteristics(this.remoteOpen);
-               if (result == null || !result) {
-                       this.terminate(PCEPErrors.PCERR_NON_ACC_SESSION_CHAR); // 1, 6
-                       return;
-               } else {
-                       this.KEEP_ALIVE_TIMER_VALUE = this.remoteOpen.getKeepAliveTimerValue();
-                       this.DEAD_TIMER_VALUE = this.remoteOpen.getDeadTimerValue();
-                       logger.debug("New values for keepalive: " + this.remoteOpen.getKeepAliveTimerValue() + " deadtimer "
-                                       + this.remoteOpen.getDeadTimerValue());
-                       this.sendMessage(new PCEPOpenMessage(this.remoteOpen));
-                       if (this.remoteOK) {
-                               this.restartKeepWaitTimer();
-                               this.changeState(State.KEEP_WAIT);
-                       } else {
-                               this.keepWaitTimer.cancel();
-                               this.restartOpenWait();
-                               this.changeState(State.OPEN_WAIT);
-                       }
-               }
-       }
-
-       /**
-        * KeepAlive message should be explicitly parsed in FSM when its state is KEEP_WAIT. Otherwise is handled by the
-        * KeepAliveTimer or it's invalid.
-        */
-       private void handleKeepAliveMessage() {
-               if (this.state == State.KEEP_WAIT) {
-                       this.localOK = true;
-                       this.keepWaitTimer.cancel();
-                       if (this.remoteOK) {
-                               if (this.DEAD_TIMER_VALUE != 0) {
-                                       this.stateTimer.schedule(new DeadTimer(this), this.DEAD_TIMER_VALUE * 1000);
-                               }
-                               this.changeState(State.UP);
-                               this.listener.onSessionUp(this, this.localOpen, this.remoteOpen);
-                       } else {
-                               this.restartOpenWait();
-                               this.changeState(State.OPEN_WAIT);
-                       }
-               }
-       }
-
-       /**
-        * Handles incoming message. If the session is up, it notifies the user. The user is notified about every message
-        * except KeepAlive.
-        *
-        * @param msg incoming message
-        */
-       @Override
-       public void handleMessage(final ProtocolMessage msg) {
-               // Update last reception time
-               final PCEPMessage pcepMsg = (PCEPMessage) msg;
-
-               this.lastMessageReceivedAt = System.nanoTime();
-               this.receivedMsgCount++;
-
-               if (pcepMsg instanceof PCEPRawMessage) {
-                       List<PCEPMessage> msgs;
-                       try {
-                               msgs = PCEPMessageValidator.getValidator(((PCEPRawMessage) pcepMsg).getMsgType()).validate(
-                                               ((PCEPRawMessage) pcepMsg).getAllObjects());
-                               for (final PCEPMessage tmpMsg : msgs) {
-                                       this.handleMessage(tmpMsg);
-                               }
-                       } catch (final PCEPDeserializerException e) {
-                               logger.error("Malformed message, terminating. ", e);
-                               this.terminate(Reason.MALFORMED_MSG);
-                       }
-                       return;
-               }
-
-               // Keepalives are handled internally
-               if (pcepMsg instanceof PCEPKeepAliveMessage) {
-                       this.handleKeepAliveMessage();
-                       return;
-               }
-
-               // Open messages are handled internally
-               if (pcepMsg instanceof PCEPOpenMessage) {
-                       this.handleOpenMessage((PCEPOpenMessage) pcepMsg);
-                       return;
-               }
-
-               /*
-                * During initial handshake we handle all the messages.
-                */
-               if (this.state != State.UP) {
-                       /*
-                        * In KEEP_WAIT, an Error message is a valid thing to see, because
-                        * it is used in negotiation.
-                        */
-                       if (pcepMsg instanceof PCEPErrorMessage && this.state == State.KEEP_WAIT
-                                       && ((PCEPErrorMessage) pcepMsg).getOpenObject() != null) {
-                               this.handleErrorMessage((PCEPErrorMessage) pcepMsg);
-                               return;
-                       }
-
-                       /*
-                        * OPEN and KEEPALIVE messages are handled at the top. ERROR
-                        * messages are handled in the specific case of KEEP_WAIT above, so
-                        * anything else is invalid here.
-                        */
-                       this.terminate(PCEPErrors.NON_OR_INVALID_OPEN_MSG);
-                       return;
-               }
-
-               /*
-                * Session is up, we are reporting all messages to user. One notable
-                * exception is CLOSE message, which needs to be converted into a
-                * session DOWN event.
-                */
-               if (pcepMsg instanceof PCEPCloseMessage) {
-                       this.listener.onSessionTerminated(this, new PCEPCloseTermination(((PCEPCloseMessage) pcepMsg).getCloseObject().getReason()));
-                       this.closeWithoutMessage();
-                       return;
-               }
-               this.listener.onMessage(this, pcepMsg);
-       }
-
-       @Override
-       public ProtocolMessageFactory getMessageFactory() {
-               return this.factory;
-       }
-
-       @Override
-       public void onConnectionFailed(final IOException e) {
-               logger.info("Connection failed before finishing: {}", e.getMessage(), e);
-               this.listener.onSessionDown(this, new PCEPCloseObject(Reason.UNKNOWN), e);
-       }
-
-       /**
-        * @return the sentMsgCount
-        */
-
-       @Override
-       public Integer getSentMsgCount() {
-               return this.sentMsgCount;
-       }
-
-       /**
-        * @return the receivedMsgCount
-        */
-
-       @Override
-       public Integer getReceivedMsgCount() {
-               return this.receivedMsgCount;
-       }
-
-
-       @Override
-       public Integer getDeadTimerValue() {
-               return this.DEAD_TIMER_VALUE;
-       }
-
-
-       @Override
-       public Integer getKeepAliveTimerValue() {
-               return this.KEEP_ALIVE_TIMER_VALUE;
-       }
-
-
-       @Override
-       public String getPeerAddress() {
-               return this.peerAddress;
-       }
-
-       @Override
-       public void tearDown() throws IOException {
-               this.close();
-       }
-
-
-       @Override
-       public String getNodeIdentifier() {
-               if (!this.remoteOpen.getTlvs().isEmpty()) {
-                       final PCEPTlv tlv = this.remoteOpen.getTlvs().iterator().next();
-                       if (tlv != null && tlv instanceof NodeIdentifierTlv) {
-                               return tlv.toString();
-                       }
-               }
-               return "";
-       }
-
-       @Override
-       public String toString() {
-               final StringBuilder builder = new StringBuilder();
-               builder.append("PCEPSessionImpl [state=");
-               builder.append(this.state);
-               builder.append(", localOK=");
-               builder.append(this.localOK);
-               builder.append(", remoteOK=");
-               builder.append(this.remoteOK);
-               builder.append(", openRetry=");
-               builder.append(this.openRetry);
-               builder.append(", sessionId=");
-               builder.append(this.sessionId);
-               builder.append(", checker=");
-               builder.append(this.checker);
-               builder.append(", localOpen=");
-               builder.append(this.localOpen);
-               builder.append(", remoteOpen=");
-               builder.append(this.remoteOpen);
-               builder.append(", outputStream=");
-               builder.append(this.outputStream);
-               builder.append("]");
-               return builder.toString();
-       }
+@VisibleForTesting
+public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implements PCEPSession {
+    private static final long MINUTE = TimeUnit.MINUTES.toNanos(1);
+    private static Ticker TICKER = Ticker.systemTicker();
+    /**
+     * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
+     */
+    private volatile long lastMessageSentAt;
+
+    /**
+     * System.nanoTime value about when was received the last message
+     */
+    private long lastMessageReceivedAt;
+
+    private final Queue<Long> unknownMessagesTimes = new LinkedList<>();
+
+    private final PCEPSessionListener listener;
+
+    /**
+     * Open Object with session characteristics that were accepted by another PCE (sent from this session).
+     */
+    private final Open localOpen;
+
+    /**
+     * Open Object with session characteristics for this session (sent from another PCE).
+     */
+    private final Open remoteOpen;
+
+    private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
+
+    private int maxUnknownMessages;
+
+    // True if the listener should not be notified about events
+    @GuardedBy("this")
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private final Channel channel;
+
+    private final Keepalive kaMessage = new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();
+
+    private final PCEPSessionState sessionState;
+
+    PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
+            final Open localOpen, final Open remoteOpen) {
+        this.listener = requireNonNull(listener);
+        this.channel = requireNonNull(channel);
+        this.localOpen = requireNonNull(localOpen);
+        this.remoteOpen = requireNonNull(remoteOpen);
+        this.lastMessageReceivedAt = TICKER.read();
+
+        if (maxUnknownMessages != 0) {
+            this.maxUnknownMessages = maxUnknownMessages;
+        }
+
+
+        if (getDeadTimerValue() != 0) {
+            channel.eventLoop().schedule(this::handleDeadTimer, getDeadTimerValue(), TimeUnit.SECONDS);
+        }
+
+        if (getKeepAliveTimerValue() != 0) {
+            channel.eventLoop().schedule(this::handleKeepaliveTimer, getKeepAliveTimerValue(), TimeUnit.SECONDS);
+        }
+
+        LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
+                remoteOpen.getSessionId());
+        this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
+    }
+
+    public final Integer getKeepAliveTimerValue() {
+        return this.localOpen.getKeepalive().intValue();
+    }
+
+    public final Integer getDeadTimerValue() {
+        return this.remoteOpen.getDeadTimer().intValue();
+    }
+
+    /**
+     * If DeadTimer expires, the session ends. If a message (whichever) was received during this period, the DeadTimer
+     * will be rescheduled by DEAD_TIMER_VALUE + the time that has passed from the start of the DeadTimer to the time at
+     * which the message was received. If the session was closed by the time this method starts to execute (the session
+     * state will become IDLE), that rescheduling won't occur.
+     */
+    private synchronized void handleDeadTimer() {
+        final long ct = TICKER.read();
+
+        final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
+
+        if (this.channel.isActive()) {
+            if (ct >= nextDead) {
+                LOG.debug("DeadTimer expired. {}", new Date());
+                this.terminate(TerminationReason.EXP_DEADTIMER);
+            } else {
+                this.channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS);
+            }
+        }
+    }
+
+    /**
+     * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
+     * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
+     * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
+     * starts to execute (the session state will become IDLE), that rescheduling won't occur.
+     */
+    private void handleKeepaliveTimer() {
+        final long ct = TICKER.read();
+
+        long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
+
+        if (this.channel.isActive()) {
+            if (ct >= nextKeepalive) {
+                this.sendMessage(this.kaMessage);
+                nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
+            }
+
+            this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
+        }
+    }
+
+    /**
+     * Handle exception occurred in the PCEP session. The session in error state should be closed
+     * properly so that it can be restored later.
+     */
+    @VisibleForTesting
+    void handleException(final Throwable cause) {
+        LOG.error("Exception captured for session {}, closing session.", this, cause);
+        terminate(TerminationReason.UNKNOWN);
+    }
+
+    /**
+     * Sends message to serialization.
+     *
+     * @param msg to be sent
+     */
+    @Override
+    public Future<Void> sendMessage(final Message msg) {
+        final ChannelFuture f = this.channel.writeAndFlush(msg);
+        this.lastMessageSentAt = TICKER.read();
+        this.sessionState.updateLastSentMsg();
+        if (!(msg instanceof KeepaliveMessage)) {
+            LOG.debug("PCEP Message enqueued: {}", msg);
+        }
+        if (msg instanceof PcerrMessage) {
+            this.sessionState.setLastSentError(msg);
+        }
+
+        f.addListener((ChannelFutureListener) arg -> {
+            if (arg.isSuccess()) {
+                LOG.trace("Message sent to socket: {}", msg);
+            } else {
+                LOG.debug("Message not sent: {}", msg, arg.cause());
+            }
+        });
+
+        return f;
+    }
+
+    @VisibleForTesting
+    ChannelFuture closeChannel() {
+        LOG.info("Closing PCEP session channel: {}", this.channel);
+        return this.channel.close();
+    }
+
+    @VisibleForTesting
+    public synchronized boolean isClosed() {
+        return this.closed.get();
+    }
+
+    /**
+     * Closes PCEP session without sending a Close message, as the channel is no longer active.
+     */
+    @Override
+    public synchronized void close() {
+        close(null);
+    }
+
+    /**
+     * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
+     * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
+     * inside the session or from the listener, therefore the parent of this session should be informed.
+     */
+    @Override
+    public void close(final TerminationReason reason) {
+        if (this.closed.getAndSet(true)) {
+            LOG.debug("Session is already closed.");
+            return;
+        }
+        // only send close message when the reason is provided
+        if (reason != null) {
+            LOG.info("Closing PCEP session with reason {}: {}", reason, this);
+            sendMessage(new CloseBuilder().setCCloseMessage(
+                    new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
+        } else {
+            LOG.info("Closing PCEP session: {}", this);
+        }
+        closeChannel();
+    }
+
+    @Override
+    public Tlvs getRemoteTlvs() {
+        return this.remoteOpen.getTlvs();
+    }
+
+    @Override
+    public InetAddress getRemoteAddress() {
+        return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
+    }
+
+    private synchronized void terminate(final TerminationReason reason) {
+        if (this.closed.get()) {
+            LOG.debug("Session {} is already closed.", this);
+            return;
+        }
+        close(reason);
+        this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
+    }
+
+    synchronized void endOfInput() {
+        if (!this.closed.getAndSet(true)) {
+            this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
+        }
+    }
+
+    private void sendErrorMessage(final PCEPErrors value) {
+        this.sendErrorMessage(value, null);
+    }
+
+    /**
+     * Sends PCEP Error Message with one PCEPError and Open Object.
+     *
+     * @param value
+     * @param open
+     */
+    private void sendErrorMessage(final PCEPErrors value, final Open open) {
+        this.sendMessage(Util.createErrorMessage(value, open));
+    }
+
+    /**
+     * The fact, that a message is malformed, comes from parser. In case of unrecognized message a particular error is
+     * sent (CAPABILITY_NOT_SUPPORTED) and the method checks if the MAX_UNKNOWN_MSG per minute wasn't overstepped.
+     * Second, any other error occurred that is specified by rfc. In this case, the an error message is generated and
+     * sent.
+     *
+     * @param error documented error in RFC5440 or draft
+     */
+    @VisibleForTesting
+    void handleMalformedMessage(final PCEPErrors error) {
+        final long ct = TICKER.read();
+        this.sendErrorMessage(error);
+        if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
+            this.unknownMessagesTimes.add(ct);
+            while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
+                final Long poll = this.unknownMessagesTimes.poll();
+            }
+            if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
+                this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
+            }
+        }
+    }
+
+    /**
+     * Handles incoming message. If the session is up, it notifies the user. The user is notified about every message
+     * except KeepAlive.
+     *
+     * @param msg incoming message
+     */
+    public synchronized void handleMessage(final Message msg) {
+        if (this.closed.get()) {
+            LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
+            return;
+        }
+        // Update last reception time
+        this.lastMessageReceivedAt = TICKER.read();
+        this.sessionState.updateLastReceivedMsg();
+        if (!(msg instanceof KeepaliveMessage)) {
+            LOG.debug("PCEP message {} received.", msg);
+        }
+        // Internal message handling. The user does not see these messages
+        if (msg instanceof KeepaliveMessage) {
+            // Do nothing, the timer has been already reset
+        } else if (msg instanceof OpenMessage) {
+            this.sendErrorMessage(PCEPErrors.ATTEMPT_2ND_SESSION);
+        } else if (msg instanceof CloseMessage) {
+            /*
+             * Session is up, we are reporting all messages to user. One notable
+             * exception is CLOSE message, which needs to be converted into a
+             * session DOWN event.
+             */
+            close();
+            this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
+                    .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
+        } else {
+            // This message needs to be handled by the user
+            if (msg instanceof PcerrMessage) {
+                this.sessionState.setLastReceivedError(msg);
+            }
+            this.listener.onMessage(this, msg);
+        }
+    }
+
+    @Override
+    public final String toString() {
+        return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+    }
+
+    private ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
+        toStringHelper.add("channel", this.channel);
+        toStringHelper.add("localOpen", this.localOpen);
+        toStringHelper.add("remoteOpen", this.remoteOpen);
+        return toStringHelper;
+    }
+
+    @VisibleForTesting
+    void sessionUp() {
+        try {
+            this.listener.onSessionUp(this);
+        } catch (final Exception e) {
+            handleException(e);
+            throw e;
+        }
+    }
+
+    @VisibleForTesting
+    final Queue<Long> getUnknownMessagesTimes() {
+        return this.unknownMessagesTimes;
+    }
+
+    @Override
+    public Messages getMessages() {
+        return this.sessionState.getMessages(this.unknownMessagesTimes.size());
+    }
+
+    @Override
+    public LocalPref getLocalPref() {
+        return this.sessionState.getLocalPref();
+    }
+
+    @Override
+    public PeerPref getPeerPref() {
+        return this.sessionState.getPeerPref();
+    }
+
+    @Override
+    public Open getLocalOpen() {
+        return this.sessionState.getLocalOpen();
+    }
+
+    @Override
+    public synchronized final void channelInactive(final ChannelHandlerContext ctx) {
+        LOG.debug("Channel {} inactive.", ctx.channel());
+        endOfInput();
+
+        try {
+            super.channelInactive(ctx);
+        } catch (final Exception e) {
+            throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
+        }
+    }
+
+    @Override
+    protected synchronized final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
+        LOG.debug("Message was received: {}", msg);
+        handleMessage(msg);
+    }
+
+    @Override
+    public synchronized final void handlerAdded(final ChannelHandlerContext ctx) {
+        this.sessionUp();
+    }
+
+    @Override
+    public  synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
+        handleException(cause);
+    }
+
+    @Override
+    public Tlvs getLocalTlvs() {
+        return this.localOpen.getTlvs();
+    }
+
+    @VisibleForTesting
+    static void setTicker(final Ticker ticker) {
+        TICKER = ticker;
+    }
 }