35255c7bd9614c0264dbdc08e6bd3ef4639601be
[bgpcep.git] / bgp / rib-impl / src / main / java / org / opendaylight / protocol / bgp / rib / impl / BGPSessionImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.bgp.rib.impl;
9
10 import io.netty.channel.Channel;
11 import io.netty.util.Timeout;
12 import io.netty.util.Timer;
13 import io.netty.util.TimerTask;
14
15 import java.io.IOException;
16 import java.util.Date;
17 import java.util.Set;
18 import java.util.concurrent.TimeUnit;
19
20 import javax.annotation.concurrent.GuardedBy;
21
22 import org.opendaylight.protocol.bgp.concepts.BGPTableType;
23 import org.opendaylight.protocol.bgp.parser.BGPError;
24 import org.opendaylight.protocol.bgp.parser.BGPParameter;
25 import org.opendaylight.protocol.bgp.parser.BGPSession;
26 import org.opendaylight.protocol.bgp.parser.BGPSessionListener;
27 import org.opendaylight.protocol.bgp.parser.BGPTerminationReason;
28 import org.opendaylight.protocol.bgp.parser.message.BGPNotificationMessage;
29 import org.opendaylight.protocol.bgp.parser.message.BGPOpenMessage;
30 import org.opendaylight.protocol.bgp.parser.parameter.MultiprotocolCapability;
31 import org.opendaylight.protocol.framework.AbstractProtocolSession;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130918.Keepalive;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.bgp.message.rev130918.KeepaliveBuilder;
34 import org.opendaylight.yangtools.yang.binding.Notification;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import com.google.common.annotations.VisibleForTesting;
39 import com.google.common.base.Objects;
40 import com.google.common.base.Objects.ToStringHelper;
41 import com.google.common.base.Preconditions;
42 import com.google.common.collect.Sets;
43
44 @VisibleForTesting
45 public class BGPSessionImpl extends AbstractProtocolSession<Notification> implements BGPSession {
46
47         private static final Logger logger = LoggerFactory.getLogger(BGPSessionImpl.class);
48
49         private static final int DEFAULT_HOLD_TIMER_VALUE = 15;
50
51         private static final Notification keepalive = new KeepaliveBuilder().build();
52
53         public static int HOLD_TIMER_VALUE = DEFAULT_HOLD_TIMER_VALUE; // 240
54
55         /**
56          * Internal session state.
57          */
58         public enum State {
59                 /**
60                  * The session object is created by the negotiator in OpenConfirm state. While in this state, the session object
61                  * is half-alive, e.g. the timers are running, but the session is not completely up, e.g. it has not been
62                  * announced to the listener. If the session is torn down in this state, we do not inform the listener.
63                  */
64                 OpenConfirm,
65                 /**
66                  * The session has been completely established.
67                  */
68                 Up,
69                 /**
70                  * The session has been closed. It will not be resurrected.
71                  */
72                 Idle,
73         }
74
75         /**
76          * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
77          */
78         protected long lastMessageSentAt;
79
80         /**
81          * System.nanoTime value about when was received the last message
82          */
83         private long lastMessageReceivedAt;
84
85         private final BGPSessionListener listener;
86
87         /**
88          * Timer object grouping FSM Timers
89          */
90         private final Timer stateTimer;
91
92         private final BGPSynchronization sync;
93
94         private int kaCounter = 0;
95
96         private final Channel channel;
97
98         @GuardedBy("this")
99         private State state = State.OpenConfirm;
100
101         private final int keepAlive;
102
103         private final Set<BGPTableType> tableTypes;
104
105         BGPSessionImpl(final Timer timer, final BGPSessionListener listener, final Channel channel, final BGPOpenMessage remoteOpen) {
106                 this.listener = Preconditions.checkNotNull(listener);
107                 this.stateTimer = Preconditions.checkNotNull(timer);
108                 this.channel = Preconditions.checkNotNull(channel);
109                 this.keepAlive = remoteOpen.getHoldTime() / 3;
110
111                 final Set<BGPTableType> tts = Sets.newHashSet();
112                 if (remoteOpen.getOptParams() != null) {
113                         for (final BGPParameter param : remoteOpen.getOptParams()) {
114                                 if (param instanceof MultiprotocolCapability) {
115                                         tts.add(((MultiprotocolCapability) param).getTableType());
116                                 }
117                         }
118                 }
119
120                 this.sync = new BGPSynchronization(this, this.listener, tts);
121                 this.tableTypes = tts;
122
123                 if (remoteOpen.getHoldTime() != 0) {
124                         this.stateTimer.newTimeout(new TimerTask() {
125
126                                 @Override
127                                 public void run(final Timeout timeout) throws Exception {
128                                         handleHoldTimer();
129                                 }
130                         }, remoteOpen.getHoldTime(), TimeUnit.SECONDS);
131
132                         this.stateTimer.newTimeout(new TimerTask() {
133                                 @Override
134                                 public void run(final Timeout timeout) throws Exception {
135                                         handleKeepaliveTimer();
136                                 }
137                         }, this.keepAlive, TimeUnit.SECONDS);
138                 }
139         }
140
141         @Override
142         public synchronized void close() {
143                 logger.debug("Closing session: {}", this);
144                 if (this.state != State.Idle) {
145                         this.sendMessage(new BGPNotificationMessage(BGPError.CEASE));
146                         this.channel.close();
147                         this.state = State.Idle;
148                 }
149         }
150
151         /**
152          * Handles incoming message based on their type.
153          * 
154          * @param msg incoming message
155          */
156         @Override
157         public void handleMessage(final Notification msg) {
158                 // Update last reception time
159                 this.lastMessageReceivedAt = System.nanoTime();
160
161                 if (msg instanceof BGPOpenMessage) {
162                         // Open messages should not be present here
163                         this.terminate(BGPError.FSM_ERROR);
164                 } else if (msg instanceof BGPNotificationMessage) {
165                         // Notifications are handled internally
166                         logger.info("Session closed because Notification message received: {}", ((BGPNotificationMessage) msg).getError());
167                         this.closeWithoutMessage();
168                         this.listener.onSessionTerminated(this, new BGPTerminationReason(((BGPNotificationMessage) msg).getError()));
169                 } else if (msg instanceof Keepalive) {
170                         // Keepalives are handled internally
171                         logger.debug("Received KeepAlive messsage.");
172                         this.kaCounter++;
173                         if (this.kaCounter >= 2) {
174                                 this.sync.kaReceived();
175                         }
176                 } else {
177                         // All others are passed up
178                         this.listener.onMessage(this, msg);
179                 }
180         }
181
182         @Override
183         public synchronized void endOfInput() {
184                 if (this.state == State.Up) {
185                         this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
186                 }
187         }
188
189         void sendMessage(final Notification msg) {
190                 try {
191                         this.channel.writeAndFlush(msg);
192                         this.lastMessageSentAt = System.nanoTime();
193                         logger.debug("Sent message: {}", msg);
194                 } catch (final Exception e) {
195                         logger.warn("Message {} was not sent.", msg, e);
196                 }
197         }
198
199         private synchronized void closeWithoutMessage() {
200                 logger.debug("Closing session: {}", this);
201                 this.channel.close();
202                 this.state = State.Idle;
203         }
204
205         /**
206          * Closes PCEP session from the parent with given reason. A message needs to be sent, but parent doesn't have to be
207          * modified, because he initiated the closing. (To prevent concurrent modification exception).
208          * 
209          * @param closeObject
210          */
211         private void terminate(final BGPError error) {
212                 this.sendMessage(new BGPNotificationMessage(error));
213                 this.closeWithoutMessage();
214
215                 this.listener.onSessionTerminated(this, new BGPTerminationReason(error));
216         }
217
218         /**
219          * If HoldTimer expires, the session ends. If a message (whichever) was received during this period, the HoldTimer
220          * will be rescheduled by HOLD_TIMER_VALUE + the time that has passed from the start of the HoldTimer to the time at
221          * which the message was received. If the session was closed by the time this method starts to execute (the session
222          * state will become IDLE), then rescheduling won't occur.
223          */
224         private synchronized void handleHoldTimer() {
225                 if (this.state == State.Idle) {
226                         return;
227                 }
228
229                 final long ct = System.nanoTime();
230                 final long nextHold = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(HOLD_TIMER_VALUE);
231
232                 if (ct >= nextHold) {
233                         logger.debug("HoldTimer expired. " + new Date());
234                         this.terminate(BGPError.HOLD_TIMER_EXPIRED);
235                 } else {
236                         this.stateTimer.newTimeout(new TimerTask() {
237                                 @Override
238                                 public void run(final Timeout timeout) throws Exception {
239                                         handleHoldTimer();
240                                 }
241                         }, nextHold - ct, TimeUnit.NANOSECONDS);
242                 }
243         }
244
245         /**
246          * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
247          * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
248          * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
249          * starts to execute (the session state will become IDLE), that rescheduling won't occur.
250          */
251         private synchronized void handleKeepaliveTimer() {
252                 if (this.state == State.Idle) {
253                         return;
254                 }
255
256                 final long ct = System.nanoTime();
257                 long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
258
259                 if (ct >= nextKeepalive) {
260                         this.sendMessage(keepalive);
261                         nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(this.keepAlive);
262                 }
263                 this.stateTimer.newTimeout(new TimerTask() {
264                         @Override
265                         public void run(final Timeout timeout) throws Exception {
266                                 handleKeepaliveTimer();
267                         }
268                 }, nextKeepalive - ct, TimeUnit.NANOSECONDS);
269         }
270
271         @Override
272         final public String toString() {
273                 return addToStringAttributes(Objects.toStringHelper(this)).toString();
274         }
275
276         protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
277                 toStringHelper.add("channel", this.channel);
278                 toStringHelper.add("state", this.state);
279                 return toStringHelper;
280         }
281
282         @Override
283         public Set<BGPTableType> getAdvertisedTableTypes() {
284                 return this.tableTypes;
285         }
286
287         @Override
288         protected synchronized void sessionUp() {
289                 this.state = State.Up;
290                 this.listener.onSessionUp(this);
291         }
292
293         public synchronized State getState() {
294                 return this.state;
295         }
296 }