Rework PCEP timers to work on channel
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPSessionImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.pcep.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Objects;
12 import com.google.common.base.Objects.ToStringHelper;
13 import com.google.common.base.Preconditions;
14
15 import io.netty.channel.Channel;
16 import io.netty.channel.ChannelFuture;
17 import io.netty.channel.ChannelFutureListener;
18 import io.netty.util.concurrent.Future;
19
20 import java.io.IOException;
21 import java.net.InetAddress;
22 import java.net.InetSocketAddress;
23 import java.util.Date;
24 import java.util.LinkedList;
25 import java.util.Queue;
26 import java.util.concurrent.TimeUnit;
27
28 import org.opendaylight.protocol.framework.AbstractProtocolSession;
29 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
30 import org.opendaylight.protocol.pcep.PCEPSession;
31 import org.opendaylight.protocol.pcep.PCEPSessionListener;
32 import org.opendaylight.protocol.pcep.TerminationReason;
33 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Implementation of PCEPSession. (Not final for testing.)
51  */
52 @VisibleForTesting
53 public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements PCEPSession, PCEPSessionRuntimeMXBean {
54     /**
55      * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
56      */
57     @VisibleForTesting
58     protected volatile long lastMessageSentAt;
59
60     /**
61      * System.nanoTime value about when was received the last message
62      */
63     private long lastMessageReceivedAt;
64
65     @VisibleForTesting
66     protected final Queue<Long> unknownMessagesTimes = new LinkedList<Long>();
67
68     private final PCEPSessionListener listener;
69
70     /**
71      * Open Object with session characteristics that were accepted by another PCE (sent from this session).
72      */
73     private final Open localOpen;
74
75     /**
76      * Open Object with session characteristics for this session (sent from another PCE).
77      */
78     private final Open remoteOpen;
79
80     private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
81
82     private int sentMsgCount = 0;
83
84     private int receivedMsgCount = 0;
85
86     private int maxUnknownMessages;
87
88     // True if the listener should not be notified about events
89     private boolean closed = false;
90
91     private final Channel channel;
92
93     private final Keepalive kaMessage = new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();
94
95     PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
96             final Open localOpen, final Open remoteOpen) {
97         this.listener = Preconditions.checkNotNull(listener);
98         this.channel = Preconditions.checkNotNull(channel);
99         this.localOpen = Preconditions.checkNotNull(localOpen);
100         this.remoteOpen = Preconditions.checkNotNull(remoteOpen);
101         this.lastMessageReceivedAt = System.nanoTime();
102
103         if (maxUnknownMessages != 0) {
104             this.maxUnknownMessages = maxUnknownMessages;
105         }
106
107
108         if (getDeadTimerValue() != 0) {
109             channel.eventLoop().schedule(new Runnable() {
110                 @Override
111                 public void run() {
112                     handleDeadTimer();
113                 }
114             }, getDeadTimerValue(), TimeUnit.SECONDS);
115         }
116
117         if (getKeepAliveTimerValue() != 0) {
118             channel.eventLoop().schedule(new Runnable() {
119                 @Override
120                 public void run() {
121                     handleKeepaliveTimer();
122                 }
123             }, getKeepAliveTimerValue(), TimeUnit.SECONDS);
124         }
125
126         LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
127                 remoteOpen.getSessionId());
128     }
129
130     /**
131      * If DeadTimer expires, the session ends. If a message (whichever) was received during this period, the DeadTimer
132      * will be rescheduled by DEAD_TIMER_VALUE + the time that has passed from the start of the DeadTimer to the time at
133      * which the message was received. If the session was closed by the time this method starts to execute (the session
134      * state will become IDLE), that rescheduling won't occur.
135      */
136     private synchronized void handleDeadTimer() {
137         final long ct = System.nanoTime();
138
139         final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
140
141         if (this.channel.isActive()) {
142             if (ct >= nextDead) {
143                 LOG.debug("DeadTimer expired. {}", new Date());
144                 this.terminate(TerminationReason.ExpDeadtimer);
145             } else {
146                 this.channel.eventLoop().schedule(new Runnable() {
147                     @Override
148                     public void run() {
149                         handleDeadTimer();
150                     }
151                 }, nextDead - ct, TimeUnit.NANOSECONDS);
152             }
153         }
154     }
155
156     /**
157      * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
158      * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
159      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
160      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
161      */
162     private  void handleKeepaliveTimer() {
163         final long ct = System.nanoTime();
164
165         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
166
167         if (this.channel.isActive()) {
168             if (ct >= nextKeepalive) {
169                 this.sendMessage(this.kaMessage);
170                 nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
171             }
172
173             this.channel.eventLoop().schedule(new Runnable() {
174                 @Override
175                 public void run() {
176                     handleKeepaliveTimer();
177                 }
178             }, nextKeepalive - ct, TimeUnit.NANOSECONDS);
179         }
180     }
181
182     /**
183      * Sends message to serialization.
184      *
185      * @param msg to be sent
186      */
187     @Override
188     public Future<Void> sendMessage(final Message msg) {
189         final ChannelFuture f = this.channel.writeAndFlush(msg);
190         this.lastMessageSentAt = System.nanoTime();
191         if (!(msg instanceof KeepaliveMessage)) {
192             LOG.debug("PCEP Message enqueued: {}", msg);
193         }
194         this.sentMsgCount++;
195
196         f.addListener(new ChannelFutureListener() {
197             @Override
198             public void operationComplete(final ChannelFuture arg) {
199                 if (arg.isSuccess()) {
200                     LOG.trace("Message sent to socket: {}", msg);
201                 } else {
202                     LOG.debug("Message not sent: {}", msg, arg.cause());
203                 }
204             }
205         });
206
207         return f;
208     }
209
210     /**
211      * Closes PCEP session without sending a Close message, as the channel is no longer active.
212      */
213     @Override
214     public void close() {
215         LOG.info("Closing PCEP session: {}", this);
216         this.channel.close();
217     }
218
219     /**
220      * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
221      * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
222      * inside the session or from the listener, therefore the parent of this session should be informed.
223      */
224     @Override
225     public synchronized void close(final TerminationReason reason) {
226         LOG.info("Closing PCEP session: {}", this);
227         this.closed = true;
228         this.sendMessage(new CloseBuilder().setCCloseMessage(
229                 new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
230         this.channel.close();
231     }
232
233     @Override
234     public Tlvs getRemoteTlvs() {
235         return this.remoteOpen.getTlvs();
236     }
237
238     @Override
239     public InetAddress getRemoteAddress() {
240         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
241     }
242
243     private synchronized void terminate(final TerminationReason reason) {
244         LOG.info("Local PCEP session termination : {}", reason);
245         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
246         this.closed = true;
247         this.sendMessage(new CloseBuilder().setCCloseMessage(
248                 new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
249         this.close();
250     }
251
252     @Override
253     public synchronized void endOfInput() {
254         if (!this.closed) {
255             this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
256             this.closed = true;
257         }
258     }
259
260     private void sendErrorMessage(final PCEPErrors value) {
261         this.sendErrorMessage(value, null);
262     }
263
264     /**
265      * Sends PCEP Error Message with one PCEPError and Open Object.
266      *
267      * @param value
268      * @param open
269      */
270     private void sendErrorMessage(final PCEPErrors value, final Open open) {
271         this.sendMessage(Util.createErrorMessage(value, open));
272     }
273
274     /**
275      * The fact, that a message is malformed, comes from parser. In case of unrecognized message a particular error is
276      * sent (CAPABILITY_NOT_SUPPORTED) and the method checks if the MAX_UNKNOWN_MSG per minute wasn't overstepped.
277      * Second, any other error occurred that is specified by rfc. In this case, the an error message is generated and
278      * sent.
279      *
280      * @param error documented error in RFC5440 or draft
281      */
282     @VisibleForTesting
283     public void handleMalformedMessage(final PCEPErrors error) {
284         final long ct = System.nanoTime();
285         this.sendErrorMessage(error);
286         if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
287             this.unknownMessagesTimes.add(ct);
288             while (ct - this.unknownMessagesTimes.peek() > 60 * 1E9) {
289                 this.unknownMessagesTimes.poll();
290             }
291             if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
292                 this.terminate(TerminationReason.TooManyUnknownMsg);
293             }
294         }
295     }
296
297     /**
298      * Handles incoming message. If the session is up, it notifies the user. The user is notified about every message
299      * except KeepAlive.
300      *
301      * @param msg incoming message
302      */
303     @Override
304     public void handleMessage(final Message msg) {
305         // Update last reception time
306         this.lastMessageReceivedAt = System.nanoTime();
307         this.receivedMsgCount++;
308         if (!(msg instanceof KeepaliveMessage)) {
309             LOG.debug("PCEP message {} received.", msg);
310         }
311         // Internal message handling. The user does not see these messages
312         if (msg instanceof KeepaliveMessage) {
313             // Do nothing, the timer has been already reset
314         } else if (msg instanceof OpenMessage) {
315             this.sendErrorMessage(PCEPErrors.ATTEMPT_2ND_SESSION);
316         } else if (msg instanceof CloseMessage) {
317             /*
318              * Session is up, we are reporting all messages to user. One notable
319              * exception is CLOSE message, which needs to be converted into a
320              * session DOWN event.
321              */
322             this.close();
323         } else {
324             // This message needs to be handled by the user
325             this.listener.onMessage(this, msg);
326         }
327     }
328
329     /**
330      * @return the sentMsgCount
331      */
332
333     @Override
334     public final Integer getSentMsgCount() {
335         return this.sentMsgCount;
336     }
337
338     /**
339      * @return the receivedMsgCount
340      */
341
342     @Override
343     public final Integer getReceivedMsgCount() {
344         return this.receivedMsgCount;
345     }
346
347     @Override
348     public final Integer getDeadTimerValue() {
349         return Integer.valueOf(this.remoteOpen.getDeadTimer());
350     }
351
352     @Override
353     public final Integer getKeepAliveTimerValue() {
354         return Integer.valueOf(this.localOpen.getKeepalive());
355     }
356
357     @Override
358     public final String getPeerAddress() {
359         final InetSocketAddress a = (InetSocketAddress) this.channel.remoteAddress();
360         return a.getHostName();
361     }
362
363     @Override
364     public void tearDown() {
365         this.close();
366     }
367
368     @Override
369     public final String toString() {
370         return addToStringAttributes(Objects.toStringHelper(this)).toString();
371     }
372
373     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
374         toStringHelper.add("channel", this.channel);
375         toStringHelper.add("localOpen", this.localOpen);
376         toStringHelper.add("remoteOpen", this.remoteOpen);
377         return toStringHelper;
378     }
379
380     @Override
381     @VisibleForTesting
382     public void sessionUp() {
383         this.listener.onSessionUp(this);
384     }
385
386     @Override
387     public String getNodeIdentifier() {
388         return "";
389     }
390 }