1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
3 import java.io.IOException;
4 import java.net.InetAddress;
5 import java.net.InetSocketAddress;
6 import java.net.SocketAddress;
7 import java.util.ArrayList;
8 import java.util.Iterator;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.ThreadPoolExecutor;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.nio.channels.ClosedChannelException;
22 import java.nio.channels.SocketChannel;
24 import org.jboss.netty.bootstrap.ServerBootstrap;
25 import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
26 import org.jboss.netty.channel.Channel;
27 import org.jboss.netty.channel.ChannelHandlerContext;
28 import org.jboss.netty.channel.ChannelPipelineFactory;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.ExceptionEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.group.ChannelGroup;
34 import org.jboss.netty.channel.group.DefaultChannelGroup;
35 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
36 import org.jboss.netty.handler.execution.ExecutionHandler;
37 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
38 import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
39 import org.jboss.netty.handler.timeout.IdleStateEvent;
40 import org.jboss.netty.util.HashedWheelTimer;
41 import org.jboss.netty.util.ObjectSizeEstimator;
42 import org.jboss.netty.handler.timeout.ReadTimeoutException;
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFType;
49 import org.openflow.protocol.factory.BasicFactory;
50 import org.openflow.protocol.factory.MessageParseException;
51 import org.opendaylight.controller.protocol_plugin.openflow.core.IEnhancedSwitch;
52 import org.opendaylight.controller.sal.connection.ConnectionConstants;
53 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
54 import org.opendaylight.controller.sal.core.Node;
55 import org.opendaylight.controller.sal.utils.Status;
56 import org.opendaylight.controller.sal.utils.StatusCode;
57 import org.opendaylight.openflowplugin.openflow.core.IController;
58 import org.opendaylight.openflowplugin.openflow.core.IMessageListener;
59 import org.opendaylight.openflowplugin.openflow.core.ISwitch;
60 import org.opendaylight.openflowplugin.openflow.core.ISwitchStateListener;
61 import org.opendaylight.openflowplugin.openflow.core.internal.SwitchEvent;
62 //import org.opendaylight.controller.protocol_plugin.openflow.core.internal.OFChannelState.HandshakeState;
63 //import org.openflow.protocol.OFType;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
70 public class EnhancedController implements IController, IPluginInConnectionService {
// OpenFlow message factory; created in init() and used by
// OFChannelHandler.channelIdle() to build ECHO_REQUEST messages.
73 protected BasicFactory factory;
76 private static final Logger logger = LoggerFactory
77 .getLogger(EnhancedController.class);
80 // Track connected switches via SwitchID (populated by switchAdded())
81 private ConcurrentHashMap<Long, ISwitch> connectedSwitches;
83 // Track per-connection switch handlers via ChannelID (the Netty channel id).
// Entries are added in OFChannelHandler.channelConnected().
84 private ConcurrentHashMap<Integer, IEnhancedSwitch> channelIDToSwitchMap;
86 // only 1 message listener per OFType
87 private ConcurrentMap<OFType, IMessageListener> messageListeners;
89 // only 1 switch state listener
90 private ISwitchStateListener switchStateListener;
// Reset to 0 in init(); not otherwise referenced in the visible part of this file.
91 private AtomicInteger switchInstanceNumber;
// NOTE(review): never assigned in the visible part of this file - confirm it is
// not passed to startServer() while still null.
94 private OFChannelHandler ofChannelHandler = null;
95 private ControllerServerBootstrap bootstrap = null;
// Executor created in ControllerServerBootstrap.startServer() and handed to the
// OpenflowPipelineFactory.
97 private ThreadPoolExecutor execHandler = null;
// Socket buffer sizes applied to accepted (child) channels in startServer().
99 private static final int SEND_BUFFER_SIZE = 1 * 1024 * 1024;
100 private static final int RECEIVE_BUFFER_SIZE = 1 * 1024 * 1024;
// The two water marks are only referenced from commented-out code in startServer().
101 private static final int WRITE_BUFFER_LOW_WATERMARK = 32 * 1024;
102 private static final int WRITE_BUFFER_HIGH_WATERMARK = 64 * 1024;
// startServer() binds to the port alone when the host it receives is null.
103 private static final String CONTROLLER_HOST = null;
104 private static final int CONTROLLER_PORT = 6633;
// Constructor arguments for the OrderedMemoryAwareThreadPoolExecutor built in startServer().
106 private static final int OMATPE_CORE_POOL_SIZE = 200;
107 private static final int OMATPE_PER_CHANNEL_SIZE = 2 * 1048576;
108 private static final int OMATPE_POOL_WIDE_SIZE = 0; //1073741824;
109 private static final int OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS = 100;
// Not referenced by any live code in the visible part of this file.
110 private static final int EXPERIMENTAL_OMATPE_OBJECT_SIZE = 1000; // bytes
// Timer shared by the statistics handler and every switch handler.
112 private HashedWheelTimer hashedWheelTimer = null;
114 // This executor is used by individual switches to handle
115 // cases like Stats Request/Response or Sync* methods which send a request and
116 // wait via Future for the response. Please note that the threads in this
117 // pool are shared across all switches. So, if all threads are busy,
118 // the Socket IO thread would get blocked, creating a sharp decline in performance.
119 // If possible TOTALLY avoid any thread usage which does network level
120 // request / response by making a thread in this pool wait for the response.
121 // Consider storing the Future reference against the "sent" request and
122 // firing an event to wake it up when the response is received, rather than making the
123 // sender thread go into a "wait" mode. That would never scale.
124 private ExecutorService executorService = null;
126 // IMPORTANT: DO NOT REDUCE THIS THREAD COUNT TO 0
127 // THIS THREAD COUNT WOULD BE USED FOR SOCKET-IO + FOLLOWING EXECUTION CHAIN
128 // Plugin + SAL + North-to-SAL + Egress (flow_provisioning)
129 private static final int WORKER_THREAD_COUNT = 4;
131 // This is a handy thread-pool if WORKER_THREAD_COUNT is not able to cope with
132 // Socket IO + Execution of the following handling chain
133 // Plugin + SAL + North-to-SAL + Egress (flow_provisioning)
// NOTE(review): not referenced in the visible part of this file.
134 private static final int EXECUTION_HANDLER_THREAD_POOL_SIZE = 0;
136 // This is the thread-pool which can be optionally used for
137 // building synchronous semantics for flow_mod and stats handling cycle
138 // Flow_Mod in synchronous model could involve FLOW_MOD + BARRIER_MSG
139 // sending and receiving with wait timeout for reply
140 // Stats handling in synchronous model could involve STATS_REQUEST + STATS_REPLY
141 // sending and receiving with wait timeout for reply
// Size of the fixed pool assigned to executorService in init().
142 private static final int THREAD_POOL_SIZE_FOR_EGRESS_SYNC_MSGS = 30;
144 private TrafficStatisticsHandler statsHandler = null;
146 // Lock for locking messagelisteners list while escalating the switch
// NOTE(review): neither this lock nor FLUSH_BATCH_SIZE is referenced in the
// visible part of this file.
148 private ReentrantLock lock = new ReentrantLock();
150 private static final int FLUSH_BATCH_SIZE = 100;
152 //****************** IController Interface Methods Begin ******************
155 public void addMessageListener(OFType type, IMessageListener listener) {
156 IMessageListener currentListener = this.messageListeners.get(type);
157 if (currentListener != null) {
158 logger.warn("{} is already listened by {}", type.toString(),
159 currentListener.toString());
161 this.messageListeners.put(type, listener);
162 logger.debug("{} is now listened by {}", type.toString(),
163 listener.toString());
168 public void removeMessageListener(OFType type, IMessageListener listener) {
169 IMessageListener currentListener = this.messageListeners.get(type);
170 if ((currentListener != null) && (currentListener == listener)) {
171 logger.debug("{} listener {} is Removed", type.toString(),
172 listener.toString());
173 this.messageListeners.remove(type);
179 public void addSwitchStateListener(ISwitchStateListener listener) {
180 if (this.switchStateListener != null) {
181 logger.warn("Switch events are already listened by {}",
182 this.switchStateListener.toString());
184 this.switchStateListener = listener;
185 logger.debug("Switch events are now listened by {}",
186 listener.toString());
191 public void removeSwitchStateListener(ISwitchStateListener listener) {
192 if ((this.switchStateListener != null)
193 && (this.switchStateListener == listener)) {
194 logger.debug("SwitchStateListener {} is Removed",
195 listener.toString());
196 this.switchStateListener = null;
202 public Map<Long, ISwitch> getSwitches() {
203 return this.connectedSwitches;
207 public ISwitch getSwitch(Long switchId) {
208 return this.connectedSwitches.get(switchId);
211 //****************** IController Interface Methods End ******************
215 //****************** Dependency-manager callbacks Begin ******************
217 * Function called by the dependency manager when all the required
218 * dependencies are satisfied
// Builds all controller state from scratch: empty switch and listener maps,
// the shared timer, the statistics handler, the message factory, the server
// bootstrap wrapper and the fixed-size executor given to switch handlers.
222 logger.debug("Initializing!");
223 this.connectedSwitches = new ConcurrentHashMap<Long, ISwitch>();
224 this.channelIDToSwitchMap = new ConcurrentHashMap<Integer, IEnhancedSwitch>();
225 this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
226 this.switchStateListener = null;
// The statistics handler shares the timer created on the previous line.
227 this.hashedWheelTimer = new HashedWheelTimer();
228 this.statsHandler = new TrafficStatisticsHandler(hashedWheelTimer);
229 this.switchInstanceNumber = new AtomicInteger(0);
230 this.factory = new BasicFactory();
// Only constructs the wrapper; the server socket is bound later via startServer().
231 this.bootstrap = new ControllerServerBootstrap(this);
232 this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE_FOR_EGRESS_SYNC_MSGS);
238 * Function called by dependency manager after "init ()" is called and after
239 * the services provided by the class are registered in the service registry
// Initialises the statistics handler and then starts the OpenFlow server with
// WORKER_THREAD_COUNT worker threads.
242 public void start() {
243 this.statsHandler.init();
244 logger.debug("Starting!");
// The remaining startServer() arguments are not visible in this view.
245 bootstrap.startServer(WORKER_THREAD_COUNT,
254 * Function called by the dependency manager before the services exported by
255 * the component are unregistered, this will be followed by a "destroy ()"
// Stops every switch handler that is still registered by channel id, then the
// shared timer, then the executor shared with the switch handlers.
// NOTE(review): the cast assumes every map value is an EnhancedSwitchHandler;
// the map is typed as IEnhancedSwitch - confirm no other implementation exists.
260 for (Iterator<Entry<Integer, IEnhancedSwitch>> it = channelIDToSwitchMap.entrySet().iterator(); it
262 Entry<Integer, IEnhancedSwitch> entry = it.next();
263 ((EnhancedSwitchHandler) entry.getValue()).stop();
266 hashedWheelTimer.stop();
// shutdown() stops accepting new tasks; it does not wait for running ones.
// NOTE(review): execHandler and the server channel do not appear to be
// released in the visible lines - confirm they are cleaned up elsewhere.
268 executorService.shutdown();
272 * Function called by the dependency manager when at least one dependency
273 * become unsatisfied or when the component is shutting down because for
274 * example bundle is being stopped.
// Dependency-manager callback; no statements of its body are visible in this view.
277 public void destroy() {
279 //****************** Dependency-manager callbacks End ******************
283 public OFChannelHandler getChannelHandler(){
284 return new OFChannelHandler(this);
// Netty upstream handler that turns channel events (connect, disconnect, idle,
// message, exception) into calls on the per-switch handlers kept in
// channelIDToSwitchMap.
288 protected class OFChannelHandler extends IdleStateAwareChannelUpstreamHandler{
// Controller this handler was created for.
291 protected EnhancedController controller = null;
// Set in channelConnected() to the channel of the connect event.
292 protected Channel channel = null;
295 public OFChannelHandler(EnhancedController controller){
296 this.controller = controller;
// Idle callback: writes a single ECHO_REQUEST to the idle channel and counts it.
301 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
// The message is wrapped in a one-element list before being written.
303 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
304 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
305 e.getChannel().write(msglist);
306 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_SENT);
310 public void channelConnected(ChannelHandlerContext ctx,
311 ChannelStateEvent e) throws Exception {
312 channel = e.getChannel();
313 logger.info("New switch connection from {}",
314 channel.getRemoteAddress());
316 Integer channelID = e.getChannel().getId();
318 IEnhancedSwitch switchHandler = new EnhancedSwitchHandler(controller,
319 channelID, channel, hashedWheelTimer, executorService, statsHandler);
320 switchHandler.startHandler();
321 channelIDToSwitchMap.put(channelID, switchHandler);
322 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONNECTED_SWITCHES);
327 public void channelDisconnected(ChannelHandlerContext ctx,
328 ChannelStateEvent e) throws Exception {
329 // when SwitchHandler.shutDownHandler is called, Controller would
330 // get the feedback via switchDeleted method. So that both SwitchHandler and
331 // controller both release resources of the switch concerned
333 Integer channelID = e.getChannel().getId();
334 IEnhancedSwitch switchHandler = channelIDToSwitchMap.get(channelID);
335 if (switchHandler != null){
336 switchHandler.shutDownHandler();
338 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.DISCONNECTED_SWITCHES);
// Exception callback: classifies the cause, logs it, closes the channel in every
// visible branch and finally counts the exception. Handshake-timeout and
// switch-state errors also remove the channel's entry from channelIDToSwitchMap;
// the other branches only look the handler up.
343 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
// NOTE(review): sw is assigned in several branches but never read in the visible
// lines - confirm whether the lines missing from this view use it.
346 EnhancedSwitchHandler sw = null;
348 if (e.getCause() instanceof ReadTimeoutException) {
// NOTE(review): the format string has one placeholder but two arguments are
// passed, so the cause message is presumably not logged.
350 logger.error("Disconnecting switch {} due to read timeout",
351 e.getChannel().getId(), e.getCause().getMessage());
352 ctx.getChannel().close();
353 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
356 } else if (e.getCause() instanceof HandshakeTimeoutException) {
357 logger.error("Disconnecting switch {}: failed to complete handshake",
358 e.getChannel().getId());
359 ctx.getChannel().close();
360 channelIDToSwitchMap.remove(e.getChannel().getId());
// ClosedChannelException is tested before IOException (it is a subclass), so it
// gets the milder warn-level message.
362 } else if (e.getCause() instanceof ClosedChannelException) {
363 logger.warn("Channel for sw {} already closed Error : {}",
364 e.getChannel().getId(), e.getCause().getMessage());
365 ctx.getChannel().close();
366 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
368 } else if (e.getCause() instanceof IOException) {
369 logger.error("Disconnecting switch {} due to IO Error: {}",
370 e.getChannel().getId(), e.getCause().getMessage());
371 ctx.getChannel().close();
372 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
375 } else if (e.getCause() instanceof SwitchStateException) {
376 logger.error("Disconnecting switch {} due to switch state error: {}",
377 e.getChannel().getId(), e.getCause().getMessage());
378 ctx.getChannel().close();
379 channelIDToSwitchMap.remove(e.getChannel().getId());
381 } else if (e.getCause() instanceof MessageParseException) {
382 logger.error("Disconnecting switch {} due to message parse error Error : {}",
383 e.getChannel().getId(), e.getCause().getMessage());
384 ctx.getChannel().close();
385 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
387 } else if (e.getCause() instanceof RejectedExecutionException) {
// The message says "could not process", yet the channel is closed as well.
388 logger.warn("Could not process message: queue full");
389 ctx.getChannel().close();
390 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
// Fallback for any other cause.
393 logger.error("Error while processing message from switch {} Error : {}",
394 e.getChannel().getId(), e.getCause().getMessage());
// NOTE(review): printStackTrace() bypasses the logger; passing the cause as the
// last logger argument would keep the stack trace in the log output.
395 e.getCause().printStackTrace();
396 ctx.getChannel().close();
397 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
401 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.EXCEPTION_CAUGHT);
405 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
407 Integer messageChannelId = e.getChannel().getId();
408 IEnhancedSwitch swHan = (EnhancedSwitchHandler)channelIDToSwitchMap.get(messageChannelId);
410 if (e.getMessage() instanceof List) {
411 //@SuppressWarnings("unchecked")
412 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
413 if (msglist != null){ // this check actually brought down rate to some extent - weird !!!
414 for (OFMessage ofm : msglist) {
417 // Do the actual packet processing
418 processOFMessage(ofm, messageChannelId);
420 catch (Exception ex) {
421 // We are the last handler in the stream, so run the
422 // exception through the channel again by passing in
424 Channels.fireExceptionCaught(ctx.getChannel(), ex);
430 // Flush all flow-mods/packet-out/stats generated from this "train"
431 swHan.flushBufferedMessages();
433 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.MESSAGE_RECEIVED);
439 public void processOFMessage(OFMessage ofm, Integer channelID){
440 IEnhancedSwitch switchHandler = (IEnhancedSwitch) channelIDToSwitchMap.get(channelID);
441 statsHandler.countForEntitySimpleMeasurement(channelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
442 if (switchHandler != null){
443 switchHandler.handleMessage(ofm);
// Wraps creation and binding of the Netty server that accepts switch connections.
451 protected class ControllerServerBootstrap{
// Worker thread count handed to the channel factory; 0 selects the factory
// constructor that takes no worker count (see createServerBootStrap()).
453 private int workerThreads = 0;
// Controller passed on to the OpenflowPipelineFactory in startServer().
454 private EnhancedController controller = null;
456 public ControllerServerBootstrap(EnhancedController controller){
457 this.controller = controller;
// Creates the server bootstrap, applies the socket options, installs the
// pipeline factory (with an OrderedMemoryAwareThreadPoolExecutor) and binds the
// listening socket. A failure is rethrown as a RuntimeException.
// NOTE(review): the ofchan parameter is not referenced in the visible lines.
461 public void startServer(int numWorkerThreads, String openFlowHost, int openFlowPort, OFChannelHandler ofchan){
// Remembered first because createServerBootStrap() reads workerThreads.
462 this.workerThreads = numWorkerThreads;
464 final ServerBootstrap bootstrap = createServerBootStrap();
// "child." options are set for accepted connections, "reuseAddr" for the
// listening socket itself.
466 bootstrap.setOption("reuseAddr", true);
467 bootstrap.setOption("child.keepAlive", true);
468 bootstrap.setOption("child.tcpNoDelay", true);
469 bootstrap.setOption("child.receiveBufferSize", EnhancedController.RECEIVE_BUFFER_SIZE);
470 bootstrap.setOption("child.sendBufferSize", EnhancedController.SEND_BUFFER_SIZE);
472 // better to have a receive buffer predictor
473 //bootstrap.setOption("receiveBufferSizePredictorFactory",
474 // new AdaptiveReceiveBufferSizePredictorFactory());
475 //if the server is sending 1000 messages per sec, optimum write buffer water marks will
476 //prevent unnecessary throttling, Check NioSocketChannelConfig doc
477 //bootstrap.setOption("writeBufferLowWaterMark", WRITE_BUFFER_LOW_WATERMARK);
478 //bootstrap.setOption("writeBufferHighWaterMark", WRITE_BUFFER_HIGH_WATERMARK);
480 // TODO: IMPORTANT: If the threadpool is supplied as null, ExecutionHandler would
481 // not be present in pipeline. If the load increases and ordering is required ,
482 // use OrderedMemoryAwareThreadPoolExecutor as argument instead of null
// NOTE(review): the next executor construction ends with a comment terminator,
// so it looks like a disabled variant whose comment opener is not visible here.
485 execHandler = new OrderedMemoryAwareThreadPoolExecutor(
486 OMATPE_CORE_POOL_SIZE,
487 OMATPE_PER_CHANNEL_SIZE,
488 OMATPE_POOL_WIDE_SIZE,
489 OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS,
490 TimeUnit.MILLISECONDS,
491 new ObjectSizeEstimator() {
494 public int estimateSize(Object o) {
498 Executors.defaultThreadFactory()); */
// Executor that is actually handed to the pipeline factory below.
500 execHandler = new OrderedMemoryAwareThreadPoolExecutor(
501 OMATPE_CORE_POOL_SIZE,
502 OMATPE_PER_CHANNEL_SIZE,
503 OMATPE_POOL_WIDE_SIZE,
504 OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS,
505 TimeUnit.MILLISECONDS);
509 ChannelPipelineFactory pfact =
510 new OpenflowPipelineFactory(controller, execHandler);
511 bootstrap.setPipelineFactory(pfact);
// A null host binds to the port alone.
512 InetSocketAddress sa =
513 (openFlowHost == null)
514 ? new InetSocketAddress(openFlowPort)
515 : new InetSocketAddress(openFlowHost, openFlowPort);
// NOTE(review): cg is a local that is not stored anywhere visible, so the bound
// server channel cannot be closed through this group later on.
516 final ChannelGroup cg = new DefaultChannelGroup();
517 cg.add(bootstrap.bind(sa));
520 } catch (Exception e) {
521 throw new RuntimeException(e);
526 private ServerBootstrap createServerBootStrap() {
527 if (workerThreads == 0) {
528 return new ServerBootstrap(
529 new NioServerSocketChannelFactory(
530 Executors.newCachedThreadPool(),
531 Executors.newCachedThreadPool()));
533 return new ServerBootstrap(
534 new NioServerSocketChannelFactory(
535 Executors.newCachedThreadPool(),
536 Executors.newCachedThreadPool(), workerThreads));
546 * Method called by SwitchHandler once the handshake state is completed
550 public void switchAdded(SwitchEvent switchEv, Integer switchChannelID){
552 ISwitch sw = switchEv.getSwitch();
553 Long switchId = sw.getId();
555 connectedSwitches.put(switchId, sw);
556 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONNECTED_SWITCHES);
558 logger.info("Switch with DPID : {} connected ", switchId);
560 notifySwitchAdded(sw);
565 * Method called by SwitchHandler switch is disconnected
570 public void switchDeleted(SwitchEvent switchEv, Integer switchChannelID){
571 ISwitch sw = switchEv.getSwitch();
572 disconnectSwitch(sw, switchChannelID);
577 * Method called by SwitchHandler when it encounters any errors
// Error callback for switch handlers; no statements of its body are visible in this view.
583 public void switchError(SwitchEvent switchEv, Integer switchChannelID){
// Delivers a message raised by a switch handler to a registered message
// listener, when one is found.
588 public void switchMessage(SwitchEvent switchEv, Integer switchChannelID){
593 OFMessage msg = switchEv.getMsg();
594 ISwitch sw = switchEv.getSwitch();
// The lookup expression continues on a line that is not visible here;
// presumably the listener is fetched by the message's type - confirm.
598 IMessageListener listener = messageListeners
600 if (listener != null) {
601 //logger.debug("delegating to msg-receiver");
602 //startTime = System.nanoTime();
// The listener is called synchronously on the calling thread.
603 listener.receive(sw, msg);
604 //endTime = System.nanoTime();
605 //this.statsHandler.reportPacketInProcessingTime(endTime - startTime);
// Drops the controller's references to a switch. For an operational switch it
// removes the entries from both maps and notifies the switch state listener.
// NOTE(review): the cast throws ClassCastException for any ISwitch that is not
// an EnhancedSwitchHandler - confirm callers never pass one.
614 public void disconnectSwitch(ISwitch sw, Integer switchChannelID){
616 if (((EnhancedSwitchHandler) sw).isOperational()) {
// sid is declared on a line that is not visible in this view.
619 this.connectedSwitches.remove(sid);
620 this.channelIDToSwitchMap.remove(switchChannelID);
621 notifySwitchDeleted(sw);
623 //((EnhancedSwitchHandler) sw).stop();
624 logger.info("Switch with DPID {} disconnected", sid);
629 private void notifySwitchAdded(ISwitch sw) {
630 if (switchStateListener != null) {
631 switchStateListener.switchAdded(sw);
635 private void notifySwitchDeleted(ISwitch sw) {
636 if (switchStateListener != null) {
637 switchStateListener.switchDeleted(sw);
642 public Status disconnect(Node node) {
643 ISwitch sw = getSwitch((Long)node.getID());
645 if (sw instanceof EnhancedSwitchHandler) {
646 EnhancedSwitchHandler eSw = (EnhancedSwitchHandler)sw;
647 disconnectSwitch(sw, eSw.getSwitchChannelID());
650 return new Status(StatusCode.SUCCESS);
// Connection-service entry point; no statements of its body are visible in this view.
654 public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
659 * View Change notification
661 public void notifyClusterViewChanged() {
662 for (ISwitch sw : connectedSwitches.values()) {
663 notifySwitchAdded(sw);
668 * Node Disconnected from the node's master controller.
// Re-announces the node's switch as "added" to the switch state listener when
// the switch is still connected to this controller.
// NOTE(review): an "added" notification for a disconnect event looks
// surprising - confirm it is intentional.
671 public void notifyNodeDisconnectFromMaster(Node node) {
672 ISwitch sw = connectedSwitches.get((Long)node.getID());
673 if (sw != null) notifySwitchAdded(sw);