Migrate implementation/neutron/southbound to IETF YANG model
[lispflowmapping.git] / mappingservice / southbound / src / main / java / org / opendaylight / lispflowmapping / southbound / LispSouthboundPlugin.java
1 /*
2  * Copyright (c) 2014 Contextream, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.lispflowmapping.southbound;
10
11 import java.io.IOException;
12 import java.net.DatagramPacket;
13 import java.net.DatagramSocket;
14 import java.net.InetAddress;
15 import java.net.InetSocketAddress;
16 import java.net.SocketException;
17 import java.net.SocketTimeoutException;
18 import java.nio.ByteBuffer;
19
20 import org.apache.commons.lang3.exception.ExceptionUtils;
21 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
23 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
24 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
25 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
26 import org.opendaylight.lispflowmapping.lisp.type.LispMessage;
27 import org.opendaylight.lispflowmapping.southbound.lisp.ILispSouthboundService;
28 import org.opendaylight.lispflowmapping.southbound.lisp.LispSouthboundService;
29 import org.opendaylight.lispflowmapping.southbound.lisp.LispXtrSouthboundService;
30 import org.opendaylight.lispflowmapping.type.sbplugin.IConfigLispSouthboundPlugin;
31 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.MessageType;
32 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.proto.rev151105.transport.address.TransportAddress;
33 import org.opendaylight.yang.gen.v1.urn.opendaylight.lfm.lisp.sb.rev150904.OdlLispSbService;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.net.InetAddresses;
38
39 public class LispSouthboundPlugin implements IConfigLispSouthboundPlugin, AutoCloseable, BindingAwareProvider {
40     protected static final Logger LOG = LoggerFactory.getLogger(LispSouthboundPlugin.class);
41
42     private static Object startLock = new Object();
43     private LispIoThread lispThread;
44     private LispIoThread xtrThread;
45     private LispSouthboundService lispSouthboundService;
46     private LispXtrSouthboundService lispXtrSouthboundService;
47     private NotificationPublishService notificationPublishService;
48     private RpcProviderRegistry rpcRegistry;
49     private BindingAwareBroker broker;
50     private volatile DatagramSocket socket = null;
51     private volatile String bindingAddress = null;
52     private volatile int xtrPort = LispMessage.XTR_PORT_NUM;
53     private volatile boolean listenOnXtrPort = false;
54     private BindingAwareBroker.RpcRegistration<OdlLispSbService> sbRpcRegistration;
55     private DatagramSocket xtrSocket;
56     private LispSouthboundStats statistics = new LispSouthboundStats();
57
58     public void init() {
59         LOG.info("LISP (RFC6830) southbound plugin is initializing...");
60         final LispSouthboundRPC sbRpcHandler = new LispSouthboundRPC(this);
61
62         sbRpcRegistration = rpcRegistry.addRpcImplementation(OdlLispSbService.class, sbRpcHandler);
63         broker.registerProvider(this);
64
65         synchronized (startLock) {
66             lispSouthboundService = new LispSouthboundService(this);
67             lispXtrSouthboundService = new LispXtrSouthboundService();
68             lispSouthboundService.setNotificationProvider(this.notificationPublishService);
69             lispXtrSouthboundService.setNotificationProvider(this.notificationPublishService);
70             if (bindingAddress == null) {
71                 setLispAddress("0.0.0.0");
72             }
73             LOG.info("LISP (RFC6830) southbound plugin is up!");
74         }
75     }
76
77     public void setNotificationPublishService(NotificationPublishService notificationService) {
78         this.notificationPublishService = notificationService;
79     }
80
81     public void setRpcRegistryDependency(RpcProviderRegistry rpcRegistry) {
82         this.rpcRegistry = rpcRegistry;
83     }
84
85     public void setBindingAwareBroker(BindingAwareBroker broker) {
86         this.broker = broker;
87     }
88
89     private void unloadActions() {
90         if (lispThread != null) {
91             lispThread.stopRunning();
92         }
93         lispSouthboundService = null;
94         lispXtrSouthboundService = null;
95         lispThread = null;
96         xtrThread = null;
97         bindingAddress = null;
98         LOG.info("LISP (RFC6830) southbound plugin is down!");
99         try {
100             Thread.sleep(1100);
101         } catch (InterruptedException e) {
102         }
103     }
104
105     private class LispIoThread extends Thread {
106         private volatile boolean shouldRun;
107         private volatile DatagramSocket threadSocket = null;
108         private volatile ILispSouthboundService service;
109         private volatile boolean running;
110
111         public LispIoThread(DatagramSocket socket, ILispSouthboundService service) {
112             super("Lisp Thread");
113             this.threadSocket = socket;
114             this.service = service;
115             shouldRun = true;
116         }
117
118         @Override
119         public void run() {
120             running = true;
121
122             int lispReceiveTimeout = 1000;
123
124             LOG.info("LISP (RFC6830) southbound plugin is running and listening on address: " + bindingAddress
125                     + " port: " + threadSocket.getLocalPort());
126             try {
127
128                 threadSocket.setSoTimeout(lispReceiveTimeout);
129             } catch (SocketException e) {
130                 LOG.error("Cannot open socket on UDP port " + threadSocket.getLocalPort(), e);
131                 return;
132             }
133
134             while (shouldRun) {
135                 byte[] buffer = new byte[4096];
136                 DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
137                 try {
138                     threadSocket.receive(packet);
139                     LOG.trace("Received a packet!");
140                 } catch (SocketTimeoutException ste) {
141                     continue;
142                 } catch (IOException e) {
143                     LOG.warn("IO Exception while trying to recieve packet", e);
144                 }
145                 LOG.trace(String.format("Handling packet from {%s}:{%d} (len={%d})", packet.getAddress()
146                         .getHostAddress(), packet.getPort(), packet.getLength()));
147
148                 try {
149                     this.service.handlePacket(packet);
150                 } catch (Exception e) {
151                     LOG.warn("Error while handling packet", e);
152                 }
153             }
154
155             threadSocket.close();
156             LOG.trace("Socket closed");
157             running = false;
158         }
159
160         public void stopRunning() {
161             shouldRun = false;
162         }
163
164         public boolean isRunning() {
165             return running;
166         }
167     }
168
169     public static String intToIpv4(int address) {
170         return ((address >> 24) & 0xff) + "." + //
171                 ((address >> 16) & 0xff) + "." + //
172                 ((address >> 8) & 0xff) + "." + //
173                 ((address >> 0) & 0xff);
174     }
175
176     private void startIOThread() {
177         if (socket != null) {
178             while (!socket.isClosed()) {
179                 try {
180                     Thread.sleep(500);
181                 } catch (InterruptedException e) {
182                 }
183             }
184         }
185         try {
186             socket = new DatagramSocket(new InetSocketAddress(bindingAddress, LispMessage.PORT_NUM));
187             lispThread = new LispIoThread(socket, lispSouthboundService);
188             lispThread.start();
189             LOG.info("LISP (RFC6830) southbound plugin is listening for control packets!");
190             if (listenOnXtrPort) {
191                 restartXtrThread();
192             }
193         } catch (SocketException e) {
194             LOG.error("couldn't start socket: {}", ExceptionUtils.getStackTrace(e));
195         }
196     }
197
198     private void restartXtrThread() {
199         try {
200             stopXtrThread();
201             xtrSocket = new DatagramSocket(new InetSocketAddress(bindingAddress, xtrPort));
202             xtrThread = new LispIoThread(xtrSocket, lispXtrSouthboundService);
203             xtrThread.start();
204             LOG.info("xTR southbound plugin is up!");
205         } catch (SocketException e) {
206             LOG.warn("failed to start xtr thread: {}", ExceptionUtils.getStackTrace(e));
207         }
208     }
209
210     public void handleSerializedLispBuffer(TransportAddress address, ByteBuffer outBuffer, MessageType packetType) {
211         DatagramPacket packet = new DatagramPacket(outBuffer.array(), outBuffer.limit());
212         packet.setPort(address.getPort().getValue());
213         InetAddress ip = InetAddresses.forString(new String(address.getIpAddress().getValue()));
214         packet.setAddress(ip);
215         try {
216             if (LOG.isDebugEnabled()) {
217                 LOG.trace("Sending " + packetType + " on port " + address.getPort().getValue() + " to address: " + ip);
218             }
219             socket.send(packet);
220             this.statistics.incrementTx(packetType.getIntValue());
221         } catch (IOException e) {
222             LOG.warn("Failed to send " + packetType, e);
223             this.statistics.incrementTxErrors();
224         }
225     }
226
227     public LispSouthboundStats getStats() {
228         return statistics;
229     }
230
231     public void setLispAddress(String address) {
232         synchronized (startLock) {
233             if (bindingAddress != null && bindingAddress.equals(address)) {
234                 LOG.trace("configured lisp binding address didn't change.");
235             } else {
236                 String action = (bindingAddress == null ? "Setting" : "Resetting");
237                 LOG.trace(action + " lisp binding address to: " + address);
238                 bindingAddress = address;
239                 if (lispThread != null) {
240                     lispThread.stopRunning();
241                     while (lispThread.isRunning()) {
242                         try {
243                             Thread.sleep(500);
244                         } catch (InterruptedException e) {
245                         }
246                     }
247                 }
248                 stopXtrThread();
249                 startIOThread();
250             }
251         }
252     }
253
254     private void stopXtrThread() {
255         if (xtrThread != null) {
256             xtrThread.stopRunning();
257             while (xtrThread.isRunning()) {
258                 try {
259                     Thread.sleep(500);
260                 } catch (InterruptedException e) {
261                 }
262             }
263         }
264     }
265
266     @Override
267     public void shouldListenOnXtrPort(boolean shouldListenOnXtrPort) {
268         listenOnXtrPort = shouldListenOnXtrPort;
269         if (listenOnXtrPort) {
270             LOG.debug("restarting xtr thread");
271             restartXtrThread();
272         } else {
273             LOG.debug("terminating thread");
274             stopXtrThread();
275         }
276     }
277
278     @Override
279     public void setXtrPort(int port) {
280         this.xtrPort = port;
281         if (listenOnXtrPort) {
282             restartXtrThread();
283         }
284     }
285
286     @Override
287     public void close() throws Exception {
288         unloadActions();
289         sbRpcRegistration.close();
290     }
291
292     @Override
293     public void onSessionInitiated(ProviderContext session) {
294         LOG.debug("LispSouthboundPlugin Provider Session Initiated");
295     }
296 }