1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
3 import java.io.IOException;
4 import java.net.InetAddress;
5 import java.net.InetSocketAddress;
6 import java.net.SocketAddress;
7 import java.util.ArrayList;
8 import java.util.Iterator;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.ThreadPoolExecutor;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.nio.channels.ClosedChannelException;
22 import java.nio.channels.SocketChannel;
24 import org.jboss.netty.bootstrap.ServerBootstrap;
25 import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
26 import org.jboss.netty.channel.Channel;
27 import org.jboss.netty.channel.ChannelHandlerContext;
28 import org.jboss.netty.channel.ChannelPipelineFactory;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.ExceptionEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.group.ChannelGroup;
34 import org.jboss.netty.channel.group.DefaultChannelGroup;
35 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
36 import org.jboss.netty.handler.execution.ExecutionHandler;
37 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
38 import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
39 import org.jboss.netty.handler.timeout.IdleStateEvent;
40 import org.jboss.netty.util.HashedWheelTimer;
41 import org.jboss.netty.util.ObjectSizeEstimator;
42 import org.jboss.netty.handler.timeout.ReadTimeoutException;
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFType;
49 import org.openflow.protocol.factory.BasicFactory;
50 import org.openflow.protocol.factory.MessageParseException;
51 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
52 import org.opendaylight.controller.protocol_plugin.openflow.core.IEnhancedSwitch;
53 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
54 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
55 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
56 import org.opendaylight.controller.sal.connection.ConnectionConstants;
57 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
58 import org.opendaylight.controller.sal.core.Node;
59 import org.opendaylight.controller.sal.utils.Status;
60 import org.opendaylight.controller.sal.utils.StatusCode;
61 //import org.opendaylight.controller.protocol_plugin.openflow.core.internal.OFChannelState.HandshakeState;
62 //import org.openflow.protocol.OFType;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
69 public class EnhancedController implements IController, IPluginInConnectionService {
// Factory for OpenFlow messages; used to build the ECHO_REQUEST sent on idle channels.
72 protected BasicFactory factory;
75 private static final Logger logger = LoggerFactory
76 .getLogger(EnhancedController.class);
79 // Track connected switches via SwitchID
80 private ConcurrentHashMap<Long, ISwitch> connectedSwitches;
82 // Track connected switches via ChannelID. Whenever the message
83 private ConcurrentHashMap<Integer, IEnhancedSwitch> channelIDToSwitchMap;
85 // only 1 message listener per OFType
86 private ConcurrentMap<OFType, IMessageListener> messageListeners;
88 // only 1 switch state listener
89 private ISwitchStateListener switchStateListener;
90 private AtomicInteger switchInstanceNumber;
93 private OFChannelHandler ofChannelHandler = null;
// Server bootstrap helper; created during initialisation and used by start().
94 private ControllerServerBootstrap bootstrap = null;
// Executor handed to the pipeline factory; assigned in startServer().
96 private ThreadPoolExecutor execHandler = null;
// Socket buffer sizes applied to accepted (child) channels in startServer().
98 private static final int SEND_BUFFER_SIZE = 1 * 1024 * 1024;
99 private static final int RECEIVE_BUFFER_SIZE = 1 * 1024 * 1024;
// Write-buffer water marks; only referenced from commented-out options in startServer().
100 private static final int WRITE_BUFFER_LOW_WATERMARK = 32 * 1024;
101 private static final int WRITE_BUFFER_HIGH_WATERMARK = 64 * 1024;
102 private static final String CONTROLLER_HOST = null;
103 private static final int CONTROLLER_PORT = 6633;
// Constructor arguments for the OrderedMemoryAwareThreadPoolExecutor built in startServer().
105 private static final int OMATPE_CORE_POOL_SIZE = 200;
106 private static final int OMATPE_PER_CHANNEL_SIZE = 2 * 1048576;
107 private static final int OMATPE_POOL_WIDE_SIZE = 0; //1073741824;
108 private static final int OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS = 100;
109 private static final int EXPERIMENTAL_OMATPE_OBJECT_SIZE = 1000; // bytes
// Timer shared with the statistics handler and with every switch handler created here.
111 private HashedWheelTimer hashedWheelTimer = null;
113 // This executor would be used by individual switches to handle
114 // cases like Stats Request/Response or Sync* methods which sends request and
115 // waits via Future for responses. Please note that threads in this
116 // pool are shared across multiple threads. So, if all threads are busy,
117 // Socket IO thread would get blocked creating sharp decline in performance
118 // If possible TOTALLY avoid any thread usage which does network level
119 // request / response by making a thread in this pool wait for response
120 // Consider storing the Future reference against the "sent" request and
121 // fire-event to wake-up the same when response is received rather than making the
122 // sender thread getting into a "wait" mode. That would never scale
123 private ExecutorService executorService = null;
125 // IMPORTANT: DO NOT REDUCE THIS THREAD COUNT TO 0
126 // THIS THREAD COUNT WOULD BE USED FOR SOCKET-IO + FOLLOWING EXECUTION CHAIN
127 // Plugin + SAL + North-to-SAL + Egress (flow_provisioning)
128 private static final int WORKER_THREAD_COUNT = 4;
130 // This is a handy thread-pool if WORKER_THREAD_COUNT is not able to cope with
131 // Socket IO + Execution of the following handling chain
132 // Plugin + SAL + North-to-SAL + Egress (flow_provisioning)
133 private static final int EXECUTION_HANDLER_THREAD_POOL_SIZE = 0;
135 // This is the thread-pool which can be optionally used for
136 // building synchronous semantics for flow_mod and stats handling cycle
137 // Flow_Mod in synchronous model could involve FLOW_MOD + BARRIER_MSG
138 // sending and receiving with wait timeout for reply
139 // Stats handling in synchronous model could involve STATS_REQUEST + STATS_REPLY
140 // sending and receiving with wait timeout for reply
141 private static final int THREAD_POOL_SIZE_FOR_EGRESS_SYNC_MSGS = 30;
// Collects the simple traffic counters updated from the channel and switch callbacks.
143 private TrafficStatisticsHandler statsHandler = null;
145 // Lock for locking messagelisteners list while escalating the switch
147 private ReentrantLock lock = new ReentrantLock();
149 private static final int FLUSH_BATCH_SIZE = 100;
151 //****************** IController Interface Methods Begin ******************
154 public void addMessageListener(OFType type, IMessageListener listener) {
155 IMessageListener currentListener = this.messageListeners.get(type);
156 if (currentListener != null) {
157 logger.warn("{} is already listened by {}", type.toString(),
158 currentListener.toString());
160 this.messageListeners.put(type, listener);
161 logger.debug("{} is now listened by {}", type.toString(),
162 listener.toString());
167 public void removeMessageListener(OFType type, IMessageListener listener) {
168 IMessageListener currentListener = this.messageListeners.get(type);
169 if ((currentListener != null) && (currentListener == listener)) {
170 logger.debug("{} listener {} is Removed", type.toString(),
171 listener.toString());
172 this.messageListeners.remove(type);
178 public void addSwitchStateListener(ISwitchStateListener listener) {
179 if (this.switchStateListener != null) {
180 logger.warn("Switch events are already listened by {}",
181 this.switchStateListener.toString());
183 this.switchStateListener = listener;
184 logger.debug("Switch events are now listened by {}",
185 listener.toString());
190 public void removeSwitchStateListener(ISwitchStateListener listener) {
191 if ((this.switchStateListener != null)
192 && (this.switchStateListener == listener)) {
193 logger.debug("SwitchStateListener {} is Removed",
194 listener.toString());
195 this.switchStateListener = null;
201 public Map<Long, ISwitch> getSwitches() {
202 return this.connectedSwitches;
206 public ISwitch getSwitch(Long switchId) {
207 return this.connectedSwitches.get(switchId);
210 //****************** IController Interface Methods End ******************
214 //****************** Dependency-manager callbacks Begin ******************
216 * Function called by the dependency manager when all the required
217 * dependencies are satisfied
221 logger.debug("Initializing!");
222 this.connectedSwitches = new ConcurrentHashMap<Long, ISwitch>();
223 this.channelIDToSwitchMap = new ConcurrentHashMap<Integer, IEnhancedSwitch>();
224 this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
225 this.switchStateListener = null;
226 this.hashedWheelTimer = new HashedWheelTimer();
227 this.statsHandler = new TrafficStatisticsHandler(hashedWheelTimer);
228 this.switchInstanceNumber = new AtomicInteger(0);
229 this.factory = new BasicFactory();
230 this.bootstrap = new ControllerServerBootstrap(this);
231 this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE_FOR_EGRESS_SYNC_MSGS);
237 * Function called by dependency manager after "init ()" is called and after
238 * the services provided by the class are registered in the service registry
// Starts the component: the statistics handler is initialised first, then the
// bootstrap is asked to start the server, with WORKER_THREAD_COUNT passed as the
// worker-thread count (first startServer argument).
241 public void start() {
242 this.statsHandler.init();
243 logger.debug("Starting!");
244 bootstrap.startServer(WORKER_THREAD_COUNT,
253 * Function called by the dependency manager before the services exported by
254 * the component are unregistered, this will be followed by a "destroy ()"
259 for (Iterator<Entry<Integer, IEnhancedSwitch>> it = channelIDToSwitchMap.entrySet().iterator(); it
261 Entry<Integer, IEnhancedSwitch> entry = it.next();
262 ((EnhancedSwitchHandler) entry.getValue()).stop();
265 hashedWheelTimer.stop();
267 executorService.shutdown();
271 * Function called by the dependency manager when at least one dependency
272 * becomes unsatisfied or when the component is shutting down because, for
273 * example, the bundle is being stopped.
276 public void destroy() {
278 //****************** Dependency-manager callbacks End ******************
282 public OFChannelHandler getChannelHandler(){
283 return new OFChannelHandler(this);
287 protected class OFChannelHandler extends IdleStateAwareChannelUpstreamHandler{
290 protected EnhancedController controller = null;
291 protected Channel channel = null;
/**
 * Creates a channel handler that reports to the given controller.
 *
 * @param controller the controller passed on to the switch handlers created by this handler
 */
public OFChannelHandler(EnhancedController controller){
    super();
    this.controller = controller;
}
300 public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
302 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
303 msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
304 e.getChannel().write(msglist);
305 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_SENT);
309 public void channelConnected(ChannelHandlerContext ctx,
310 ChannelStateEvent e) throws Exception {
311 channel = e.getChannel();
312 logger.info("New switch connection from {}",
313 channel.getRemoteAddress());
315 Integer channelID = e.getChannel().getId();
317 IEnhancedSwitch switchHandler = new EnhancedSwitchHandler(controller,
318 channelID, channel, hashedWheelTimer, executorService, statsHandler);
319 switchHandler.startHandler();
320 channelIDToSwitchMap.put(channelID, switchHandler);
321 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONNECTED_SWITCHES);
326 public void channelDisconnected(ChannelHandlerContext ctx,
327 ChannelStateEvent e) throws Exception {
328 // when SwitchHandler.shutDownHandler is called, Controller would
329 // get the feedback via switchDeleted method. So that both SwitchHandler and
330 // controller both release resources of the switch concerned
332 Integer channelID = e.getChannel().getId();
333 IEnhancedSwitch switchHandler = channelIDToSwitchMap.get(channelID);
334 if (switchHandler != null){
335 switchHandler.shutDownHandler();
337 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.DISCONNECTED_SWITCHES);
// Central error handler for a switch channel. The cause of the exception event
// selects a branch; every visible branch logs and closes the channel. Most
// branches then look up the switch handler registered for the channel id, while
// the handshake-timeout and switch-state branches remove the map entry instead.
// The exception is finally counted in the traffic statistics.
342 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
345 EnhancedSwitchHandler sw = null;
347 if (e.getCause() instanceof ReadTimeoutException) {
// NOTE(review): the format string has one placeholder but two arguments are
// passed, so the cause message is not rendered in this log line.
349 logger.error("Disconnecting switch {} due to read timeout",
350 e.getChannel().getId(), e.getCause().getMessage());
351 ctx.getChannel().close();
352 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
355 } else if (e.getCause() instanceof HandshakeTimeoutException) {
356 logger.error("Disconnecting switch {}: failed to complete handshake",
357 e.getChannel().getId());
358 ctx.getChannel().close();
// Handshake never completed: drop the channel-id mapping directly.
359 channelIDToSwitchMap.remove(e.getChannel().getId());
361 } else if (e.getCause() instanceof ClosedChannelException) {
362 logger.warn("Channel for sw {} already closed Error : {}",
363 e.getChannel().getId(), e.getCause().getMessage());
364 ctx.getChannel().close();
365 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
367 } else if (e.getCause() instanceof IOException) {
368 logger.error("Disconnecting switch {} due to IO Error: {}",
369 e.getChannel().getId(), e.getCause().getMessage());
370 ctx.getChannel().close();
371 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
374 } else if (e.getCause() instanceof SwitchStateException) {
375 logger.error("Disconnecting switch {} due to switch state error: {}",
376 e.getChannel().getId(), e.getCause().getMessage());
377 ctx.getChannel().close();
378 channelIDToSwitchMap.remove(e.getChannel().getId());
380 } else if (e.getCause() instanceof MessageParseException) {
381 logger.error("Disconnecting switch {} due to message parse error Error : {}",
382 e.getChannel().getId(), e.getCause().getMessage());
383 ctx.getChannel().close();
384 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
386 } else if (e.getCause() instanceof RejectedExecutionException) {
387 logger.warn("Could not process message: queue full");
388 ctx.getChannel().close();
389 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
// Any other cause: log it, dump the stack trace and close the channel.
392 logger.error("Error while processing message from switch {} Error : {}",
393 e.getChannel().getId(), e.getCause().getMessage());
// NOTE(review): printStackTrace() bypasses the logger; consider passing the
// throwable to logger.error instead.
394 e.getCause().printStackTrace();
395 ctx.getChannel().close();
396 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
400 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.EXCEPTION_CAUGHT);
404 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
406 Integer messageChannelId = e.getChannel().getId();
407 IEnhancedSwitch swHan = (EnhancedSwitchHandler)channelIDToSwitchMap.get(messageChannelId);
409 if (e.getMessage() instanceof List) {
410 //@SuppressWarnings("unchecked")
411 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
412 if (msglist != null){ // this check actually brought down rate to some extent - weird !!!
413 for (OFMessage ofm : msglist) {
416 // Do the actual packet processing
417 processOFMessage(ofm, messageChannelId);
419 catch (Exception ex) {
420 // We are the last handler in the stream, so run the
421 // exception through the channel again by passing in
423 Channels.fireExceptionCaught(ctx.getChannel(), ex);
429 // Flush all flow-mods/packet-out/stats generated from this "train"
430 swHan.flushBufferedMessages();
432 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.MESSAGE_RECEIVED);
438 public void processOFMessage(OFMessage ofm, Integer channelID){
439 IEnhancedSwitch switchHandler = (IEnhancedSwitch) channelIDToSwitchMap.get(channelID);
440 statsHandler.countForEntitySimpleMeasurement(channelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
441 if (switchHandler != null){
442 switchHandler.handleMessage(ofm);
450 protected class ControllerServerBootstrap{
452 private int workerThreads = 0;
453 private EnhancedController controller = null;
/**
 * Creates a bootstrap helper for the given controller.
 *
 * @param controller the controller handed to the pipeline factory when the server starts
 */
public ControllerServerBootstrap(EnhancedController controller){
    super();
    this.controller = controller;
}
// Configures a Netty ServerBootstrap and binds it to the requested address.
//   numWorkerThreads - stored in workerThreads and read by createServerBootStrap()
//   openFlowHost     - host to bind to; when null only the port is used for the address
//   openFlowPort     - port to bind to
//   ofchan           - not referenced in the visible part of this method
// Exceptions caught by the visible catch clause are rethrown wrapped in a RuntimeException.
460 public void startServer(int numWorkerThreads, String openFlowHost, int openFlowPort, OFChannelHandler ofchan){
461 this.workerThreads = numWorkerThreads;
463 final ServerBootstrap bootstrap = createServerBootStrap();
// Listening-socket option, followed by options for accepted ("child.") channels.
465 bootstrap.setOption("reuseAddr", true);
466 bootstrap.setOption("child.keepAlive", true);
467 bootstrap.setOption("child.tcpNoDelay", true);
468 bootstrap.setOption("child.receiveBufferSize", EnhancedController.RECEIVE_BUFFER_SIZE);
469 bootstrap.setOption("child.sendBufferSize", EnhancedController.SEND_BUFFER_SIZE);
471 // better to have a receive buffer predictor
472 //bootstrap.setOption("receiveBufferSizePredictorFactory",
473 // new AdaptiveReceiveBufferSizePredictorFactory());
474 //if the server is sending 1000 messages per sec, optimum write buffer water marks will
475 //prevent unnecessary throttling, Check NioSocketChannelConfig doc
476 //bootstrap.setOption("writeBufferLowWaterMark", WRITE_BUFFER_LOW_WATERMARK);
477 //bootstrap.setOption("writeBufferHighWaterMark", WRITE_BUFFER_HIGH_WATERMARK);
479 // TODO: IMPORTANT: If the threadpool is supplied as null, ExecutionHandler would
480 // not be present in pipeline. If the load increases and ordering is required ,
481 // use OrderedMemoryAwareThreadPoolExecutor as argument instead of null
484 execHandler = new OrderedMemoryAwareThreadPoolExecutor(
485 OMATPE_CORE_POOL_SIZE,
486 OMATPE_PER_CHANNEL_SIZE,
487 OMATPE_POOL_WIDE_SIZE,
488 OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS,
489 TimeUnit.MILLISECONDS,
490 new ObjectSizeEstimator() {
493 public int estimateSize(Object o) {
497 Executors.defaultThreadFactory()); */
// Executor handed to the pipeline factory below, built from the OMATPE_* constants.
499 execHandler = new OrderedMemoryAwareThreadPoolExecutor(
500 OMATPE_CORE_POOL_SIZE,
501 OMATPE_PER_CHANNEL_SIZE,
502 OMATPE_POOL_WIDE_SIZE,
503 OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS,
504 TimeUnit.MILLISECONDS);
508 ChannelPipelineFactory pfact =
509 new OpenflowPipelineFactory(controller, execHandler);
510 bootstrap.setPipelineFactory(pfact);
// A null host means the address is built from the port alone.
511 InetSocketAddress sa =
512 (openFlowHost == null)
513 ? new InetSocketAddress(openFlowPort)
514 : new InetSocketAddress(openFlowHost, openFlowPort);
// The bound server channel is added to a local channel group.
515 final ChannelGroup cg = new DefaultChannelGroup();
516 cg.add(bootstrap.bind(sa));
519 } catch (Exception e) {
520 throw new RuntimeException(e);
525 private ServerBootstrap createServerBootStrap() {
526 if (workerThreads == 0) {
527 return new ServerBootstrap(
528 new NioServerSocketChannelFactory(
529 Executors.newCachedThreadPool(),
530 Executors.newCachedThreadPool()));
532 return new ServerBootstrap(
533 new NioServerSocketChannelFactory(
534 Executors.newCachedThreadPool(),
535 Executors.newCachedThreadPool(), workerThreads));
545 * Method called by SwitchHandler once the handshake state is completed
549 public void switchAdded(SwitchEvent switchEv, Integer switchChannelID){
551 ISwitch sw = switchEv.getSwitch();
552 Long switchId = sw.getId();
554 connectedSwitches.put(switchId, sw);
555 statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONNECTED_SWITCHES);
557 logger.info("Switch with DPID : {} connected ", switchId);
559 notifySwitchAdded(sw);
564 * Method called by SwitchHandler when the switch is disconnected
569 public void switchDeleted(SwitchEvent switchEv, Integer switchChannelID){
570 ISwitch sw = switchEv.getSwitch();
571 disconnectSwitch(sw, switchChannelID);
576 * Method called by SwitchHandler when it encounters any errors
582 public void switchError(SwitchEvent switchEv, Integer switchChannelID){
// Dispatches the message carried by the switch event: the message and the
// switch are read from the event, a listener is looked up in messageListeners
// and, when one is found, it receives the switch and the message.
587 public void switchMessage(SwitchEvent switchEv, Integer switchChannelID){
592 OFMessage msg = switchEv.getMsg();
593 ISwitch sw = switchEv.getSwitch();
597 IMessageListener listener = messageListeners
// No listener found means the message is not delivered by this method.
599 if (listener != null) {
600 //logger.debug("delegating to msg-receiver");
601 //startTime = System.nanoTime();
602 listener.receive(sw, msg);
603 //endTime = System.nanoTime();
604 //this.statsHandler.reportPacketInProcessingTime(endTime - startTime);
// Releases the controller-side bookkeeping for a switch: the entries in
// connectedSwitches and channelIDToSwitchMap are removed and the switch state
// listener is told that the switch was deleted.
//   sw              - the switch to disconnect; cast to EnhancedSwitchHandler below,
//                     so any other ISwitch implementation fails with a ClassCastException
//   switchChannelID - key of the entry removed from channelIDToSwitchMap
613 public void disconnectSwitch(ISwitch sw, Integer switchChannelID){
615 if (((EnhancedSwitchHandler) sw).isOperational()) {
618 this.connectedSwitches.remove(sid);
619 this.channelIDToSwitchMap.remove(switchChannelID);
620 notifySwitchDeleted(sw);
622 //((EnhancedSwitchHandler) sw).stop();
623 logger.info("Switch with DPID {} disconnected", sid);
628 private void notifySwitchAdded(ISwitch sw) {
629 if (switchStateListener != null) {
630 switchStateListener.switchAdded(sw);
634 private void notifySwitchDeleted(ISwitch sw) {
635 if (switchStateListener != null) {
636 switchStateListener.switchDeleted(sw);
// Disconnects the switch that backs the given node. The node id is cast to
// Long and used to look up the switch; when the switch is an
// EnhancedSwitchHandler it is disconnected using its own channel id.
// A Status with StatusCode.SUCCESS is returned on the visible return path.
641 public Status disconnect(Node node) {
642 ISwitch sw = getSwitch((Long)node.getID());
644 if (sw instanceof EnhancedSwitchHandler) {
645 EnhancedSwitchHandler eSw = (EnhancedSwitchHandler)sw;
646 disconnectSwitch(sw, eSw.getSwitchChannelID());
649 return new Status(StatusCode.SUCCESS);
653 public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
658 * View Change notification
660 public void notifyClusterViewChanged() {
661 for (ISwitch sw : connectedSwitches.values()) {
662 notifySwitchAdded(sw);
667 * Node Disconnected from the node's master controller.
670 public void notifyNodeDisconnectFromMaster(Node node) {
671 ISwitch sw = connectedSwitches.get((Long)node.getID());
672 if (sw != null) notifySwitchAdded(sw);