fixed lisp notification handler and added logs
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import java.io.IOException;
12 import java.net.DatagramPacket;
13 import java.net.DatagramSocket;
14 import java.net.InetAddress;
15 import java.net.InetSocketAddress;
16 import java.net.SocketException;
17 import java.net.SocketTimeoutException;
18 import java.nio.ByteBuffer;
19 import java.util.concurrent.Future;
20
21 import org.eclipse.osgi.framework.console.CommandProvider;
22 import org.opendaylight.controller.sal.binding.api.AbstractBindingAwareProvider;
23 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
24 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
25 import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
26 import org.opendaylight.lispflowmapping.implementation.serializer.MapNotifySerializer;
27 import org.opendaylight.lispflowmapping.implementation.serializer.MapReplySerializer;
28 import org.opendaylight.lispflowmapping.implementation.serializer.MapRequestSerializer;
29 import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
30 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
31 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
32 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispPlugin;
33 import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.LispflowmappingService;
34 import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SendMapNotifyInput;
35 import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SendMapReplyInput;
36 import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SendMapRequestInput;
37 import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.SetXtrPortInput;
38 import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.ShouldListenOnXtrPortInput;
39 import org.opendaylight.yang.gen.v1.lispflowmapping.rev131031.transportaddress.TransportAddress;
40 import org.opendaylight.yangtools.yang.common.RpcResult;
41 import org.osgi.framework.BundleContext;
42 import org.osgi.framework.FrameworkUtil;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 import com.google.common.net.InetAddresses;
47
48 public class LispSouthboundPlugin extends AbstractBindingAwareProvider implements IConfigLispPlugin, CommandProvider, LispflowmappingService {
49     protected static final Logger logger = LoggerFactory.getLogger(LispSouthboundPlugin.class);
50
51     private static Object startLock = new Object();
52     private LispIoThread lispThread;
53     private LispIoThread xtrThread;
54     private LispSouthboundService lispSouthboundService;
55     private LispXtrSouthboundService lispXtrSouthboundService;
56     private volatile DatagramSocket socket = null;
57     private final String MAP_NOTIFY = "MapNotify";
58     private final String MAP_REPlY = "MapReply";
59     private final String MAP_REQUEST = "MapRequest";
60     private volatile String bindingAddress = null;
61     private volatile boolean alreadyInit = false;
62     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
63     private volatile boolean listenOnXtrPort = false;
64
65     private DatagramSocket xtrSocket;
66
67     private void registerWithOSGIConsole() {
68         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass()).getBundleContext();
69         bundleContext.registerService(CommandProvider.class.getName(), this, null);
70         bundleContext.registerService(IConfigLispPlugin.class.getName(), this, null);
71     }
72
73     protected void stopImpl(BundleContext context) {
74         unloadActions();
75     }
76
77     private void unloadActions() {
78         if (lispThread != null) {
79             lispThread.stopRunning();
80         }
81         lispSouthboundService = null;
82         lispXtrSouthboundService = null;
83         lispThread = null;
84         xtrThread = null;
85         logger.info("LISP (RFC6830) Mapping Service is down!");
86         try {
87             Thread.sleep(1100);
88         } catch (InterruptedException e) {
89         }
90     }
91
92     public void destroy() {
93         unloadActions();
94     }
95
96     private class LispIoThread extends Thread {
97         private volatile boolean shouldRun;
98         private volatile DatagramSocket threadSocket = null;
99         private volatile ILispSouthboundService service;
100         private volatile boolean running;
101
102         public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
103             super("Lisp Thread");
104             this.threadSocket = socket;
105             this.service = service;
106             shouldRun = true;
107         }
108
109         @Override
110         public void run() {
111             running = true;
112
113             int lispReceiveTimeout = 1000;
114
115             logger.info("LISP (RFC6830) Mapping Service is running and listening on address: " + bindingAddress + " port: "
116                     + threadSocket.getLocalPort());
117             try {
118
119                 threadSocket.setSoTimeout(lispReceiveTimeout);
120             } catch (SocketException e) {
121                 logger.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
122                 return;
123             }
124
125             while (shouldRun) {
126                 byte[] buffer = new byte[4096];
127                 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
128                 try {
129                     threadSocket.receive(packet);
130                     logger.trace("Received a packet!");
131                 } catch (SocketTimeoutException ste) {
132                     continue;
133                 } catch (IOException e) {
134                     logger.warn("IO Exception while trying to recieve packet", e);
135                 }
136                 logger.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress().getHostAddress(), packet.getPort(),
137                         packet.getLength()));
138
139                 try {
140                     this.service.handlePacket(packet);
141                 } catch (Throwable t) {
142                     logger.warn("Error while handling packet", t);
143                 }
144             }
145
146             threadSocket.close();
147             logger.trace("Socket closed");
148             running = false;
149         }
150
151         public void stopRunning() {
152             shouldRun = false;
153         }
154
155         public boolean isRunning() {
156             return running;
157         }
158     }
159
160     public static String intToIpv4(int address) {
161         return ((address >> 24) & 0xff) + "." + //
162                 ((address >> 16) & 0xff) + "." + //
163                 ((address >> 8) & 0xff) + "." + //
164                 ((address >> 0) & 0xff);
165     }
166
167     public String getHelp() {
168         StringBuffer help = new StringBuffer();
169         help.append("---LISP Southbound Plugin---\n");
170         return help.toString();
171     }
172
173     private void startIOThread() {
174         try {
175             socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
176             lispThread = new LispIoThread(socket, lispSouthboundService);
177             lispThread.start();
178             logger.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
179             if (listenOnXtrPort) {
180                 restartXtrThread();
181             }
182         } catch (SocketException e) {
183             logger.error("couldn't start socket {}", e.getMessage());
184             e.printStackTrace();
185         }
186     }
187
188     private void restartXtrThread() {
189         try {
190             stopXtrThread();
191             xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
192             xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
193             xtrThread.start();
194             logger.info("xTR Southbound Plugin is up!");
195         } catch (SocketException e) {
196             logger.warn("failed to start xtr thread: {}", e.getMessage());
197         }
198     }
199
200     public void onSessionInitiated(ProviderContext session) {
201         logger.info("LISP (RFC6830) Mapping Service is up!");
202         synchronized (startLock) {
203             if (!alreadyInit) {
204                 alreadyInit = true;
205                 lispSouthboundService = new LispSouthboundService();
206                 lispXtrSouthboundService = new LispXtrSouthboundService();
207                 registerWithOSGIConsole();
208                 registerRPCs(session);
209                 logger.trace("Provider Session initialized");
210                 if (bindingAddress == null) {
211                     setLispAddress("0.0.0.0");
212                 }
213             }
214
215         }
216     }
217
218     private void registerRPCs(ProviderContext session) {
219         try {
220             lispSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
221             lispXtrSouthboundService.setNotificationProvider(session.getSALService(NotificationProviderService.class));
222             session.addRpcImplementation(LispflowmappingService.class, this);
223         } catch (Throwable t) {
224             logger.error(t.getMessage(), t);
225         }
226     }
227
228     private void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, String packetType) {
229         DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
230         packet.setPort(address.getPort().getValue());
231         InetAddress ip = InetAddresses.forString(address.getIpAddress().getIpv4Address().getValue());
232         packet.setAddress(ip);
233         try {
234             if (logger.isDebugEnabled()) {
235                 logger.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
236             }
237             socket.send(packet);
238         } catch (IOException e) {
239             logger.warn("Failed to send " + packetType, e);
240         }
241     }
242
243     public void setLispAddress(String address) {
244         synchronized (startLock) {
245             if (bindingAddress != null && bindingAddress.equals(address)) {
246                 logger.trace("configured lisp binding address didn't change.");
247             } else {
248                 String action = (bindingAddress == null ? "Setting" : "Resetting");
249                 logger.trace(action + " lisp binding address to: " + address);
250                 bindingAddress = address;
251                 if (lispThread != null) {
252                     lispThread.stopRunning();
253                     while (lispThread.isRunning()) {
254                         try {
255                             Thread.sleep(500);
256                         } catch (InterruptedException e) {
257                         }
258                     }
259                 }
260                 stopXtrThread();
261                 startIOThread();
262             }
263         }
264     }
265
266     private void stopXtrThread() {
267         if (xtrThread != null) {
268             xtrThread.stopRunning();
269             while (xtrThread.isRunning()) {
270                 try {
271                     Thread.sleep(500);
272                 } catch (InterruptedException e) {
273                 }
274             }
275         }
276     }
277
278     @Override
279     public Future<RpcResult<Void>> sendMapNotify(SendMapNotifyInput mapNotifyInput) {
280         logger.trace("sendMapNotify called!!");
281         if (mapNotifyInput != null) {
282             ByteBuffer outBuffer = MapNotifySerializer.getInstance().serialize(mapNotifyInput.getMapNotify());
283             handleSerializedLispBuffer(mapNotifyInput.getTransportAddress(), outBuffer, MAP_NOTIFY);
284         } else {
285             logger.warn("MapNotify was null");
286         }
287         return null;
288     }
289
290     @Override
291     public Future<RpcResult<Void>> sendMapReply(SendMapReplyInput mapReplyInput) {
292         logger.trace("sendMapReply called!!");
293         if (mapReplyInput != null) {
294             ByteBuffer outBuffer = MapReplySerializer.getInstance().serialize(mapReplyInput.getMapReply());
295             handleSerializedLispBuffer(mapReplyInput.getTransportAddress(), outBuffer, MAP_REPlY);
296         } else {
297             logger.warn("MapReply was null");
298         }
299         return null;
300     }
301
302     @Override
303     public Future<RpcResult<Void>> sendMapRequest(SendMapRequestInput mapRequestInput) {
304         logger.trace("sendMapRequest called!!");
305         if (mapRequestInput != null) {
306             ByteBuffer outBuffer = MapRequestSerializer.getInstance().serialize(mapRequestInput.getMapRequest());
307             handleSerializedLispBuffer(mapRequestInput.getTransportAddress(), outBuffer, MAP_REQUEST);
308         } else {
309             logger.debug("MapRequest was null");
310         }
311         return null;
312     }
313
314     @Override
315     public Future<RpcResult<Void>> shouldListenOnXtrPort(ShouldListenOnXtrPortInput input) {
316         logger.debug("got a call to shouldListenOnXtrPort");
317         if (listenOnXtrPort == input.isShouldListenOnXtrPort()) {
318             logger.debug("no action done, ond and new value are identical, " + listenOnXtrPort); 
319             return null;
320         }
321         listenOnXtrPort = input.isShouldListenOnXtrPort();
322         if (listenOnXtrPort) {
323             logger.debug("restarting xtr thread");
324             restartXtrThread();
325         } else {
326             logger.debug("terminating thread");
327             stopXtrThread();
328         }
329         return null;
330     }
331
332     @Override
333     public Future<RpcResult<Void>> setXtrPort(SetXtrPortInput input) {
334         this.xtrPort = input.getXtrPort().getValue();
335         if (listenOnXtrPort) {
336             restartXtrThread();
337         }
338         return null;
339     }
340 }