Sonar: Exception handler should preserve exception
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import java.io.IOException;
12 import java.net.DatagramPacket;
13 import java.net.DatagramSocket;
14 import java.net.InetAddress;
15 import java.net.InetSocketAddress;
16 import java.net.SocketException;
17 import java.net.SocketTimeoutException;
18 import java.nio.ByteBuffer;
19
20 import org.apache.commons.lang3.exception.ExceptionUtils;
21 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
23 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
24 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
25 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
26 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
27 import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
28 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
29 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
30 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.sb.rev150904.OdlLispSbService;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.net.InetAddresses;
38
39 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, BindingAwareProvider {
40     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
41
42     private static Object startLock = new Object();
43     private LispIoThread lispThread;
44     private LispIoThread xtrThread;
45     private LispSouthboundService lispSouthboundService;
46     private LispXtrSouthboundService lispXtrSouthboundService;
47     private NotificationPublishService notificationPublishService;
48     private RpcProviderRegistry rpcRegistry;
49     private BindingAwareBroker broker;
50     private volatile DatagramSocket socket = null;
51     private volatile String bindingAddress = null;
52     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
53     private volatile boolean listenOnXtrPort = false;
54     private BindingAwareBroker.RpcRegistration<OdlLispSbService> sbRpcRegistration;
55     private DatagramSocket xtrSocket;
56     private LispSouthboundStats statistics = new LispSouthboundStats();
57
58     public void init() {
59         LOG.info("LISP (RFC6830) southbound plugin is initializing...");
60         final LispSouthboundRPC sbRpcHandler = new LispSouthboundRPC(this);
61
62         sbRpcRegistration = rpcRegistry.addRpcImplementation(OdlLispSbService.class, sbRpcHandler);
63         broker.registerProvider(this);
64
65         synchronized (startLock) {
66             lispSouthboundService = new LispSouthboundService(this);
67             lispXtrSouthboundService = new LispXtrSouthboundService();
68             lispSouthboundService.setNotificationProvider(this.notificationPublishService);
69             lispXtrSouthboundService.setNotificationProvider(this.notificationPublishService);
70             if (bindingAddress == null) {
71                 setLispAddress("0.0.0.0");
72             }
73             LOG.info("LISP (RFC6830) southbound plugin is up!");
74         }
75     }
76
77     public void setNotificationPublishService(NotificationPublishService notificationService) {
78         this.notificationPublishService = notificationService;
79     }
80
81     public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
82         this.rpcRegistry = rpcRegistry;
83     }
84
85     public void setBindingAwareBroker(BindingAwareBroker broker) {
86         this.broker = broker;
87     }
88
89     private void unloadActions() {
90         if (lispThread != null) {
91             lispThread.stopRunning();
92         }
93         lispSouthboundService = null;
94         lispXtrSouthboundService = null;
95         lispThread = null;
96         xtrThread = null;
97         bindingAddress = null;
98         LOG.info("LISP (RFC6830) southbound plugin is down!");
99         try {
100             Thread.sleep(1100);
101         } catch (InterruptedException e) {
102         }
103     }
104
105     private class LispIoThread extends Thread {
106         private volatile boolean shouldRun;
107         private volatile DatagramSocket threadSocket = null;
108         private volatile ILispSouthboundService service;
109         private volatile boolean running;
110
111         public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
112             super("Lisp Thread");
113             this.threadSocket = socket;
114             this.service = service;
115             shouldRun = true;
116         }
117
118         @Override
119         public void run() {
120             running = true;
121
122             int lispReceiveTimeout = 1000;
123
124             LOG.info("LISP (RFC6830) southbound plugin is running and listening on address: " + bindingAddress
125                     + " port: " + threadSocket.getLocalPort());
126             try {
127
128                 threadSocket.setSoTimeout(lispReceiveTimeout);
129             } catch (SocketException e) {
130                 LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
131                 return;
132             }
133
134             while (shouldRun) {
135                 byte[] buffer = new byte[4096];
136                 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
137                 try {
138                     threadSocket.receive(packet);
139                     LOG.trace("Received a packet!");
140                 } catch (SocketTimeoutException ste) {
141                     LOG.trace("Timed out waiting on socket", ste);
142                     continue;
143                 } catch (IOException e) {
144                     LOG.warn("IO Exception while trying to recieve packet", e);
145                 }
146                 LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress()
147                         .getHostAddress(), packet.getPort(), packet.getLength()));
148
149                 try {
150                     this.service.handlePacket(packet);
151                 } catch (Exception e) {
152                     LOG.warn("Error while handling packet", e);
153                 }
154             }
155
156             threadSocket.close();
157             LOG.trace("Socket closed");
158             running = false;
159         }
160
161         public void stopRunning() {
162             shouldRun = false;
163         }
164
165         public boolean isRunning() {
166             return running;
167         }
168     }
169
170     public static String intToIpv4(int address) {
171         return ((address >> 24) & 0xff) + "." + //
172                 ((address >> 16) & 0xff) + "." + //
173                 ((address >> 8) & 0xff) + "." + //
174                 ((address >> 0) & 0xff);
175     }
176
177     private void startIOThread() {
178         if (socket != null) {
179             while (!socket.isClosed()) {
180                 try {
181                     Thread.sleep(500);
182                 } catch (InterruptedException e) {
183                 }
184             }
185         }
186         try {
187             socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
188             lispThread = new LispIoThread(socket, lispSouthboundService);
189             lispThread.start();
190             LOG.info("LISP (RFC6830) southbound plugin is listening for control packets!");
191             if (listenOnXtrPort) {
192                 restartXtrThread();
193             }
194         } catch (SocketException e) {
195             LOG.error("couldn't start socket: {}", ExceptionUtils.getStackTrace(e));
196         }
197     }
198
199     private void restartXtrThread() {
200         try {
201             stopXtrThread();
202             xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
203             xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
204             xtrThread.start();
205             LOG.info("xTR southbound plugin is up!");
206         } catch (SocketException e) {
207             LOG.warn("failed to start xtr thread: {}", ExceptionUtils.getStackTrace(e));
208         }
209     }
210
211     public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, MessageType packetType) {
212         DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
213         packet.setPort(address.getPort().getValue());
214         InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue()));
215         packet.setAddress(ip);
216         try {
217             if (LOG.isDebugEnabled()) {
218                 LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
219             }
220             socket.send(packet);
221             this.statistics.incrementTx(packetType.getIntValue());
222         } catch (IOException e) {
223             LOG.warn("Failed to send " + packetType, e);
224             this.statistics.incrementTxErrors();
225         }
226     }
227
228     public LispSouthboundStats getStats() {
229         return statistics;
230     }
231
232     public void setLispAddress(String address) {
233         synchronized (startLock) {
234             if (bindingAddress != null && bindingAddress.equals(address)) {
235                 LOG.trace("configured lisp binding address didn't change.");
236             } else {
237                 String action = (bindingAddress == null ? "Setting" : "Resetting");
238                 LOG.trace(action + " lisp binding address to: " + address);
239                 bindingAddress = address;
240                 if (lispThread != null) {
241                     lispThread.stopRunning();
242                     while (lispThread.isRunning()) {
243                         try {
244                             Thread.sleep(500);
245                         } catch (InterruptedException e) {
246                         }
247                     }
248                 }
249                 stopXtrThread();
250                 startIOThread();
251             }
252         }
253     }
254
255     private void stopXtrThread() {
256         if (xtrThread != null) {
257             xtrThread.stopRunning();
258             while (xtrThread.isRunning()) {
259                 try {
260                     Thread.sleep(500);
261                 } catch (InterruptedException e) {
262                 }
263             }
264         }
265     }
266
267     @Override
268     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
269         listenOnXtrPort = shouldListenOnXtrPort;
270         if (listenOnXtrPort) {
271             LOG.debug("restarting xtr thread");
272             restartXtrThread();
273         } else {
274             LOG.debug("terminating thread");
275             stopXtrThread();
276         }
277     }
278
279     @Override
280     public void setXtrPort(int port) {
281         this.xtrPort = port;
282         if (listenOnXtrPort) {
283             restartXtrThread();
284         }
285     }
286
287     @Override
288     public void close() throws Exception {
289         unloadActions();
290         sbRpcRegistration.close();
291     }
292
293     @Override
294     public void onSessionInitiated(ProviderContext session) {
295         LOG.debug("LispSouthboundPlugin Provider Session Initiated");
296     }
297 }