Code clean up
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPSessionImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.pcep.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.MoreObjects;
14 import com.google.common.base.MoreObjects.ToStringHelper;
15 import com.google.common.base.Ticker;
16 import io.netty.channel.Channel;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelFutureListener;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.SimpleChannelInboundHandler;
21 import io.netty.util.concurrent.Future;
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.util.Date;
26 import java.util.LinkedList;
27 import java.util.Queue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import javax.annotation.concurrent.GuardedBy;
31 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
32 import org.opendaylight.protocol.pcep.PCEPSession;
33 import org.opendaylight.protocol.pcep.PCEPSessionListener;
34 import org.opendaylight.protocol.pcep.TerminationReason;
35 import org.opendaylight.protocol.pcep.impl.spi.Util;
36 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.CloseBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.Keepalive;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev131007.KeepaliveBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.CloseMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.KeepaliveMessage;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.Message;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.OpenMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.PcerrMessage;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.message.CCloseMessageBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.close.object.CCloseBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.keepalive.message.KeepaliveMessageBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.Open;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev131005.open.object.open.Tlvs;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Implementation of PCEPSession. (Not final for testing.)
58  */
59 @VisibleForTesting
60 public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implements PCEPSession {
61     private static final long MINUTE = TimeUnit.MINUTES.toNanos(1);
62     private static Ticker TICKER = Ticker.systemTicker();
63     /**
64      * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
65      */
66     private volatile long lastMessageSentAt;
67
68     /**
69      * System.nanoTime value about when was received the last message
70      */
71     private long lastMessageReceivedAt;
72
73     private final Queue<Long> unknownMessagesTimes = new LinkedList<>();
74
75     private final PCEPSessionListener listener;
76
77     /**
78      * Open Object with session characteristics that were accepted by another PCE (sent from this session).
79      */
80     private final Open localOpen;
81
82     /**
83      * Open Object with session characteristics for this session (sent from another PCE).
84      */
85     private final Open remoteOpen;
86
87     private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
88
89     private int maxUnknownMessages;
90
91     // True if the listener should not be notified about events
92     @GuardedBy("this")
93     private final AtomicBoolean closed = new AtomicBoolean(false);
94
95     private final Channel channel;
96
97     private final Keepalive kaMessage = new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();
98
99     private final PCEPSessionState sessionState;
100
101     PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
102             final Open localOpen, final Open remoteOpen) {
103         this.listener = requireNonNull(listener);
104         this.channel = requireNonNull(channel);
105         this.localOpen = requireNonNull(localOpen);
106         this.remoteOpen = requireNonNull(remoteOpen);
107         this.lastMessageReceivedAt = TICKER.read();
108
109         if (maxUnknownMessages != 0) {
110             this.maxUnknownMessages = maxUnknownMessages;
111         }
112
113
114         if (getDeadTimerValue() != 0) {
115             channel.eventLoop().schedule(this::handleDeadTimer, getDeadTimerValue(), TimeUnit.SECONDS);
116         }
117
118         if (getKeepAliveTimerValue() != 0) {
119             channel.eventLoop().schedule(this::handleKeepaliveTimer, getKeepAliveTimerValue(), TimeUnit.SECONDS);
120         }
121
122         LOG.info("Session {}[{}] <-> {}[{}] started", channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(),
123                 remoteOpen.getSessionId());
124         this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
125     }
126
127     public final Integer getKeepAliveTimerValue() {
128         return this.localOpen.getKeepalive().intValue();
129     }
130
131     public final Integer getDeadTimerValue() {
132         return this.remoteOpen.getDeadTimer().intValue();
133     }
134
135     /**
136      * If DeadTimer expires, the session ends. If a message (whichever) was received during this period, the DeadTimer
137      * will be rescheduled by DEAD_TIMER_VALUE + the time that has passed from the start of the DeadTimer to the time at
138      * which the message was received. If the session was closed by the time this method starts to execute (the session
139      * state will become IDLE), that rescheduling won't occur.
140      */
141     private synchronized void handleDeadTimer() {
142         final long ct = TICKER.read();
143
144         final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
145
146         if (this.channel.isActive()) {
147             if (ct >= nextDead) {
148                 LOG.debug("DeadTimer expired. {}", new Date());
149                 this.terminate(TerminationReason.EXP_DEADTIMER);
150             } else {
151                 this.channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS);
152             }
153         }
154     }
155
156     /**
157      * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
158      * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
159      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
160      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
161      */
162     private void handleKeepaliveTimer() {
163         final long ct = TICKER.read();
164
165         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
166
167         if (this.channel.isActive()) {
168             if (ct >= nextKeepalive) {
169                 this.sendMessage(this.kaMessage);
170                 nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
171             }
172
173             this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
174         }
175     }
176
177     /**
178      * Handle exception occurred in the PCEP session. The session in error state should be closed
179      * properly so that it can be restored later.
180      */
181     @VisibleForTesting
182     void handleException(final Throwable cause) {
183         LOG.error("Exception captured for session {}, closing session.", this, cause);
184         terminate(TerminationReason.UNKNOWN);
185     }
186
187     /**
188      * Sends message to serialization.
189      *
190      * @param msg to be sent
191      */
192     @Override
193     public Future<Void> sendMessage(final Message msg) {
194         final ChannelFuture f = this.channel.writeAndFlush(msg);
195         this.lastMessageSentAt = TICKER.read();
196         this.sessionState.updateLastSentMsg();
197         if (!(msg instanceof KeepaliveMessage)) {
198             LOG.debug("PCEP Message enqueued: {}", msg);
199         }
200         if (msg instanceof PcerrMessage) {
201             this.sessionState.setLastSentError(msg);
202         }
203
204         f.addListener((ChannelFutureListener) arg -> {
205             if (arg.isSuccess()) {
206                 LOG.trace("Message sent to socket: {}", msg);
207             } else {
208                 LOG.debug("Message not sent: {}", msg, arg.cause());
209             }
210         });
211
212         return f;
213     }
214
215     @VisibleForTesting
216     ChannelFuture closeChannel() {
217         LOG.info("Closing PCEP session channel: {}", this.channel);
218         return this.channel.close();
219     }
220
221     @VisibleForTesting
222     public synchronized boolean isClosed() {
223         return this.closed.get();
224     }
225
226     /**
227      * Closes PCEP session without sending a Close message, as the channel is no longer active.
228      */
229     @Override
230     public synchronized void close() {
231         close(null);
232     }
233
234     /**
235      * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
236      * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
237      * inside the session or from the listener, therefore the parent of this session should be informed.
238      */
239     @Override
240     public synchronized void close(final TerminationReason reason) {
241         if (this.closed.getAndSet(true)) {
242             LOG.debug("Session is already closed.");
243             return;
244         }
245         // only send close message when the reason is provided
246         if (reason != null) {
247             LOG.info("Closing PCEP session with reason {}: {}", reason, this);
248             sendMessage(new CloseBuilder().setCCloseMessage(
249                     new CCloseMessageBuilder().setCClose(new CCloseBuilder().setReason(reason.getShortValue()).build()).build()).build());
250         } else {
251             LOG.info("Closing PCEP session: {}", this);
252         }
253         closeChannel();
254     }
255
256     @Override
257     public Tlvs getRemoteTlvs() {
258         return this.remoteOpen.getTlvs();
259     }
260
261     @Override
262     public InetAddress getRemoteAddress() {
263         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
264     }
265
266     private synchronized void terminate(final TerminationReason reason) {
267         if (this.closed.get()) {
268             LOG.debug("Session {} is already closed.", this);
269             return;
270         }
271         close(reason);
272         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
273     }
274
275     synchronized void endOfInput() {
276         if (!this.closed.getAndSet(true)) {
277             this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
278         }
279     }
280
281     private void sendErrorMessage(final PCEPErrors value) {
282         this.sendErrorMessage(value, null);
283     }
284
285     /**
286      * Sends PCEP Error Message with one PCEPError and Open Object.
287      *
288      * @param value
289      * @param open
290      */
291     private void sendErrorMessage(final PCEPErrors value, final Open open) {
292         this.sendMessage(Util.createErrorMessage(value, open));
293     }
294
295     /**
296      * The fact, that a message is malformed, comes from parser. In case of unrecognized message a particular error is
297      * sent (CAPABILITY_NOT_SUPPORTED) and the method checks if the MAX_UNKNOWN_MSG per minute wasn't overstepped.
298      * Second, any other error occurred that is specified by rfc. In this case, the an error message is generated and
299      * sent.
300      *
301      * @param error documented error in RFC5440 or draft
302      */
303     @VisibleForTesting
304     void handleMalformedMessage(final PCEPErrors error) {
305         final long ct = TICKER.read();
306         this.sendErrorMessage(error);
307         if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
308             this.unknownMessagesTimes.add(ct);
309             while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
310                 final Long poll = this.unknownMessagesTimes.poll();
311             }
312             if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
313                 this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
314             }
315         }
316     }
317
318     /**
319      * Handles incoming message. If the session is up, it notifies the user. The user is notified about every message
320      * except KeepAlive.
321      *
322      * @param msg incoming message
323      */
324     public synchronized void handleMessage(final Message msg) {
325         if (this.closed.get()) {
326             LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
327             return;
328         }
329         // Update last reception time
330         this.lastMessageReceivedAt = TICKER.read();
331         this.sessionState.updateLastReceivedMsg();
332         if (!(msg instanceof KeepaliveMessage)) {
333             LOG.debug("PCEP message {} received.", msg);
334         }
335         // Internal message handling. The user does not see these messages
336         if (msg instanceof KeepaliveMessage) {
337             // Do nothing, the timer has been already reset
338         } else if (msg instanceof OpenMessage) {
339             this.sendErrorMessage(PCEPErrors.ATTEMPT_2ND_SESSION);
340         } else if (msg instanceof CloseMessage) {
341             /*
342              * Session is up, we are reporting all messages to user. One notable
343              * exception is CLOSE message, which needs to be converted into a
344              * session DOWN event.
345              */
346             close();
347             this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
348                     .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason())));
349         } else {
350             // This message needs to be handled by the user
351             if (msg instanceof PcerrMessage) {
352                 this.sessionState.setLastReceivedError(msg);
353             }
354             this.listener.onMessage(this, msg);
355         }
356     }
357
358     @Override
359     public final String toString() {
360         return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
361     }
362
363     private ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
364         toStringHelper.add("channel", this.channel);
365         toStringHelper.add("localOpen", this.localOpen);
366         toStringHelper.add("remoteOpen", this.remoteOpen);
367         return toStringHelper;
368     }
369
370     @VisibleForTesting
371     void sessionUp() {
372         try {
373             this.listener.onSessionUp(this);
374         } catch (final Exception e) {
375             handleException(e);
376             throw e;
377         }
378     }
379
380     @VisibleForTesting
381     final Queue<Long> getUnknownMessagesTimes() {
382         return this.unknownMessagesTimes;
383     }
384
385     @Override
386     public Messages getMessages() {
387         return this.sessionState.getMessages(this.unknownMessagesTimes.size());
388     }
389
390     @Override
391     public LocalPref getLocalPref() {
392         return this.sessionState.getLocalPref();
393     }
394
395     @Override
396     public PeerPref getPeerPref() {
397         return this.sessionState.getPeerPref();
398     }
399
400     @Override
401     public Open getLocalOpen() {
402         return this.sessionState.getLocalOpen();
403     }
404
405     @Override
406     public synchronized final void channelInactive(final ChannelHandlerContext ctx) {
407         LOG.debug("Channel {} inactive.", ctx.channel());
408         endOfInput();
409
410         try {
411             super.channelInactive(ctx);
412         } catch (final Exception e) {
413             throw new IllegalStateException("Failed to delegate channel inactive event on channel " + ctx.channel(), e);
414         }
415     }
416
417     @Override
418     protected synchronized final void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
419         LOG.debug("Message was received: {}", msg);
420         handleMessage(msg);
421     }
422
423     @Override
424     public synchronized final void handlerAdded(final ChannelHandlerContext ctx) {
425         this.sessionUp();
426     }
427
428     @Override
429     public  synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
430         handleException(cause);
431     }
432
433     @Override
434     public Tlvs getLocalTlvs() {
435         return this.localOpen.getTlvs();
436     }
437
438     @VisibleForTesting
439     static void setTicker(final Ticker ticker) {
440         TICKER = ticker;
441     }
442 }