Merge "Enforce basic checkstyle on mappingservice.*"
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import java.io.IOException;
12 import java.net.DatagramPacket;
13 import java.net.DatagramSocket;
14 import java.net.InetAddress;
15 import java.net.InetSocketAddress;
16 import java.net.SocketException;
17 import java.net.SocketTimeoutException;
18 import java.nio.ByteBuffer;
19 import java.util.concurrent.Future;
20
21 import org.eclipse.osgi.framework.console.CommandProvider;
22 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
23 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
24 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
25 import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
26 import org.opendaylight.lispflowmapping.implementation.serializer.MapNotifySerializer;
27 import org.opendaylight.lispflowmapping.implementation.serializer.MapReplySerializer;
28 import org.opendaylight.lispflowmapping.implementation.serializer.MapRequestSerializer;
29 import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
30 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
31 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
32 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.LfmControlPlaneService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.SendMapNotifyInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.SendMapReplyInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.SendMapRequestInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.transportaddress.TransportAddress;
38 import org.opendaylight.yangtools.yang.common.RpcResult;
39 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
40 import org.osgi.framework.BundleContext;
41 import org.osgi.framework.FrameworkUtil;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import com.google.common.net.InetAddresses;
46 import com.google.common.util.concurrent.Futures;
47
48 public class LispSouthboundPlugin extends AbstractBindingAwareProvider implements IConfigLispSouthboundPlugin, CommandProvider, LfmControlPlaneService {
49     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
50
51     private static Object startLock = new Object();
52     private LispIoThread lispThread;
53     private LispIoThread xtrThread;
54     private LispSouthboundService lispSouthboundService;
55     private LispXtrSouthboundService lispXtrSouthboundService;
56     private volatile DatagramSocket socket = null;
57     private final String MAP_NOTIFY = "MapNotify";
58     private final String MAP_REPlY = "MapReply";
59     private final String MAP_REQUEST = "MapRequest";
60     private volatile String bindingAddress = null;
61     private volatile boolean alreadyInit = false;
62     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
63     private volatile boolean listenOnXtrPort = false;
64
65     private DatagramSocket xtrSocket;
66
67     private void registerWithOSGIConsole() {
68         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
69         bundleContext.registerService(CommandProvider.class.getName(), this, null);
70         bundleContext.registerService(IConfigLispSouthboundPlugin.class.getName(), this, null);
71     }
72
73     protected void stopImpl(BundleContext context) {
74         unloadActions();
75     }
76
77     private void unloadActions() {
78         if (lispThread != null) {
79             lispThread.stopRunning();
80         }
81         lispSouthboundService = null;
82         lispXtrSouthboundService = null;
83         lispThread = null;
84         xtrThread = null;
85         LOG.info("LISP (RFC6830) Mapping Service is down!");
86         try {
87             Thread.sleep(1100);
88         } catch (InterruptedException e) {
89         }
90     }
91
92     public void destroy() {
93         unloadActions();
94     }
95
96     private class LispIoThread extends Thread {
97         private volatile boolean shouldRun;
98         private volatile DatagramSocket threadSocket = null;
99         private volatile ILispSouthboundService service;
100         private volatile boolean running;
101
102         public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
103             super("Lisp Thread");
104             this.threadSocket = socket;
105             this.service = service;
106             shouldRun = true;
107         }
108
109         @Override
110         public void run() {
111             running = true;
112
113             int lispReceiveTimeout = 1000;
114
115             LOG.info("LISP (RFC6830) Mapping Service is running and listening on address: " + bindingAddress + " port: "
116                     + threadSocket.getLocalPort());
117             try {
118
119                 threadSocket.setSoTimeout(lispReceiveTimeout);
120             } catch (SocketException e) {
121                 LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
122                 return;
123             }
124
125             while (shouldRun) {
126                 byte[] buffer = new byte[4096];
127                 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
128                 try {
129                     threadSocket.receive(packet);
130                     LOG.trace("Received a packet!");
131                 } catch (SocketTimeoutException ste) {
132                     continue;
133                 } catch (IOException e) {
134                     LOG.warn("IO Exception while trying to recieve packet", e);
135                 }
136                 LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress().getHostAddress(), packet.getPort(),
137                         packet.getLength()));
138
139                 try {
140                     this.service.handlePacket(packet);
141                 } catch (Exception e) {
142                     LOG.warn("Error while handling packet", e);
143                 }
144             }
145
146             threadSocket.close();
147             LOG.trace("Socket closed");
148             running = false;
149         }
150
151         public void stopRunning() {
152             shouldRun = false;
153         }
154
155         public boolean isRunning() {
156             return running;
157         }
158     }
159
160     public static String intToIpv4(int address) {
161         return ((address >> 24) & 0xff) + "." + //
162                 ((address >> 16) & 0xff) + "." + //
163                 ((address >> 8) & 0xff) + "." + //
164                 ((address >> 0) & 0xff);
165     }
166
167     public String getHelp() {
168         StringBuffer help = new StringBuffer();
169         help.append("---LISP Southbound Plugin---\n");
170         return help.toString();
171     }
172
173     private void startIOThread() {
174         try {
175             socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
176             lispThread = new LispIoThread(socket, lispSouthboundService);
177             lispThread.start();
178             LOG.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
179             if (listenOnXtrPort) {
180                 restartXtrThread();
181             }
182         } catch (SocketException e) {
183             LOG.error("couldn't start socket {}", e.getMessage());
184         }
185     }
186
187     private void restartXtrThread() {
188         try {
189             stopXtrThread();
190             xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
191             xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
192             xtrThread.start();
193             LOG.info("xTR Southbound Plugin is up!");
194         } catch (SocketException e) {
195             LOG.warn("failed to start xtr thread: {}", e.getMessage());
196         }
197     }
198
199     public void onSessionInitiated(ProviderContext session) {
200         LOG.info("LISP (RFC6830) Mapping Service is up!");
201         synchronized (startLock) {
202             if (!alreadyInit) {
203                 alreadyInit = true;
204                 lispSouthboundService = new LispSouthboundService();
205                 lispXtrSouthboundService = new LispXtrSouthboundService();
206                 registerWithOSGIConsole();
207                 registerRPCs(session);
208                 LOG.trace("Provider Session initialized");
209                 if (bindingAddress == null) {
210                     setLispAddress("0.0.0.0");
211                 }
212             }
213
214         }
215     }
216
217     private void registerRPCs(ProviderContext session) {
218         try {
219             lispSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
220             lispXtrSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
221             session.addRpcImplementation(LfmControlPlaneService.class, this);
222         } catch (Exception e) {
223             LOG.error(e.getMessage(), e);
224         }
225     }
226
227     private void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, String packetType) {
228         DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
229         packet.setPort(address.getPort().getValue());
230         InetAddress ip = InetAddresses.forString(address.getIpAddress().getIpv4Address().getValue());
231         packet.setAddress(ip);
232         try {
233             if (LOG.isDebugEnabled()) {
234                 LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
235             }
236             socket.send(packet);
237         } catch (IOException e) {
238             LOG.warn("Failed to send " + packetType, e);
239         }
240     }
241
242     public void setLispAddress(String address) {
243         synchronized (startLock) {
244             if (bindingAddress != null && bindingAddress.equals(address)) {
245                 LOG.trace("configured lisp binding address didn't change.");
246             } else {
247                 String action = (bindingAddress == null ? "Setting" : "Resetting");
248                 LOG.trace(action + " lisp binding address to: " + address);
249                 bindingAddress = address;
250                 if (lispThread != null) {
251                     lispThread.stopRunning();
252                     while (lispThread.isRunning()) {
253                         try {
254                             Thread.sleep(500);
255                         } catch (InterruptedException e) {
256                         }
257                     }
258                 }
259                 stopXtrThread();
260                 startIOThread();
261             }
262         }
263     }
264
265     private void stopXtrThread() {
266         if (xtrThread != null) {
267             xtrThread.stopRunning();
268             while (xtrThread.isRunning()) {
269                 try {
270                     Thread.sleep(500);
271                 } catch (InterruptedException e) {
272                 }
273             }
274         }
275     }
276
277     @Override
278     public Future<RpcResult<Void>> sendMapNotify(SendMapNotifyInput mapNotifyInput) {
279         LOG.trace("sendMapNotify called!!");
280         if (mapNotifyInput != null) {
281             ByteBuffer outBuffer = MapNotifySerializer.getInstance().serialize(mapNotifyInput.getMapNotify());
282             handleSerializedLispBuffer(mapNotifyInput.getTransportAddress(), outBuffer, MAP_NOTIFY);
283         } else {
284             LOG.warn("MapNotify was null");
285             return Futures.immediateFuture(RpcResultBuilder.<Void> failed().build());
286         }
287         return Futures.immediateFuture(RpcResultBuilder.<Void> success().build());
288     }
289
290     @Override
291     public Future<RpcResult<Void>> sendMapReply(SendMapReplyInput mapReplyInput) {
292         LOG.trace("sendMapReply called!!");
293         if (mapReplyInput != null) {
294             ByteBuffer outBuffer = MapReplySerializer.getInstance().serialize(mapReplyInput.getMapReply());
295             handleSerializedLispBuffer(mapReplyInput.getTransportAddress(), outBuffer, MAP_REPlY);
296         } else {
297             LOG.warn("MapReply was null");
298             return Futures.immediateFuture(RpcResultBuilder.<Void> failed().build());
299         }
300         return Futures.immediateFuture(RpcResultBuilder.<Void> success().build());
301     }
302
303     @Override
304     public Future<RpcResult<Void>> sendMapRequest(SendMapRequestInput mapRequestInput) {
305         LOG.trace("sendMapRequest called!!");
306         if (mapRequestInput != null) {
307             ByteBuffer outBuffer = MapRequestSerializer.getInstance().serialize(mapRequestInput.getMapRequest());
308             handleSerializedLispBuffer(mapRequestInput.getTransportAddress(), outBuffer, MAP_REQUEST);
309         } else {
310             LOG.debug("MapRequest was null");
311             return Futures.immediateFuture(RpcResultBuilder.<Void> failed().build());
312         }
313         return Futures.immediateFuture(RpcResultBuilder.<Void> success().build());
314     }
315
316     @Override
317     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
318         listenOnXtrPort = shouldListenOnXtrPort;
319         if (listenOnXtrPort) {
320             LOG.debug("restarting xtr thread");
321             restartXtrThread();
322         } else {
323             LOG.debug("terminating thread");
324             stopXtrThread();
325         }
326     }
327
328     @Override
329     public void setXtrPort(int port) {
330         this.xtrPort = port;
331         if (listenOnXtrPort) {
332             restartXtrThread();
333         }
334     }
335 }