0cbee80ba1e33adf46a3f5942e13d40abbd8ff76
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import java.io.IOException;
12 import java.net.DatagramPacket;
13 import java.net.DatagramSocket;
14 import java.net.InetAddress;
15 import java.net.InetSocketAddress;
16 import java.net.SocketException;
17 import java.net.SocketTimeoutException;
18 import java.nio.ByteBuffer;
19
20 import org.apache.commons.lang3.exception.ExceptionUtils;
21 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
23 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
24 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
25 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
26 import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
27 import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
28 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
29 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
30 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.LfmControlPlaneService;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.transportaddress.TransportAddress;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import com.google.common.net.InetAddresses;
37
38 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, BindingAwareProvider {
39     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
40
41     private static Object startLock = new Object();
42     private LispIoThread lispThread;
43     private LispIoThread xtrThread;
44     private LispSouthboundService lispSouthboundService;
45     private LispXtrSouthboundService lispXtrSouthboundService;
46     private NotificationProviderService notificationService;
47     private RpcProviderRegistry rpcRegistry;
48     private BindingAwareBroker broker;
49     private volatile DatagramSocket socket = null;
50     private volatile String bindingAddress = null;
51     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
52     private volatile boolean listenOnXtrPort = false;
53     private BindingAwareBroker.RpcRegistration<LfmControlPlaneService> controlPlaneRpc;
54     private DatagramSocket xtrSocket;
55
56     public void init() {
57         LOG.info("LISP (RFC6830) Mapping Service is up!");
58         final LfmControlPlaneRpc lfmCpRpc = new LfmControlPlaneRpc(this);
59
60         controlPlaneRpc = rpcRegistry.addRpcImplementation(LfmControlPlaneService.class, lfmCpRpc);
61         broker.registerProvider(this);
62
63         synchronized (startLock) {
64             lispSouthboundService = new LispSouthboundService();
65             lispXtrSouthboundService = new LispXtrSouthboundService();
66             lispSouthboundService.setNotificationProvider(this.notificationService);
67             lispXtrSouthboundService.setNotificationProvider(this.notificationService);
68             LOG.trace("Provider Session initialized");
69             if (bindingAddress == null) {
70                 setLispAddress("0.0.0.0");
71             }
72             LOG.info("LISP (RFC6830) Mapping Service is up!");
73         }
74     }
75
76     public void setNotificationProviderService(NotificationProviderService notificationService) {
77         this.notificationService = notificationService;
78     }
79
80     public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
81         this.rpcRegistry = rpcRegistry;
82     }
83
84     public void setBindingAwareBroker(BindingAwareBroker broker) {
85         this.broker = broker;
86     }
87
88     private void unloadActions() {
89         if (lispThread != null) {
90             lispThread.stopRunning();
91         }
92         lispSouthboundService = null;
93         lispXtrSouthboundService = null;
94         lispThread = null;
95         xtrThread = null;
96         bindingAddress = null;
97         LOG.info("LISP (RFC6830) Mapping Service is down!");
98         try {
99             Thread.sleep(1100);
100         } catch (InterruptedException e) {
101         }
102     }
103
104     private class LispIoThread extends Thread {
105         private volatile boolean shouldRun;
106         private volatile DatagramSocket threadSocket = null;
107         private volatile ILispSouthboundService service;
108         private volatile boolean running;
109
110         public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
111             super("Lisp Thread");
112             this.threadSocket = socket;
113             this.service = service;
114             shouldRun = true;
115         }
116
117         @Override
118         public void run() {
119             running = true;
120
121             int lispReceiveTimeout = 1000;
122
123             LOG.info("LISP (RFC6830) Mapping Service is running and listening on address: " + bindingAddress
124                     + " port: " + threadSocket.getLocalPort());
125             try {
126
127                 threadSocket.setSoTimeout(lispReceiveTimeout);
128             } catch (SocketException e) {
129                 LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
130                 return;
131             }
132
133             while (shouldRun) {
134                 byte[] buffer = new byte[4096];
135                 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
136                 try {
137                     threadSocket.receive(packet);
138                     LOG.trace("Received a packet!");
139                 } catch (SocketTimeoutException ste) {
140                     continue;
141                 } catch (IOException e) {
142                     LOG.warn("IO Exception while trying to recieve packet", e);
143                 }
144                 LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress()
145                         .getHostAddress(), packet.getPort(), packet.getLength()));
146
147                 try {
148                     this.service.handlePacket(packet);
149                 } catch (Exception e) {
150                     LOG.warn("Error while handling packet", e);
151                 }
152             }
153
154             threadSocket.close();
155             LOG.trace("Socket closed");
156             running = false;
157         }
158
159         public void stopRunning() {
160             shouldRun = false;
161         }
162
163         public boolean isRunning() {
164             return running;
165         }
166     }
167
168     public static String intToIpv4(int address) {
169         return ((address >> 24) & 0xff) + "." + //
170                 ((address >> 16) & 0xff) + "." + //
171                 ((address >> 8) & 0xff) + "." + //
172                 ((address >> 0) & 0xff);
173     }
174
175     public String getHelp() {
176         StringBuffer help = new StringBuffer();
177         help.append("---LISP Southbound Plugin---\n");
178         return help.toString();
179     }
180
181     private void startIOThread() {
182         if (socket != null) {
183             while (!socket.isClosed()) {
184                 try {
185                     Thread.sleep(500);
186                 } catch (InterruptedException e) {
187                 }
188             }
189         }
190         try {
191             socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
192             lispThread = new LispIoThread(socket, lispSouthboundService);
193             lispThread.start();
194             LOG.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
195             if (listenOnXtrPort) {
196                 restartXtrThread();
197             }
198         } catch (SocketException e) {
199             LOG.error("couldn't start socket: {}", ExceptionUtils.getStackTrace(e));
200         }
201     }
202
203     private void restartXtrThread() {
204         try {
205             stopXtrThread();
206             xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
207             xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
208             xtrThread.start();
209             LOG.info("xTR Southbound Plugin is up!");
210         } catch (SocketException e) {
211             LOG.warn("failed to start xtr thread: {}", ExceptionUtils.getStackTrace(e));
212         }
213     }
214
215     public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, String packetType) {
216         DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
217         packet.setPort(address.getPort().getValue());
218         InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue()));
219         packet.setAddress(ip);
220         try {
221             if (LOG.isDebugEnabled()) {
222                 LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
223             }
224             socket.send(packet);
225         } catch (IOException e) {
226             LOG.warn("Failed to send " + packetType, e);
227         }
228     }
229
230     public void setLispAddress(String address) {
231         synchronized (startLock) {
232             if (bindingAddress != null && bindingAddress.equals(address)) {
233                 LOG.trace("configured lisp binding address didn't change.");
234             } else {
235                 String action = (bindingAddress == null ? "Setting" : "Resetting");
236                 LOG.trace(action + " lisp binding address to: " + address);
237                 bindingAddress = address;
238                 if (lispThread != null) {
239                     lispThread.stopRunning();
240                     while (lispThread.isRunning()) {
241                         try {
242                             Thread.sleep(500);
243                         } catch (InterruptedException e) {
244                         }
245                     }
246                 }
247                 stopXtrThread();
248                 startIOThread();
249             }
250         }
251     }
252
253     private void stopXtrThread() {
254         if (xtrThread != null) {
255             xtrThread.stopRunning();
256             while (xtrThread.isRunning()) {
257                 try {
258                     Thread.sleep(500);
259                 } catch (InterruptedException e) {
260                 }
261             }
262         }
263     }
264
265     @Override
266     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
267         listenOnXtrPort = shouldListenOnXtrPort;
268         if (listenOnXtrPort) {
269             LOG.debug("restarting xtr thread");
270             restartXtrThread();
271         } else {
272             LOG.debug("terminating thread");
273             stopXtrThread();
274         }
275     }
276
277     @Override
278     public void setXtrPort(int port) {
279         this.xtrPort = port;
280         if (listenOnXtrPort) {
281             restartXtrThread();
282         }
283     }
284
285     @Override
286     public void close() throws Exception {
287         unloadActions();
288         controlPlaneRpc.close();
289     }
290
291     @Override
292     public void onSessionInitiated(ProviderContext session) {
293         LOG.debug("LispSouthboundPlugin Provider Session Initiated");
294     }
295 }