Merge "Fix southbound start/stop sequence"
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import java.io.IOException;
12 import java.net.DatagramPacket;
13 import java.net.DatagramSocket;
14 import java.net.InetAddress;
15 import java.net.InetSocketAddress;
16 import java.net.SocketException;
17 import java.net.SocketTimeoutException;
18 import java.nio.ByteBuffer;
19 import java.util.concurrent.Future;
20
21 import org.eclipse.osgi.framework.console.CommandProvider;
22 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
23 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
24 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
25 import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
26 import org.opendaylight.lispflowmapping.implementation.serializer.MapNotifySerializer;
27 import org.opendaylight.lispflowmapping.implementation.serializer.MapReplySerializer;
28 import org.opendaylight.lispflowmapping.implementation.serializer.MapRequestSerializer;
29 import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
30 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
31 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
32 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.LfmControlPlaneService;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.SendMapNotifyInput;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.SendMapReplyInput;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.SendMapRequestInput;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.transportaddress.TransportAddress;
38 import org.opendaylight.yangtools.yang.common.RpcResult;
39 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
40 import org.osgi.framework.BundleContext;
41 import org.osgi.framework.FrameworkUtil;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import com.google.common.net.InetAddresses;
46 import com.google.common.util.concurrent.Futures;
47
48 public class LispSouthboundPlugin extends AbstractBindingAwareProvider implements IConfigLispSouthboundPlugin, CommandProvider, LfmControlPlaneService {
49     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
50
51     private static Object startLock = new Object();
52     private LispIoThread lispThread;
53     private LispIoThread xtrThread;
54     private LispSouthboundService lispSouthboundService;
55     private LispXtrSouthboundService lispXtrSouthboundService;
56     private volatile DatagramSocket socket = null;
57     private final String MAP_NOTIFY = "MapNotify";
58     private final String MAP_REPlY = "MapReply";
59     private final String MAP_REQUEST = "MapRequest";
60     private volatile String bindingAddress = null;
61     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
62     private volatile boolean listenOnXtrPort = false;
63
64     private DatagramSocket xtrSocket;
65
66     private void registerWithOSGIConsole() {
67         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
68         bundleContext.registerService(CommandProvider.class.getName(), this, null);
69         bundleContext.registerService(IConfigLispSouthboundPlugin.class.getName(), this, null);
70     }
71
72     protected void stopImpl(BundleContext context) {
73         unloadActions();
74     }
75
76     private void unloadActions() {
77         if (lispThread != null) {
78             lispThread.stopRunning();
79         }
80         lispSouthboundService = null;
81         lispXtrSouthboundService = null;
82         lispThread = null;
83         xtrThread = null;
84         bindingAddress = null;
85         LOG.info("LISP (RFC6830) Mapping Service is down!");
86         try {
87             Thread.sleep(1100);
88         } catch (InterruptedException e) {
89         }
90     }
91
92     public void destroy() {
93         unloadActions();
94     }
95
96     private class LispIoThread extends Thread {
97         private volatile boolean shouldRun;
98         private volatile DatagramSocket threadSocket = null;
99         private volatile ILispSouthboundService service;
100         private volatile boolean running;
101
102         public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
103             super("Lisp Thread");
104             this.threadSocket = socket;
105             this.service = service;
106             shouldRun = true;
107         }
108
109         @Override
110         public void run() {
111             running = true;
112
113             int lispReceiveTimeout = 1000;
114
115             LOG.info("LISP (RFC6830) Mapping Service is running and listening on address: " + bindingAddress + " port: "
116                     + threadSocket.getLocalPort());
117             try {
118
119                 threadSocket.setSoTimeout(lispReceiveTimeout);
120             } catch (SocketException e) {
121                 LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
122                 return;
123             }
124
125             while (shouldRun) {
126                 byte[] buffer = new byte[4096];
127                 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
128                 try {
129                     threadSocket.receive(packet);
130                     LOG.trace("Received a packet!");
131                 } catch (SocketTimeoutException ste) {
132                     continue;
133                 } catch (IOException e) {
134                     LOG.warn("IO Exception while trying to recieve packet", e);
135                 }
136                 LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress().getHostAddress(), packet.getPort(),
137                         packet.getLength()));
138
139                 try {
140                     this.service.handlePacket(packet);
141                 } catch (Exception e) {
142                     LOG.warn("Error while handling packet", e);
143                 }
144             }
145
146             threadSocket.close();
147             LOG.trace("Socket closed");
148             running = false;
149         }
150
151         public void stopRunning() {
152             shouldRun = false;
153         }
154
155         public boolean isRunning() {
156             return running;
157         }
158     }
159
160     public static String intToIpv4(int address) {
161         return ((address >> 24) & 0xff) + "." + //
162                 ((address >> 16) & 0xff) + "." + //
163                 ((address >> 8) & 0xff) + "." + //
164                 ((address >> 0) & 0xff);
165     }
166
167     public String getHelp() {
168         StringBuffer help = new StringBuffer();
169         help.append("---LISP Southbound Plugin---\n");
170         return help.toString();
171     }
172
173     private void startIOThread() {
174         if (socket != null) {
175             while (!socket.isClosed()) {
176                 try {
177                     Thread.sleep(500);
178                 } catch (InterruptedException e) {
179                 }
180             }
181         }
182         try {
183             socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
184             lispThread = new LispIoThread(socket, lispSouthboundService);
185             lispThread.start();
186             LOG.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
187             if (listenOnXtrPort) {
188                 restartXtrThread();
189             }
190         } catch (SocketException e) {
191             LOG.error("couldn't start socket {}", e.getMessage());
192         }
193     }
194
195     private void restartXtrThread() {
196         try {
197             stopXtrThread();
198             xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
199             xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
200             xtrThread.start();
201             LOG.info("xTR Southbound Plugin is up!");
202         } catch (SocketException e) {
203             LOG.warn("failed to start xtr thread: {}", e.getMessage());
204         }
205     }
206
207     public void onSessionInitiated(ProviderContext session) {
208         synchronized (startLock) {
209             lispSouthboundService = new LispSouthboundService();
210             lispXtrSouthboundService = new LispXtrSouthboundService();
211             registerWithOSGIConsole();
212             registerRPCs(session);
213             LOG.trace("Provider Session initialized");
214             if (bindingAddress == null) {
215                 setLispAddress("0.0.0.0");
216             }
217             LOG.info("LISP (RFC6830) Mapping Service is up!");
218         }
219     }
220
221     private void registerRPCs(ProviderContext session) {
222         try {
223             lispSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
224             lispXtrSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
225             session.addRpcImplementation(LfmControlPlaneService.class, this);
226         } catch (Exception e) {
227             LOG.error(e.getMessage(), e);
228         }
229     }
230
231     private void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, String packetType) {
232         DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
233         packet.setPort(address.getPort().getValue());
234         InetAddress ip = InetAddresses.forString(address.getIpAddress().getIpv4Address().getValue());
235         packet.setAddress(ip);
236         try {
237             if (LOG.isDebugEnabled()) {
238                 LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
239             }
240             socket.send(packet);
241         } catch (IOException e) {
242             LOG.warn("Failed to send " + packetType, e);
243         }
244     }
245
246     public void setLispAddress(String address) {
247         synchronized (startLock) {
248             if (bindingAddress != null && bindingAddress.equals(address)) {
249                 LOG.trace("configured lisp binding address didn't change.");
250             } else {
251                 String action = (bindingAddress == null ? "Setting" : "Resetting");
252                 LOG.trace(action + " lisp binding address to: " + address);
253                 bindingAddress = address;
254                 if (lispThread != null) {
255                     lispThread.stopRunning();
256                     while (lispThread.isRunning()) {
257                         try {
258                             Thread.sleep(500);
259                         } catch (InterruptedException e) {
260                         }
261                     }
262                 }
263                 stopXtrThread();
264                 startIOThread();
265             }
266         }
267     }
268
269     private void stopXtrThread() {
270         if (xtrThread != null) {
271             xtrThread.stopRunning();
272             while (xtrThread.isRunning()) {
273                 try {
274                     Thread.sleep(500);
275                 } catch (InterruptedException e) {
276                 }
277             }
278         }
279     }
280
281     @Override
282     public Future<RpcResult<Void>> sendMapNotify(SendMapNotifyInput mapNotifyInput) {
283         LOG.trace("sendMapNotify called!!");
284         if (mapNotifyInput != null) {
285             ByteBuffer outBuffer = MapNotifySerializer.getInstance().serialize(mapNotifyInput.getMapNotify());
286             handleSerializedLispBuffer(mapNotifyInput.getTransportAddress(), outBuffer, MAP_NOTIFY);
287         } else {
288             LOG.warn("MapNotify was null");
289             return Futures.immediateFuture(RpcResultBuilder.<Void> failed().build());
290         }
291         return Futures.immediateFuture(RpcResultBuilder.<Void> success().build());
292     }
293
294     @Override
295     public Future<RpcResult<Void>> sendMapReply(SendMapReplyInput mapReplyInput) {
296         LOG.trace("sendMapReply called!!");
297         if (mapReplyInput != null) {
298             ByteBuffer outBuffer = MapReplySerializer.getInstance().serialize(mapReplyInput.getMapReply());
299             handleSerializedLispBuffer(mapReplyInput.getTransportAddress(), outBuffer, MAP_REPlY);
300         } else {
301             LOG.warn("MapReply was null");
302             return Futures.immediateFuture(RpcResultBuilder.<Void> failed().build());
303         }
304         return Futures.immediateFuture(RpcResultBuilder.<Void> success().build());
305     }
306
307     @Override
308     public Future<RpcResult<Void>> sendMapRequest(SendMapRequestInput mapRequestInput) {
309         LOG.trace("sendMapRequest called!!");
310         if (mapRequestInput != null) {
311             ByteBuffer outBuffer = MapRequestSerializer.getInstance().serialize(mapRequestInput.getMapRequest());
312             handleSerializedLispBuffer(mapRequestInput.getTransportAddress(), outBuffer, MAP_REQUEST);
313         } else {
314             LOG.debug("MapRequest was null");
315             return Futures.immediateFuture(RpcResultBuilder.<Void> failed().build());
316         }
317         return Futures.immediateFuture(RpcResultBuilder.<Void> success().build());
318     }
319
320     @Override
321     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
322         listenOnXtrPort = shouldListenOnXtrPort;
323         if (listenOnXtrPort) {
324             LOG.debug("restarting xtr thread");
325             restartXtrThread();
326         } else {
327             LOG.debug("terminating thread");
328             stopXtrThread();
329         }
330     }
331
332     @Override
333     public void setXtrPort(int port) {
334         this.xtrPort = port;
335         if (listenOnXtrPort) {
336             restartXtrThread();
337         }
338     }
339 }