Fixed PCE's remote and local address
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPSessionImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.pcep.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.Objects;
12 import com.google.common.base.Objects.ToStringHelper;
13 import com.google.common.base.Preconditions;
14 import io.netty.channel.Channel;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelFutureListener;
17 import io.netty.util.concurrent.Future;
18 import java.io.IOException;
19 import java.net.InetAddress;
20 import java.net.InetSocketAddress;
21 import java.util.Date;
22 import java.util.LinkedList;
23 import java.util.Queue;
24 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.protocol.framework.AbstractProtocolSession;
26 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
27 import org.opendaylight.protocol.pcep.PCEPSession;
28 import org.opendaylight.protocol.pcep.PCEPSessionListener;
29 import org.opendaylight.protocol.pcep.TerminationReason;
30 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Implementation of PCEPSession. (Not final for testing.)
48  */
49 @VisibleForTesting
50 public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements PCEPSession, PCEPSessionRuntimeMXBean {
51     /**
52      * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
53      */
54     @VisibleForTesting
55     protected volatile long lastMessageSentAt;
56
57     /**
58      * System.nanoTime value about when was received the last message
59      */
60     private long lastMessageReceivedAt;
61
62     private final Queue<Long> unknownMessagesTimes = new LinkedList<Long>();
63
64     private final PCEPSessionListener listener;
65
66     /**
67      * Open Object with session characteristics that were accepted by another PCE (sent from this session).
68      */
69     private final Open localOpen;
70
71     /**
72      * Open Object with session characteristics for this session (sent from another PCE).
73      */
74     private final Open remoteOpen;
75
76     private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
77
78     private int sentMsgCount = 0;
79
80     private int receivedMsgCount = 0;
81
82     private int maxUnknownMessages;
83
84     // True if the listener should not be notified about events
85     private boolean closed = false;
86
87     private final Channel channel;
88
89     private final Keepalive kaMessage = new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();
90
91     PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
92         final Open localOpen, final Open remoteOpen) {
93         this.listener = Preconditions.checkNotNull(listener);
94         this.channel = Preconditions.checkNotNull(channel);
95         this.localOpen = Preconditions.checkNotNull(localOpen);
96         this.remoteOpen = Preconditions.checkNotNull(remoteOpen);
97         this.lastMessageReceivedAt = System.nanoTime();
98
99         if (maxUnknownMessages != 0) {
100             this.maxUnknownMessages = maxUnknownMessages;
101         }
102
103
104         if (getDeadTimerValue() != 0) {
105             channel.eventLoop().schedule(new Runnable() {
106                 @Override
107                 public void run() {
108                     handleDeadTimer();
109                 }
110             }, getDeadTimerValue(), TimeUnit.SECONDS);
111         }
112
113         if (getKeepAliveTimerValue() != 0) {
114             channel.eventLoop().schedule(new Runnable() {
115                 @Override
116                 public void run() {
117                     handleKeepaliveTimer();
118                 }
119             }, getKeepAliveTimerValue(), TimeUnit.SECONDS);
120         }
121
122         LOG.info("Session {}[{}] <-> {}[{}] started", getLocalAddress(), localOpen.getSessionId(), getRemoteAddress(),
123             remoteOpen.getSessionId());
124     }
125
126     /**
127      * If DeadTimer expires, the session ends. If a message (whichever) was received during this period, the DeadTimer
128      * will be rescheduled by DEAD_TIMER_VALUE + the time that has passed from the start of the DeadTimer to the time at
129      * which the message was received. If the session was closed by the time this method starts to execute (the session
130      * state will become IDLE), that rescheduling won't occur.
131      */
132     private synchronized void handleDeadTimer() {
133         final long ct = System.nanoTime();
134
135         final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
136
137         if (this.channel.isActive()) {
138             if (ct >= nextDead) {
139                 LOG.debug("DeadTimer expired. {}", new Date());
140                 this.terminate(TerminationReason.ExpDeadtimer);
141             } else {
142                 this.channel.eventLoop().schedule(new Runnable() {
143                     @Override
144                     public void run() {
145                         handleDeadTimer();
146                     }
147                 }, nextDead - ct, TimeUnit.NANOSECONDS);
148             }
149         }
150     }
151
152     /**
153      * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
154      * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
155      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
156      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
157      */
158     private  void handleKeepaliveTimer() {
159         final long ct = System.nanoTime();
160
161         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
162
163         if (this.channel.isActive()) {
164             if (ct >= nextKeepalive) {
165                 this.sendMessage(this.kaMessage);
166                 nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
167             }
168
169             this.channel.eventLoop().schedule(new Runnable() {
170                 @Override
171                 public void run() {
172                     handleKeepaliveTimer();
173                 }
174             }, nextKeepalive - ct, TimeUnit.NANOSECONDS);
175         }
176     }
177
178     /**
179      * Sends message to serialization.
180      *
181      * @param msg to be sent
182      */
183     @Override
184     public Future<Void> sendMessage(final Message msg) {
185         final ChannelFuture f = this.channel.writeAndFlush(msg);
186         this.lastMessageSentAt = System.nanoTime();
187         if (!(msg instanceof KeepaliveMessage)) {
188             LOG.debug("PCEP Message enqueued: {}", msg);
189         }
190         this.sentMsgCount++;
191
192         f.addListener(new ChannelFutureListener() {
193             @Override
194             public void operationComplete(final ChannelFuture arg) {
195                 if (arg.isSuccess()) {
196                     LOG.trace("Message sent to socket: {}", msg);
197                 } else {
198                     LOG.debug("Message not sent: {}", msg, arg.cause());
199                 }
200             }
201         });
202
203         return f;
204     }
205
206     /**
207      * Closes PCEP session without sending a Close message, as the channel is no longer active.
208      */
209     @Override
210     public void close() {
211         LOG.info("Closing PCEP session: {}", this);
212         this.channel.close();
213     }
214
215     /**
216      * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
217      * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
218      * inside the session or from the listener, therefore the parent of this session should be informed.
219      */
220     @Override
221     public synchronized void close(final TerminationReason reason) {
222         LOG.info("Closing PCEP session: {}", this);
223         this.closed = true;
224         this.sendMessage(new CloseBuilder().setCCloseMessage(
225             new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
226         this.channel.close();
227     }
228
229     @Override
230     public Tlvs getRemoteTlvs() {
231         return this.remoteOpen.getTlvs();
232     }
233
234     @Override
235     public InetAddress getRemoteAddress() {
236         if (this.channel.parent() != null) {
237             return ((InetSocketAddress) this.channel.localAddress()).getAddress();
238         }
239         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
240     }
241
242     private InetAddress getLocalAddress() {
243         if (this.channel.parent() != null) {
244             return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
245         }
246         return ((InetSocketAddress) this.channel.localAddress()).getAddress();
247     }
248
249     private synchronized void terminate(final TerminationReason reason) {
250         LOG.info("Local PCEP session termination : {}", reason);
251         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
252         this.closed = true;
253         this.sendMessage(new CloseBuilder().setCCloseMessage(
254             new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
255         this.close();
256     }
257
258     @Override
259     public synchronized void endOfInput() {
260         if (!this.closed) {
261             this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
262             this.closed = true;
263         }
264     }
265
266     private void sendErrorMessage(final PCEPErrors value) {
267         this.sendErrorMessage(value, null);
268     }
269
270     /**
271      * Sends PCEP Error Message with one PCEPError and Open Object.
272      *
273      * @param value
274      * @param open
275      */
276     private void sendErrorMessage(final PCEPErrors value, final Open open) {
277         this.sendMessage(Util.createErrorMessage(value, open));
278     }
279
280     /**
281      * The fact, that a message is malformed, comes from parser. In case of unrecognized message a particular error is
282      * sent (CAPABILITY_NOT_SUPPORTED) and the method checks if the MAX_UNKNOWN_MSG per minute wasn't overstepped.
283      * Second, any other error occurred that is specified by rfc. In this case, the an error message is generated and
284      * sent.
285      *
286      * @param error documented error in RFC5440 or draft
287      */
288     @VisibleForTesting
289     public void handleMalformedMessage(final PCEPErrors error) {
290         final long ct = System.nanoTime();
291         this.sendErrorMessage(error);
292         if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
293             this.unknownMessagesTimes.add(ct);
294             while (ct - this.unknownMessagesTimes.peek() > 60 * 1E9) {
295                 this.unknownMessagesTimes.poll();
296             }
297             if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
298                 this.terminate(TerminationReason.TooManyUnknownMsg);
299             }
300         }
301     }
302
303     /**
304      * Handles incoming message. If the session is up, it notifies the user. The user is notified about every message
305      * except KeepAlive.
306      *
307      * @param msg incoming message
308      */
309     @Override
310     public synchronized void handleMessage(final Message msg) {
311         // Update last reception time
312         this.lastMessageReceivedAt = System.nanoTime();
313         this.receivedMsgCount++;
314         if (!(msg instanceof KeepaliveMessage)) {
315             LOG.debug("PCEP message {} received.", msg);
316         }
317         // Internal message handling. The user does not see these messages
318         if (msg instanceof KeepaliveMessage) {
319             // Do nothing, the timer has been already reset
320         } else if (msg instanceof OpenMessage) {
321             this.sendErrorMessage(PCEPErrors.ATTEMPT_2ND_SESSION);
322         } else if (msg instanceof CloseMessage) {
323             /*
324              * Session is up, we are reporting all messages to user. One notable
325              * exception is CLOSE message, which needs to be converted into a
326              * session DOWN event.
327              */
328             this.close();
329         } else {
330             // This message needs to be handled by the user
331             this.listener.onMessage(this, msg);
332         }
333     }
334
335     /**
336      * @return the sentMsgCount
337      */
338
339     @Override
340     public final Integer getSentMsgCount() {
341         return this.sentMsgCount;
342     }
343
344     /**
345      * @return the receivedMsgCount
346      */
347
348     @Override
349     public final Integer getReceivedMsgCount() {
350         return this.receivedMsgCount;
351     }
352
353     @Override
354     public final Integer getDeadTimerValue() {
355         return Integer.valueOf(this.remoteOpen.getDeadTimer());
356     }
357
358     @Override
359     public final Integer getKeepAliveTimerValue() {
360         return Integer.valueOf(this.localOpen.getKeepalive());
361     }
362
363     @Override
364     public final String getPeerAddress() {
365         final InetSocketAddress a = (InetSocketAddress) this.channel.remoteAddress();
366         return a.getHostName();
367     }
368
369     @Override
370     public void tearDown() {
371         this.close();
372     }
373
374     @Override
375     public final String toString() {
376         return addToStringAttributes(Objects.toStringHelper(this)).toString();
377     }
378
379     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
380         toStringHelper.add("channel", this.channel);
381         toStringHelper.add("localOpen", this.localOpen);
382         toStringHelper.add("remoteOpen", this.remoteOpen);
383         return toStringHelper;
384     }
385
386     @Override
387     @VisibleForTesting
388     public void sessionUp() {
389         this.listener.onSessionUp(this);
390     }
391
392     @Override
393     public String getNodeIdentifier() {
394         return "";
395     }
396
397     @VisibleForTesting
398     protected final Queue<Long> getUnknownMessagesTimes() {
399         return this.unknownMessagesTimes;
400     }
401 }