Switched BGPMessage concept to yangtools.binding.Notification.
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPSessionImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import io.netty.channel.Channel;
11 import io.netty.util.Timeout;
12 import io.netty.util.Timer;
13 import io.netty.util.TimerTask;
14
15 import java.io.IOException;
16 import java.util.Date;
17 import java.util.Set;
18 import java.util.concurrent.TimeUnit;
19
20 import javax.annotation.concurrent.GuardedBy;
21
22 import org.opendaylight.protocol.bgp.concepts.BGPTableType;
23 import org.opendaylight.protocol.bgp.parser.BGPError;
24 import org.opendaylight.protocol.bgp.parser.BGPParameter;
25 import org.opendaylight.protocol.bgp.parser.BGPSession;
26 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
27 import org.opendaylight.protocol.bgp.parser.BGPTerminationReason;
28 import org.opendaylight.protocol.bgp.parser.message.BGPKeepAliveMessage;
29 import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage;
30 import org.opendaylight.protocol.bgp.parser.message.BGPOpenMessage;
31 import org.opendaylight.protocol.bgp.parser.parameter.MultiprotocolCapability;
32 import org.opendaylight.protocol.framework.AbstractProtocolSession;
33 import org.opendaylight.yangtools.yang.binding.Notification;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.annotations.VisibleForTesting;
38 import com.google.common.base.Objects;
39 import com.google.common.base.Objects.ToStringHelper;
40 import com.google.common.base.Preconditions;
41 import com.google.common.collect.Sets;
42
43 @VisibleForTesting
44 public class BGPSessionImpl extends AbstractProtocolSession<Notification> implements BGPSession {
45
46         private static final Logger logger = LoggerFactory.getLogger(BGPSessionImpl.class);
47
48         private static final int DEFAULT_HOLD_TIMER_VALUE = 15;
49
50         public static int HOLD_TIMER_VALUE = DEFAULT_HOLD_TIMER_VALUE; // 240
51
52         /**
53          * Internal session state.
54          */
55         public enum State {
56                 /**
57                  * The session object is created by the negotiator in OpenConfirm state. While in this state, the session object
58                  * is half-alive, e.g. the timers are running, but the session is not completely up, e.g. it has not been
59                  * announced to the listener. If the session is torn down in this state, we do not inform the listener.
60                  */
61                 OpenConfirm,
62                 /**
63                  * The session has been completely established.
64                  */
65                 Up,
66                 /**
67                  * The session has been closed. It will not be resurrected.
68                  */
69                 Idle,
70         }
71
72         /**
73          * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
74          */
75         protected long lastMessageSentAt;
76
77         /**
78          * System.nanoTime value about when was received the last message
79          */
80         private long lastMessageReceivedAt;
81
82         private final BGPSessionListener listener;
83
84         /**
85          * Timer object grouping FSM Timers
86          */
87         private final Timer stateTimer;
88
89         private final BGPSynchronization sync;
90
91         private int kaCounter = 0;
92
93         private final Channel channel;
94
95         @GuardedBy("this")
96         private State state = State.OpenConfirm;
97
98         private final int keepAlive;
99
100         private final Set<BGPTableType> tableTypes;
101
102         BGPSessionImpl(final Timer timer, final BGPSessionListener listener, final Channel channel, final BGPOpenMessage remoteOpen) {
103                 this.listener = Preconditions.checkNotNull(listener);
104                 this.stateTimer = Preconditions.checkNotNull(timer);
105                 this.channel = Preconditions.checkNotNull(channel);
106                 this.keepAlive = remoteOpen.getHoldTime() / 3;
107
108                 final Set<BGPTableType> tts = Sets.newHashSet();
109                 if (remoteOpen.getOptParams() != null) {
110                         for (final BGPParameter param : remoteOpen.getOptParams()) {
111                                 if (param instanceof MultiprotocolCapability) {
112                                         tts.add(((MultiprotocolCapability) param).getTableType());
113                                 }
114                         }
115                 }
116
117                 this.sync = new BGPSynchronization(this, this.listener, tts);
118                 this.tableTypes = tts;
119
120                 if (remoteOpen.getHoldTime() != 0) {
121                         this.stateTimer.newTimeout(new TimerTask() {
122
123                                 @Override
124                                 public void run(final Timeout timeout) throws Exception {
125                                         handleHoldTimer();
126                                 }
127                         }, remoteOpen.getHoldTime(), TimeUnit.SECONDS);
128
129                         this.stateTimer.newTimeout(new TimerTask() {
130                                 @Override
131                                 public void run(final Timeout timeout) throws Exception {
132                                         handleKeepaliveTimer();
133                                 }
134                         }, this.keepAlive, TimeUnit.SECONDS);
135                 }
136         }
137
138         @Override
139         public synchronized void close() {
140                 logger.debug("Closing session: {}", this);
141                 if (this.state != State.Idle) {
142                         this.sendMessage(new BGPNotificationMessage(BGPError.CEASE));
143                         this.channel.close();
144                         this.state = State.Idle;
145                 }
146         }
147
148         /**
149          * Handles incoming message based on their type.
150          * 
151          * @param msg incoming message
152          */
153         @Override
154         public void handleMessage(final Notification msg) {
155                 // Update last reception time
156                 this.lastMessageReceivedAt = System.nanoTime();
157
158                 if (msg instanceof BGPOpenMessage) {
159                         // Open messages should not be present here
160                         this.terminate(BGPError.FSM_ERROR);
161                 } else if (msg instanceof BGPNotificationMessage) {
162                         // Notifications are handled internally
163                         logger.info("Session closed because Notification message received: {}", ((BGPNotificationMessage) msg).getError());
164                         this.closeWithoutMessage();
165                         this.listener.onSessionTerminated(this, new BGPTerminationReason(((BGPNotificationMessage) msg).getError()));
166                 } else if (msg instanceof BGPKeepAliveMessage) {
167                         // Keepalives are handled internally
168                         logger.debug("Received KeepAlive messsage.");
169                         this.kaCounter++;
170                         if (this.kaCounter >= 2) {
171                                 this.sync.kaReceived();
172                         }
173                 } else {
174                         // All others are passed up
175                         this.listener.onMessage(this, msg);
176                 }
177         }
178
179         @Override
180         public synchronized void endOfInput() {
181                 if (this.state == State.Up) {
182                         this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
183                 }
184         }
185
186         void sendMessage(final Notification msg) {
187                 try {
188                         this.channel.writeAndFlush(msg);
189                         this.lastMessageSentAt = System.nanoTime();
190                         logger.debug("Sent message: {}", msg);
191                 } catch (final Exception e) {
192                         logger.warn("Message {} was not sent.", msg, e);
193                 }
194         }
195
196         private synchronized void closeWithoutMessage() {
197                 logger.debug("Closing session: {}", this);
198                 this.channel.close();
199                 this.state = State.Idle;
200         }
201
202         /**
203          * Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
204          * modified, because he initiated the closing. (To prevent concurrent modification exception).
205          * 
206          * @param closeObject
207          */
208         private void terminate(final BGPError error) {
209                 this.sendMessage(new BGPNotificationMessage(error));
210                 this.closeWithoutMessage();
211
212                 this.listener.onSessionTerminated(this, new BGPTerminationReason(error));
213         }
214
215         /**
216          * If HoldTimer expires, the session ends. If a message (whichever) was received during this period, the HoldTimer
217          * will be rescheduled by HOLD_TIMER_VALUE + the time that has passed from the start of the HoldTimer to the time at
218          * which the message was received. If the session was closed by the time this method starts to execute (the session
219          * state will become IDLE), then rescheduling won't occur.
220          */
221         private synchronized void handleHoldTimer() {
222                 if (this.state == State.Idle) {
223                         return;
224                 }
225
226                 final long ct = System.nanoTime();
227                 final long nextHold = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(HOLD_TIMER_VALUE);
228
229                 if (ct >= nextHold) {
230                         logger.debug("HoldTimer expired. " + new Date());
231                         this.terminate(BGPError.HOLD_TIMER_EXPIRED);
232                 } else {
233                         this.stateTimer.newTimeout(new TimerTask() {
234                                 @Override
235                                 public void run(final Timeout timeout) throws Exception {
236                                         handleHoldTimer();
237                                 }
238                         }, nextHold - ct, TimeUnit.NANOSECONDS);
239                 }
240         }
241
242         /**
243          * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
244          * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
245          * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
246          * starts to execute (the session state will become IDLE), that rescheduling won't occur.
247          */
248         private synchronized void handleKeepaliveTimer() {
249                 if (this.state == State.Idle) {
250                         return;
251                 }
252
253                 final long ct = System.nanoTime();
254                 long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
255
256                 if (ct >= nextKeepalive) {
257                         this.sendMessage(new BGPKeepAliveMessage());
258                         nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
259                 }
260                 this.stateTimer.newTimeout(new TimerTask() {
261                         @Override
262                         public void run(final Timeout timeout) throws Exception {
263                                 handleKeepaliveTimer();
264                         }
265                 }, nextKeepalive - ct, TimeUnit.NANOSECONDS);
266         }
267
268         @Override
269         final public String toString() {
270                 return addToStringAttributes(Objects.toStringHelper(this)).toString();
271         }
272
273         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
274                 toStringHelper.add("channel", this.channel);
275                 toStringHelper.add("state", this.state);
276                 return toStringHelper;
277         }
278
279         @Override
280         public Set<BGPTableType> getAdvertisedTableTypes() {
281                 return this.tableTypes;
282         }
283
284         @Override
285         protected synchronized void sessionUp() {
286                 this.state = State.Up;
287                 this.listener.onSessionUp(this);
288         }
289
290         public synchronized State getState() {
291                 return this.state;
292         }
293 }