2 * Copyright (c) 2014 Contextream, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.lispflowmapping.southbound;
11 import java.io.IOException;
12 import java.net.DatagramPacket;
13 import java.net.DatagramSocket;
14 import java.net.InetAddress;
15 import java.net.InetSocketAddress;
16 import java.net.SocketException;
17 import java.net.SocketTimeoutException;
18 import java.nio.ByteBuffer;
20 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
21 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
23 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
24 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
25 import org.opendaylight.lispflowmapping.implementation.serializer.LispMessage;
26 import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
27 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
28 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
29 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
30 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.LfmControlPlaneService;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.control.plane.rev150314.transportaddress.TransportAddress;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
35 import com.google.common.net.InetAddresses;
// Southbound plugin for the LISP mapping service: owns the UDP sockets, the I/O threads that
// receive datagrams from them, and the services those threads hand each packet to.
37 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, BindingAwareProvider {
38 protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
// Monitor shared by the synchronized sections that start or re-address the plugin.
40 private static Object startLock = new Object();
// Receive thread for `socket`; created in startIOThread().
41 private LispIoThread lispThread;
// Receive thread for `xtrSocket`; created in restartXtrThread().
42 private LispIoThread xtrThread;
// Packet handlers given to lispThread and xtrThread respectively.
43 private LispSouthboundService lispSouthboundService;
44 private LispXtrSouthboundService lispXtrSouthboundService;
// Collaborators supplied through the setters below.
45 private NotificationProviderService notificationService;
46 private RpcProviderRegistry rpcRegistry;
47 private BindingAwareBroker broker;
// Main socket, bound to bindingAddress on LispMessage.PORT_NUM by startIOThread().
48 private volatile DatagramSocket socket = null;
// Address both sockets bind to; null until setLispAddress() has been called.
49 private volatile String bindingAddress = null;
// Port used for xtrSocket; starts at LispMessage.XTR_PORT_NUM.
50 private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
// Whether the xTR listener is wanted; written by shouldListenOnXtrPort().
51 private volatile boolean listenOnXtrPort = false;
// Handle returned by rpcRegistry.addRpcImplementation(); closed in close().
52 private BindingAwareBroker.RpcRegistration<LfmControlPlaneService> controlPlaneRpc;
// xTR socket, bound to bindingAddress on xtrPort by restartXtrThread().
53 private DatagramSocket xtrSocket;
// NOTE(review): the header of the method owning this body is not visible in this extract
// (the preceding source line is absent), so its name and signature are not documented here.
//
// Start-up sequence: registers the control-plane RPC implementation and this provider with
// the broker, then, while holding startLock, creates both southbound services, hands them the
// notification provider, and applies the default binding address "0.0.0.0" when none is set.
// NOTE(review): the same "is up!" message is logged at the start and again inside the
// synchronized section - confirm that both are intended.
56 LOG.info("LISP (RFC6830) Mapping Service is up!");
57 final LfmControlPlaneRpc lfmCpRpc = new LfmControlPlaneRpc(this);
// Keep the registration handle so close() can release it.
59 controlPlaneRpc = rpcRegistry.addRpcImplementation(LfmControlPlaneService.class, lfmCpRpc);
60 broker.registerProvider(this);
62 synchronized (startLock) {
63 lispSouthboundService = new LispSouthboundService();
64 lispXtrSouthboundService = new LispXtrSouthboundService();
65 lispSouthboundService.setNotificationProvider(this.notificationService);
66 lispXtrSouthboundService.setNotificationProvider(this.notificationService);
67 LOG.trace("Provider Session initialized");
// No address configured yet: fall back to the wildcard address.
68 if (bindingAddress == null) {
69 setLispAddress("0.0.0.0");
71 LOG.info("LISP (RFC6830) Mapping Service is up!");
75 public void setNotificationProviderService(NotificationProviderService notificationService) {
76 this.notificationService = notificationService;
79 public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
80 this.rpcRegistry = rpcRegistry;
// Setter for the BindingAwareBroker collaborator.
// NOTE(review): the body of this method is not visible in this extract; presumably it
// assigns the `broker` field - confirm against the full file.
83 public void setBindingAwareBroker(BindingAwareBroker broker) {
// Shuts the plugin down: asks the main LISP thread to stop (when one exists), drops both
// service references, clears the binding address and logs that the service is down.
// NOTE(review): several source lines of this method are absent from this extract, including
// the statement guarded by the InterruptedException handler at the end - what is waited on,
// and how the interruption is handled, cannot be confirmed from here.
87 private void unloadActions() {
88 if (lispThread != null) {
89 lispThread.stopRunning();
91 lispSouthboundService = null;
92 lispXtrSouthboundService = null;
// With bindingAddress null, the next setLispAddress() call takes the "Setting" branch.
95 bindingAddress = null;
96 LOG.info("LISP (RFC6830) Mapping Service is down!");
99 } catch (InterruptedException e) {
// Receive thread: reads datagrams from the socket given to the constructor and passes each
// one to the given ILispSouthboundService.
// NOTE(review): the run() header, the loop header and several control-flow statements are
// absent from this extract, so the exact loop structure cannot be confirmed from here.
103 private class LispIoThread extends Thread {
104 private volatile boolean shouldRun;
105 private volatile DatagramSocket threadSocket = null;
106 private volatile ILispSouthboundService service;
107 private volatile boolean running;
// Every instance gets the same thread name, so the main and xTR threads are both "Lisp Thread".
109 public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
110 super("Lisp Thread");
111 this.threadSocket = socket;
112 this.service = service;
// Receive timeout passed to setSoTimeout() below (that API takes milliseconds); a receive
// that times out surfaces as the SocketTimeoutException caught further down.
120 int lispReceiveTimeout = 1000;
122 LOG.info("LISP (RFC6830) Mapping Service is running and listening on address: " + bindingAddress
123 + " port: " + threadSocket.getLocalPort());
126 threadSocket.setSoTimeout(lispReceiveTimeout);
127 } catch (SocketException e) {
128 LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
// A fresh 4096-byte buffer and packet are allocated for each receive.
133 byte[] buffer = new byte[4096];
134 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
136 threadSocket.receive(packet);
137 LOG.trace("Received a packet!");
138 } catch (SocketTimeoutException ste) {
140 } catch (IOException e) {
141 LOG.warn("IO Exception while trying to recieve packet", e);
// NOTE(review): String.format runs before LOG.trace is invoked, so this message is built
// whatever the log level; the braces are literal text here, not SLF4J placeholders.
143 LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress()
144 .getHostAddress(), packet.getPort(), packet.getLength()));
// Any exception thrown by the handler is logged by the catch below.
147 this.service.handlePacket(packet);
148 } catch (Exception e) {
149 LOG.warn("Error while handling packet", e);
153 threadSocket.close();
154 LOG.trace("Socket closed");
// NOTE(review): the bodies of the two accessors below are not visible in this extract.
158 public void stopRunning() {
162 public boolean isRunning() {
167 public static String intToIpv4(int address) {
168 return ((address >> 24) & 0xff) + "." + //
169 ((address >> 16) & 0xff) + "." + //
170 ((address >> 8) & 0xff) + "." + //
171 ((address >> 0) & 0xff);
174 public String getHelp() {
175 StringBuffer help = new StringBuffer();
176 help.append("---LISP Southbound Plugin---\n");
177 return help.toString();
// Opens the main UDP socket on bindingAddress / LispMessage.PORT_NUM and creates the
// LispIoThread that serves it with lispSouthboundService.
// NOTE(review): several source lines are absent from this extract - the body of the wait
// loop, the statement that starts the thread, and what happens when listenOnXtrPort is set.
180 private void startIOThread() {
// A previous socket exists: loop until it reports closed before binding a new one.
181 if (socket != null) {
182 while (!socket.isClosed()) {
185 } catch (InterruptedException e) {
190 socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
191 lispThread = new LispIoThread(socket, lispSouthboundService);
193 LOG.info("LISP (RFC6830) Mapping Service Southbound Plugin is up!");
194 if (listenOnXtrPort) {
197 } catch (SocketException e) {
// NOTE(review): only e.getMessage() is logged, so the stack trace of the bind failure is lost.
198 LOG.error("couldn't start socket {}", e.getMessage());
// Opens the xTR UDP socket on bindingAddress / xtrPort and creates the LispIoThread that
// serves it with lispXtrSouthboundService.
// NOTE(review): source lines are absent from this extract (whatever precedes the bind, and
// the statement that starts the thread), so the "restart" part cannot be confirmed from here.
202 private void restartXtrThread() {
205 xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
206 xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
208 LOG.info("xTR Southbound Plugin is up!");
209 } catch (SocketException e) {
// A bind failure is only logged as a warning (message text only, no stack trace).
210 LOG.warn("failed to start xtr thread: {}", e.getMessage());
// Wraps an already-serialized LISP message in a DatagramPacket addressed to `address`.
// `packetType` is used only in log messages.
// NOTE(review): the statement guarded by the IOException handler (presumably the actual
// send) is not visible in this extract.
214 public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, String packetType) {
// NOTE(review): array() only works for array-backed buffers, and limit() is used as the
// length counted from index 0 of that array - assumes arrayOffset() is 0; confirm with callers.
215 DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
216 packet.setPort(address.getPort().getValue());
217 InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue()));
218 packet.setAddress(ip);
// NOTE(review): the guard tests the DEBUG level but the statement logs at TRACE, so the
// string is still concatenated when only DEBUG is enabled - isTraceEnabled() looks intended.
220 if (LOG.isDebugEnabled()) {
221 LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
224 } catch (IOException e) {
225 LOG.warn("Failed to send " + packetType, e);
// Changes the address the plugin binds to. Runs under startLock.
// NOTE(review): the tail of this method is absent from this extract (the body of the wait
// loop and whatever follows it), so how the new address takes effect is not documented here.
229 public void setLispAddress(String address) {
230 synchronized (startLock) {
// Same address as before: only a trace message is logged on this branch.
231 if (bindingAddress != null && bindingAddress.equals(address)) {
232 LOG.trace("configured lisp binding address didn't change.");
// "Setting" on first configuration, "Resetting" when an address was already present.
234 String action = (bindingAddress == null ? "Setting" : "Resetting");
235 LOG.trace(action + " lisp binding address to: " + address);
236 bindingAddress = address;
// Ask the current receive thread to stop, then loop while it still reports running.
237 if (lispThread != null) {
238 lispThread.stopRunning();
239 while (lispThread.isRunning()) {
242 } catch (InterruptedException e) {
// Asks the xTR receive thread (when one exists) to stop, then loops while it still reports
// running.
// NOTE(review): the loop body is not visible in this extract; only its InterruptedException
// handler is, so the wait mechanism and the interrupt handling cannot be confirmed from here.
252 private void stopXtrThread() {
253 if (xtrThread != null) {
254 xtrThread.stopRunning();
255 while (xtrThread.isRunning()) {
258 } catch (InterruptedException e) {
// Records whether the xTR listener is wanted and logs which action follows.
// NOTE(review): the statements that follow each log message are not visible in this extract;
// presumably they restart or stop the xTR thread - confirm against the full file.
265 public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
266 listenOnXtrPort = shouldListenOnXtrPort;
267 if (listenOnXtrPort) {
268 LOG.debug("restarting xtr thread");
271 LOG.debug("terminating thread");
// Setter for the xTR port.
// NOTE(review): the statements of this method other than the listenOnXtrPort check are not
// visible in this extract; presumably the port is stored and the xTR thread is restarted
// when listening is enabled - confirm against the full file.
277 public void setXtrPort(int port) {
279 if (listenOnXtrPort) {
// Releases the control-plane RPC registration held in controlPlaneRpc.
// NOTE(review): a source line between the header and this call is absent from this extract,
// so any other shutdown work done here is not visible.
285 public void close() throws Exception {
287 controlPlaneRpc.close();
291 public void onSessionInitiated(ProviderContext session) {
292 LOG.debug("LispSouthboundPlugin Provider Session Initiated");