Bug 3106 - Untangle directories
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPSessionImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.pcep.impl;
9
10 import com.google.common.annotations.VisibleForTesting;
11 import com.google.common.base.MoreObjects;
12 import com.google.common.base.MoreObjects.ToStringHelper;
13 import com.google.common.base.Preconditions;
14 import io.netty.channel.Channel;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelFutureListener;
17 import io.netty.util.concurrent.Future;
18 import java.io.IOException;
19 import java.net.InetAddress;
20 import java.net.InetSocketAddress;
21 import java.util.Date;
22 import java.util.LinkedList;
23 import java.util.Queue;
24 import java.util.concurrent.TimeUnit;
25 import org.opendaylight.protocol.framework.AbstractProtocolSession;
26 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
27 import org.opendaylight.protocol.pcep.PCEPSession;
28 import org.opendaylight.protocol.pcep.PCEPSessionListener;
29 import org.opendaylight.protocol.pcep.TerminationReason;
30 import org.opendaylight.protocol.pcep.impl.spi.Util;
31 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.LocalPref;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.Messages;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.pcep.stats.rev141006.pcep.session.state.PeerPref;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
48 import org.opendaylight.yangtools.yang.binding.DataContainer;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 /**
53  * Implementation of PCEPSession. (Not final for testing.)
54  */
55 @VisibleForTesting
56 public class PCEPSessionImpl extends AbstractProtocolSession<Message> implements PCEPSession {
57     /**
58      * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
59      */
60     @VisibleForTesting
61     protected volatile long lastMessageSentAt;
62
63     /**
64      * System.nanoTime value about when was received the last message
65      */
66     private long lastMessageReceivedAt;
67
68     private final Queue<Long> unknownMessagesTimes = new LinkedList<Long>();
69
70     private final PCEPSessionListener listener;
71
72     /**
73      * Open Object with session characteristics that were accepted by another PCE (sent from this session).
74      */
75     private final Open localOpen;
76
77     /**
78      * Open Object with session characteristics for this session (sent from another PCE).
79      */
80     private final Open remoteOpen;
81
82     private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
83
84     private int maxUnknownMessages;
85
86     // True if the listener should not be notified about events
87     private boolean closed = false;
88
89     private final Channel channel;
90
91     private final Keepalive kaMessage = new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();
92
93     private final PCEPSessionState sessionState;
94
95     PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
96         final Open localOpen, final Open remoteOpen) {
97         this.listener = Preconditions.checkNotNull(listener);
98         this.channel = Preconditions.checkNotNull(channel);
99         this.localOpen = Preconditions.checkNotNull(localOpen);
100         this.remoteOpen = Preconditions.checkNotNull(remoteOpen);
101         this.lastMessageReceivedAt = System.nanoTime();
102
103         if (maxUnknownMessages != 0) {
104             this.maxUnknownMessages = maxUnknownMessages;
105         }
106
107
108         if (getDeadTimerValue() != 0) {
109             channel.eventLoop().schedule(new Runnable() {
110                 @Override
111                 public void run() {
112                     handleDeadTimer();
113                 }
114             }, getDeadTimerValue(), TimeUnit.SECONDS);
115         }
116
117         if (getKeepAliveTimerValue() != 0) {
118             channel.eventLoop().schedule(new Runnable() {
119                 @Override
120                 public void run() {
121                     handleKeepaliveTimer();
122                 }
123             }, getKeepAliveTimerValue(), TimeUnit.SECONDS);
124         }
125
126         LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
127             remoteOpen.getSessionId());
128         this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
129     }
130
131     public final Integer getKeepAliveTimerValue() {
132         return this.localOpen.getKeepalive().intValue();
133     }
134
135     public final Integer getDeadTimerValue() {
136         return this.remoteOpen.getDeadTimer().intValue();
137     }
138
139     /**
140      * If DeadTimer expires, the session ends. If a message (whichever) was received during this period, the DeadTimer
141      * will be rescheduled by DEAD_TIMER_VALUE + the time that has passed from the start of the DeadTimer to the time at
142      * which the message was received. If the session was closed by the time this method starts to execute (the session
143      * state will become IDLE), that rescheduling won't occur.
144      */
145     private synchronized void handleDeadTimer() {
146         final long ct = System.nanoTime();
147
148         final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
149
150         if (this.channel.isActive()) {
151             if (ct >= nextDead) {
152                 LOG.debug("DeadTimer expired. {}", new Date());
153                 this.terminate(TerminationReason.EXP_DEADTIMER);
154             } else {
155                 this.channel.eventLoop().schedule(new Runnable() {
156                     @Override
157                     public void run() {
158                         handleDeadTimer();
159                     }
160                 }, nextDead - ct, TimeUnit.NANOSECONDS);
161             }
162         }
163     }
164
165     /**
166      * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
167      * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
168      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
169      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
170      */
171     private  void handleKeepaliveTimer() {
172         final long ct = System.nanoTime();
173
174         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
175
176         if (this.channel.isActive()) {
177             if (ct >= nextKeepalive) {
178                 this.sendMessage(this.kaMessage);
179                 nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
180             }
181
182             this.channel.eventLoop().schedule(new Runnable() {
183                 @Override
184                 public void run() {
185                     handleKeepaliveTimer();
186                 }
187             }, nextKeepalive - ct, TimeUnit.NANOSECONDS);
188         }
189     }
190
191     /**
192      * Sends message to serialization.
193      *
194      * @param msg to be sent
195      */
196     @Override
197     public Future<Void> sendMessage(final Message msg) {
198         final ChannelFuture f = this.channel.writeAndFlush(msg);
199         this.lastMessageSentAt = System.nanoTime();
200         this.sessionState.updateLastSentMsg();
201         if (!(msg instanceof KeepaliveMessage)) {
202             LOG.debug("PCEP Message enqueued: {}", msg);
203         }
204         if (msg instanceof PcerrMessage) {
205             this.sessionState.setLastSentError(msg);
206         }
207
208         f.addListener(new ChannelFutureListener() {
209             @Override
210             public void operationComplete(final ChannelFuture arg) {
211                 if (arg.isSuccess()) {
212                     LOG.trace("Message sent to socket: {}", msg);
213                 } else {
214                     LOG.debug("Message not sent: {}", msg, arg.cause());
215                 }
216             }
217         });
218
219         return f;
220     }
221
222     /**
223      * Closes PCEP session without sending a Close message, as the channel is no longer active.
224      */
225     @Override
226     public void close() {
227         LOG.info("Closing PCEP session: {}", this);
228         this.channel.close();
229     }
230
231     /**
232      * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
233      * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
234      * inside the session or from the listener, therefore the parent of this session should be informed.
235      */
236     @Override
237     public synchronized void close(final TerminationReason reason) {
238         LOG.info("Closing PCEP session: {}", this);
239         this.closed = true;
240         this.sendMessage(new CloseBuilder().setCCloseMessage(
241             new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
242         this.close();
243     }
244
245     @Override
246     public Tlvs getRemoteTlvs() {
247         return this.remoteOpen.getTlvs();
248     }
249
250     @Override
251     public InetAddress getRemoteAddress() {
252         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
253     }
254
255     private synchronized void terminate(final TerminationReason reason) {
256         LOG.info("Local PCEP session termination : {}", reason);
257         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
258         this.closed = true;
259         this.sendMessage(new CloseBuilder().setCCloseMessage(
260             new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
261         this.close();
262     }
263
264     @Override
265     public synchronized void endOfInput() {
266         if (!this.closed) {
267             this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
268             this.closed = true;
269         }
270     }
271
272     private void sendErrorMessage(final PCEPErrors value) {
273         this.sendErrorMessage(value, null);
274     }
275
276     /**
277      * Sends PCEP Error Message with one PCEPError and Open Object.
278      *
279      * @param value
280      * @param open
281      */
282     private void sendErrorMessage(final PCEPErrors value, final Open open) {
283         this.sendMessage(Util.createErrorMessage(value, open));
284     }
285
286     /**
287      * The fact, that a message is malformed, comes from parser. In case of unrecognized message a particular error is
288      * sent (CAPABILITY_NOT_SUPPORTED) and the method checks if the MAX_UNKNOWN_MSG per minute wasn't overstepped.
289      * Second, any other error occurred that is specified by rfc. In this case, the an error message is generated and
290      * sent.
291      *
292      * @param error documented error in RFC5440 or draft
293      */
294     @VisibleForTesting
295     public void handleMalformedMessage(final PCEPErrors error) {
296         final long ct = System.nanoTime();
297         this.sendErrorMessage(error);
298         if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
299             this.unknownMessagesTimes.add(ct);
300             while (ct - this.unknownMessagesTimes.peek() > TimeUnit.MINUTES.toNanos(1)) {
301                 this.unknownMessagesTimes.poll();
302             }
303             if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
304                 this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
305             }
306         }
307     }
308
309     /**
310      * Handles incoming message. If the session is up, it notifies the user. The user is notified about every message
311      * except KeepAlive.
312      *
313      * @param msg incoming message
314      */
315     @Override
316     public synchronized void handleMessage(final Message msg) {
317         // Update last reception time
318         this.lastMessageReceivedAt = System.nanoTime();
319         this.sessionState.updateLastReceivedMsg();
320         if (!(msg instanceof KeepaliveMessage)) {
321             LOG.debug("PCEP message {} received.", msg);
322         }
323         // Internal message handling. The user does not see these messages
324         if (msg instanceof KeepaliveMessage) {
325             // Do nothing, the timer has been already reset
326         } else if (msg instanceof OpenMessage) {
327             this.sendErrorMessage(PCEPErrors.ATTEMPT_2ND_SESSION);
328         } else if (msg instanceof CloseMessage) {
329             /*
330              * Session is up, we are reporting all messages to user. One notable
331              * exception is CLOSE message, which needs to be converted into a
332              * session DOWN event.
333              */
334             this.close();
335         } else {
336             // This message needs to be handled by the user
337             if (msg instanceof PcerrMessage) {
338                 this.sessionState.setLastReceivedError(msg);
339             }
340             this.listener.onMessage(this, msg);
341         }
342     }
343
344     @Override
345     public final String toString() {
346         return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
347     }
348
349     protected ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
350         toStringHelper.add("channel", this.channel);
351         toStringHelper.add("localOpen", this.localOpen);
352         toStringHelper.add("remoteOpen", this.remoteOpen);
353         return toStringHelper;
354     }
355
356     @Override
357     @VisibleForTesting
358     public void sessionUp() {
359         this.listener.onSessionUp(this);
360     }
361
362     @VisibleForTesting
363     protected final Queue<Long> getUnknownMessagesTimes() {
364         return this.unknownMessagesTimes;
365     }
366
367     @Override
368     public Messages getMessages() {
369         return this.sessionState.getMessages(this.unknownMessagesTimes.size());
370     }
371
372     @Override
373     public LocalPref getLocalPref() {
374         return this.sessionState.getLocalPref();
375     }
376
377     @Override
378     public PeerPref getPeerPref() {
379         return this.sessionState.getPeerPref();
380     }
381
382     @Override
383     public Class<? extends DataContainer> getImplementedInterface() {
384         throw new UnsupportedOperationException();
385     }
386     @Override
387     public void resetStats() {
388         this.sessionState.reset();
389     }
390 }