clean pcep/impl
[bgpcep.git] / pcep / impl / src / main / java / org / opendaylight / protocol / pcep / impl / PCEPSessionImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.protocol.pcep.impl;
9
10 import static java.util.Objects.requireNonNull;
11
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.MoreObjects;
14 import com.google.common.base.MoreObjects.ToStringHelper;
15 import com.google.common.base.Ticker;
16 import io.netty.channel.Channel;
17 import io.netty.channel.ChannelFuture;
18 import io.netty.channel.ChannelFutureListener;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.channel.SimpleChannelInboundHandler;
21 import io.netty.util.concurrent.Future;
22 import java.io.IOException;
23 import java.net.InetAddress;
24 import java.net.InetSocketAddress;
25 import java.util.Date;
26 import java.util.LinkedList;
27 import java.util.Queue;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import org.checkerframework.checker.lock.qual.GuardedBy;
31 import org.opendaylight.protocol.pcep.PCEPCloseTermination;
32 import org.opendaylight.protocol.pcep.PCEPSession;
33 import org.opendaylight.protocol.pcep.PCEPSessionListener;
34 import org.opendaylight.protocol.pcep.TerminationReason;
35 import org.opendaylight.protocol.pcep.impl.spi.Util;
36 import org.opendaylight.protocol.pcep.spi.PCEPErrors;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.CloseBuilder;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.Keepalive;
39 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.message.rev181109.KeepaliveBuilder;
40 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.LocalPref;
41 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.Messages;
42 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.stats.rev171113.pcep.session.state.PeerPref;
43 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.CloseMessage;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.KeepaliveMessage;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.Message;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.OpenMessage;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.PcerrMessage;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.message.CCloseMessageBuilder;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.close.object.CCloseBuilder;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.keepalive.message.KeepaliveMessageBuilder;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.Open;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.pcep.types.rev181109.open.object.open.Tlvs;
53 import org.opendaylight.yangtools.yang.common.Uint8;
54 import org.slf4j.Logger;
55 import org.slf4j.LoggerFactory;
56
57 /**
58  * Implementation of PCEPSession. (Not final for testing.)
59  */
60 @VisibleForTesting
61 public class PCEPSessionImpl extends SimpleChannelInboundHandler<Message> implements PCEPSession {
62     private static final long MINUTE = TimeUnit.MINUTES.toNanos(1);
63     private static Ticker TICKER = Ticker.systemTicker();
64     /**
65      * System.nanoTime value about when was sent the last message Protected to be updated also in tests.
66      */
67     private volatile long lastMessageSentAt;
68
69     /**
70      * System.nanoTime value about when was received the last message.
71      */
72     private long lastMessageReceivedAt;
73
74     private final Queue<Long> unknownMessagesTimes = new LinkedList<>();
75
76     private final PCEPSessionListener listener;
77
78     /**
79      * Open Object with session characteristics that were accepted by another PCE (sent from this session).
80      */
81     private final Open localOpen;
82
83     /**
84      * Open Object with session characteristics for this session (sent from another PCE).
85      */
86     private final Open remoteOpen;
87
88     private static final Logger LOG = LoggerFactory.getLogger(PCEPSessionImpl.class);
89
90     private int maxUnknownMessages;
91
92     // True if the listener should not be notified about events
93     @GuardedBy("this")
94     private final AtomicBoolean closed = new AtomicBoolean(false);
95
96     private final Channel channel;
97
98     private final Keepalive kaMessage =
99         new KeepaliveBuilder().setKeepaliveMessage(new KeepaliveMessageBuilder().build()).build();
100
101     private final PCEPSessionState sessionState;
102
103     PCEPSessionImpl(final PCEPSessionListener listener, final int maxUnknownMessages, final Channel channel,
104             final Open localOpen, final Open remoteOpen) {
105         this.listener = requireNonNull(listener);
106         this.channel = requireNonNull(channel);
107         this.localOpen = requireNonNull(localOpen);
108         this.remoteOpen = requireNonNull(remoteOpen);
109         this.lastMessageReceivedAt = TICKER.read();
110
111         if (maxUnknownMessages != 0) {
112             this.maxUnknownMessages = maxUnknownMessages;
113         }
114
115
116         if (getDeadTimerValue() != 0) {
117             channel.eventLoop().schedule(this::handleDeadTimer, getDeadTimerValue(), TimeUnit.SECONDS);
118         }
119
120         if (getKeepAliveTimerValue() != 0) {
121             channel.eventLoop().schedule(this::handleKeepaliveTimer, getKeepAliveTimerValue(), TimeUnit.SECONDS);
122         }
123
124         LOG.info("Session {}[{}] <-> {}[{}] started",
125             channel.localAddress(), localOpen.getSessionId(), channel.remoteAddress(), remoteOpen.getSessionId());
126         this.sessionState = new PCEPSessionState(remoteOpen, localOpen, channel);
127     }
128
129     public final Integer getKeepAliveTimerValue() {
130         return this.localOpen.getKeepalive().intValue();
131     }
132
133     public final Integer getDeadTimerValue() {
134         return this.remoteOpen.getDeadTimer().intValue();
135     }
136
137     /**
138      * If DeadTimer expires, the session ends. If a message (whichever) was received during this period, the DeadTimer
139      * will be rescheduled by DEAD_TIMER_VALUE + the time that has passed from the start of the DeadTimer to the time at
140      * which the message was received. If the session was closed by the time this method starts to execute (the session
141      * state will become IDLE), that rescheduling won't occur.
142      */
143     private synchronized void handleDeadTimer() {
144         final long ct = TICKER.read();
145
146         final long nextDead = this.lastMessageReceivedAt + TimeUnit.SECONDS.toNanos(getDeadTimerValue());
147
148         if (this.channel.isActive()) {
149             if (ct >= nextDead) {
150                 LOG.debug("DeadTimer expired. {}", new Date());
151                 this.terminate(TerminationReason.EXP_DEADTIMER);
152             } else {
153                 this.channel.eventLoop().schedule(this::handleDeadTimer, nextDead - ct, TimeUnit.NANOSECONDS);
154             }
155         }
156     }
157
158     /**
159      * If KeepAlive Timer expires, sends KeepAlive message. If a message (whichever) was send during this period, the
160      * KeepAlive Timer will be rescheduled by KEEP_ALIVE_TIMER_VALUE + the time that has passed from the start of the
161      * KeepAlive timer to the time at which the message was sent. If the session was closed by the time this method
162      * starts to execute (the session state will become IDLE), that rescheduling won't occur.
163      */
164     private void handleKeepaliveTimer() {
165         final long ct = TICKER.read();
166
167         long nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
168
169         if (this.channel.isActive()) {
170             if (ct >= nextKeepalive) {
171                 this.sendMessage(this.kaMessage);
172                 nextKeepalive = this.lastMessageSentAt + TimeUnit.SECONDS.toNanos(getKeepAliveTimerValue());
173             }
174
175             this.channel.eventLoop().schedule(this::handleKeepaliveTimer, nextKeepalive - ct, TimeUnit.NANOSECONDS);
176         }
177     }
178
179     /**
180      * Handle exception occurred in the PCEP session. The session in error state should be closed
181      * properly so that it can be restored later.
182      */
183     @VisibleForTesting
184     void handleException(final Throwable cause) {
185         LOG.error("Exception captured for session {}, closing session.", this, cause);
186         terminate(TerminationReason.UNKNOWN);
187     }
188
189     /**
190      * Sends message to serialization.
191      *
192      * @param msg to be sent
193      */
194     @Override
195     public Future<Void> sendMessage(final Message msg) {
196         final ChannelFuture f = this.channel.writeAndFlush(msg);
197         this.lastMessageSentAt = TICKER.read();
198         this.sessionState.updateLastSentMsg();
199         if (!(msg instanceof KeepaliveMessage)) {
200             LOG.debug("PCEP Message enqueued: {}", msg);
201         }
202         if (msg instanceof PcerrMessage) {
203             this.sessionState.setLastSentError(msg);
204         }
205
206         f.addListener((ChannelFutureListener) arg -> {
207             if (arg.isSuccess()) {
208                 LOG.trace("Message sent to socket: {}", msg);
209             } else {
210                 LOG.debug("Message not sent: {}", msg, arg.cause());
211             }
212         });
213
214         return f;
215     }
216
217     @VisibleForTesting
218     ChannelFuture closeChannel() {
219         LOG.info("Closing PCEP session channel: {}", this.channel);
220         return this.channel.close();
221     }
222
223     @VisibleForTesting
224     public synchronized boolean isClosed() {
225         return this.closed.get();
226     }
227
228     /**
229      * Closes PCEP session without sending a Close message, as the channel is no longer active.
230      */
231     @Override
232     public synchronized void close() {
233         close(null);
234     }
235
236     /**
237      * Closes PCEP session, cancels all timers, returns to state Idle, sends the Close Message. KeepAlive and DeadTimer
238      * are cancelled if the state of the session changes to IDLE. This method is used to close the PCEP session from
239      * inside the session or from the listener, therefore the parent of this session should be informed.
240      */
241     @Override
242     public void close(final TerminationReason reason) {
243         if (this.closed.getAndSet(true)) {
244             LOG.debug("Session is already closed.");
245             return;
246         }
247         // only send close message when the reason is provided
248         if (reason != null) {
249             LOG.info("Closing PCEP session with reason {}: {}", reason, this);
250             sendMessage(new CloseBuilder().setCCloseMessage(
251                     new CCloseMessageBuilder().setCClose(new CCloseBuilder()
252                         .setReason(Uint8.valueOf(reason.getShortValue())).build()).build()).build());
253         } else {
254             LOG.info("Closing PCEP session: {}", this);
255         }
256         closeChannel();
257     }
258
259     @Override
260     public Tlvs getRemoteTlvs() {
261         return this.remoteOpen.getTlvs();
262     }
263
264     @Override
265     public InetAddress getRemoteAddress() {
266         return ((InetSocketAddress) this.channel.remoteAddress()).getAddress();
267     }
268
269     private synchronized void terminate(final TerminationReason reason) {
270         if (this.closed.get()) {
271             LOG.debug("Session {} is already closed.", this);
272             return;
273         }
274         close(reason);
275         this.listener.onSessionTerminated(this, new PCEPCloseTermination(reason));
276     }
277
278     synchronized void endOfInput() {
279         if (!this.closed.getAndSet(true)) {
280             this.listener.onSessionDown(this, new IOException("End of input detected. Close the session."));
281         }
282     }
283
284     private void sendErrorMessage(final PCEPErrors value) {
285         this.sendErrorMessage(value, null);
286     }
287
288     /**
289      * Sends PCEP Error Message with one PCEPError and Open Object.
290      *
291      * @param value PCEP errors value
292      * @param open Open Object
293      */
294     private void sendErrorMessage(final PCEPErrors value, final Open open) {
295         this.sendMessage(Util.createErrorMessage(value, open));
296     }
297
298     /**
299      * The fact, that a message is malformed, comes from parser. In case of unrecognized message a particular error is
300      * sent (CAPABILITY_NOT_SUPPORTED) and the method checks if the MAX_UNKNOWN_MSG per minute wasn't overstepped.
301      * Second, any other error occurred that is specified by rfc. In this case, the an error message is generated and
302      * sent.
303      *
304      * @param error documented error in RFC5440 or draft
305      */
306     @VisibleForTesting
307     void handleMalformedMessage(final PCEPErrors error) {
308         final long ct = TICKER.read();
309         this.sendErrorMessage(error);
310         if (error == PCEPErrors.CAPABILITY_NOT_SUPPORTED) {
311             this.unknownMessagesTimes.add(ct);
312             while (ct - this.unknownMessagesTimes.peek() > MINUTE) {
313                 this.unknownMessagesTimes.remove();
314             }
315             if (this.unknownMessagesTimes.size() > this.maxUnknownMessages) {
316                 this.terminate(TerminationReason.TOO_MANY_UNKNOWN_MSGS);
317             }
318         }
319     }
320
321     /**
322      * Handles incoming message. If the session is up, it notifies the user. The user is notified about every message
323      * except KeepAlive.
324      *
325      * @param msg incoming message
326      */
327     public synchronized void handleMessage(final Message msg) {
328         if (this.closed.get()) {
329             LOG.debug("PCEP Session {} is already closed, skip handling incoming message {}", this, msg);
330             return;
331         }
332         // Update last reception time
333         this.lastMessageReceivedAt = TICKER.read();
334         this.sessionState.updateLastReceivedMsg();
335         if (!(msg instanceof KeepaliveMessage)) {
336             LOG.debug("PCEP message {} received.", msg);
337         }
338         // Internal message handling. The user does not see these messages
339         if (msg instanceof KeepaliveMessage) {
340             // Do nothing, the timer has been already reset
341         } else if (msg instanceof OpenMessage) {
342             this.sendErrorMessage(PCEPErrors.ATTEMPT_2ND_SESSION);
343         } else if (msg instanceof CloseMessage) {
344             /*
345              * Session is up, we are reporting all messages to user. One notable
346              * exception is CLOSE message, which needs to be converted into a
347              * session DOWN event.
348              */
349             close();
350             this.listener.onSessionTerminated(this, new PCEPCloseTermination(TerminationReason
351                     .forValue(((CloseMessage) msg).getCCloseMessage().getCClose().getReason().toJava())));
352         } else {
353             // This message needs to be handled by the user
354             if (msg instanceof PcerrMessage) {
355                 this.sessionState.setLastReceivedError(msg);
356             }
357             this.listener.onMessage(this, msg);
358         }
359     }
360
361     @Override
362     public final String toString() {
363         return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
364     }
365
366     private ToStringHelper addToStringAttributes(final ToStringHelper toStringHelper) {
367         toStringHelper.add("channel", this.channel);
368         toStringHelper.add("localOpen", this.localOpen);
369         toStringHelper.add("remoteOpen", this.remoteOpen);
370         return toStringHelper;
371     }
372
373     @VisibleForTesting
374     @SuppressWarnings("checkstyle:IllegalCatch")
375     void sessionUp() {
376         try {
377             this.listener.onSessionUp(this);
378         } catch (final RuntimeException e) {
379             handleException(e);
380             throw e;
381         }
382     }
383
384     @VisibleForTesting
385     final Queue<Long> getUnknownMessagesTimes() {
386         return this.unknownMessagesTimes;
387     }
388
389     @Override
390     public Messages getMessages() {
391         return this.sessionState.getMessages(this.unknownMessagesTimes.size());
392     }
393
394     @Override
395     public LocalPref getLocalPref() {
396         return this.sessionState.getLocalPref();
397     }
398
399     @Override
400     public PeerPref getPeerPref() {
401         return this.sessionState.getPeerPref();
402     }
403
404     @Override
405     public Open getLocalOpen() {
406         return this.sessionState.getLocalOpen();
407     }
408
409     @Override
410     //similar to bgp/rib-impl/src/main/java/org/opendaylight/protocol/bgp/rib/impl/BGPSessionImpl.java
411     public final synchronized void channelInactive(final ChannelHandlerContext ctx) throws Exception {
412         LOG.debug("Channel {} inactive.", ctx.channel());
413         endOfInput();
414         super.channelInactive(ctx);
415     }
416
417     @Override
418     protected final synchronized void channelRead0(final ChannelHandlerContext ctx, final Message msg) {
419         LOG.debug("Message was received: {}", msg);
420         handleMessage(msg);
421     }
422
423     @Override
424     public final synchronized void handlerAdded(final ChannelHandlerContext ctx) {
425         this.sessionUp();
426     }
427
428     @Override
429     public synchronized void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
430         handleException(cause);
431     }
432
433     @Override
434     public Tlvs getLocalTlvs() {
435         return this.localOpen.getTlvs();
436     }
437
438     @VisibleForTesting
439     static void setTicker(final Ticker ticker) {
440         TICKER = ticker;
441     }
442 }