+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
-
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
-import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
-import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
-import org.openflow.protocol.OFBarrierReply;
-import org.openflow.protocol.OFBarrierRequest;
-import org.openflow.protocol.OFEchoReply;
-import org.openflow.protocol.OFEchoRequest;
-import org.openflow.protocol.OFError;
-import org.openflow.protocol.OFFeaturesReply;
-import org.openflow.protocol.OFFlowMod;
-import org.openflow.protocol.OFGetConfigReply;
-import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPhysicalPort;
-import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
-import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
-import org.openflow.protocol.OFPhysicalPort.OFPortState;
-import org.openflow.protocol.OFPort;
-import org.openflow.protocol.OFPortStatus;
-import org.openflow.protocol.OFPortStatus.OFPortReason;
-import org.openflow.protocol.OFSetConfig;
-import org.openflow.protocol.OFStatisticsReply;
-import org.openflow.protocol.OFStatisticsRequest;
-import org.openflow.protocol.OFType;
-import org.openflow.protocol.factory.BasicFactory;
-import org.openflow.util.HexString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class SwitchHandler implements ISwitch {
- private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
- private static final int SWITCH_LIVENESS_TIMER = 5000;
- private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
- private final int MESSAGE_RESPONSE_TIMER = 2000;
-
- private final String instanceName;
- private final ISwitch thisISwitch;
- private final IController core;
- private Long sid;
- private Integer buffers;
- private Integer capabilities;
- private Byte tables;
- private Integer actions;
- private Selector selector;
- private final SocketChannel socket;
- private final BasicFactory factory;
- private final AtomicInteger xid;
- private SwitchState state;
- private Timer periodicTimer;
- private final Map<Short, OFPhysicalPort> physicalPorts;
- private final Map<Short, Integer> portBandwidth;
- private final Date connectedDate;
- private Long lastMsgReceivedTimeStamp;
- private Boolean probeSent;
- private final ExecutorService executor;
- private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
- private boolean running;
- private IMessageReadWrite msgReadWriteService;
- private Thread switchHandlerThread;
- private Integer responseTimerValue;
- private PriorityBlockingQueue<PriorityMessage> transmitQ;
- private Thread transmitThread;
-
- private enum SwitchState {
- NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
-
- private int value;
-
- private SwitchState(int value) {
- this.value = value;
- }
-
- @SuppressWarnings("unused")
- public int value() {
- return this.value;
- }
- }
-
- public SwitchHandler(Controller core, SocketChannel sc, String name) {
- this.instanceName = name;
- this.thisISwitch = this;
- this.sid = (long) 0;
- this.buffers = (int) 0;
- this.capabilities = (int) 0;
- this.tables = (byte) 0;
- this.actions = (int) 0;
- this.core = core;
- this.socket = sc;
- this.factory = new BasicFactory();
- this.connectedDate = new Date();
- this.lastMsgReceivedTimeStamp = connectedDate.getTime();
- this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
- this.portBandwidth = new HashMap<Short, Integer>();
- this.state = SwitchState.NON_OPERATIONAL;
- this.probeSent = false;
- this.xid = new AtomicInteger(this.socket.hashCode());
- this.periodicTimer = null;
- this.executor = Executors.newFixedThreadPool(4);
- this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
- this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
- String rTimer = System.getProperty("of.messageResponseTimer");
- if (rTimer != null) {
- try {
- responseTimerValue = Integer.decode(rTimer);
- } catch (NumberFormatException e) {
- logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
- }
- }
- }
-
- public void start() {
- try {
- startTransmitThread();
- setupCommChannel();
- sendFirstHello();
- startHandlerThread();
- } catch (Exception e) {
- reportError(e);
- }
- }
-
- private void startHandlerThread() {
- switchHandlerThread = new Thread(new Runnable() {
- @Override
- public void run() {
- running = true;
- while (running) {
- try {
- // wait for an incoming connection
- selector.select(0);
- Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
- while (selectedKeys.hasNext()) {
- SelectionKey skey = selectedKeys.next();
- selectedKeys.remove();
- if (skey.isValid() && skey.isWritable()) {
- resumeSend();
- }
- if (skey.isValid() && skey.isReadable()) {
- handleMessages();
- }
- }
- } catch (Exception e) {
- reportError(e);
- }
- }
- }
- }, instanceName);
- switchHandlerThread.start();
- }
-
- private void stopInternal() {
- logger.debug("{} receives stop signal",
- (isOperational() ? HexString.toHexString(sid) : "unknown"));
- running = false;
- cancelSwitchTimer();
- try {
- selector.wakeup();
- selector.close();
- } catch (Exception e) {
- }
- try {
- socket.close();
- } catch (Exception e) {
- }
- try {
- msgReadWriteService.stop();
- } catch (Exception e) {
- }
- logger.debug("executor shutdown now");
- executor.shutdownNow();
-
- msgReadWriteService = null;
- }
-
- public void stop() {
- stopInternal();
-
- if (switchHandlerThread != null) {
- switchHandlerThread.interrupt();
- }
- if (transmitThread != null) {
- transmitThread.interrupt();
- }
- }
-
- @Override
- public int getNextXid() {
- return this.xid.incrementAndGet();
- }
-
- /**
- * This method puts the message in an outgoing priority queue with normal
- * priority. It will be served after high priority messages. The method
- * should be used for non-critical messages such as statistics request,
- * discovery packets, etc. An unique XID is generated automatically and
- * inserted into the message.
- *
- * @param msg
- * The OF message to be sent
- * @return The XID used
- */
- @Override
- public Integer asyncSend(OFMessage msg) {
- return asyncSend(msg, getNextXid());
- }
-
- private Object syncSend(OFMessage msg, int xid) {
- return syncMessageInternal(msg, xid, true);
- }
-
- /**
- * This method puts the message in an outgoing priority queue with normal
- * priority. It will be served after high priority messages. The method
- * should be used for non-critical messages such as statistics request,
- * discovery packets, etc. The specified XID is inserted into the message.
- *
- * @param msg
- * The OF message to be Sent
- * @param xid
- * The XID to be used in the message
- * @return The XID used
- */
- @Override
- public Integer asyncSend(OFMessage msg, int xid) {
- msg.setXid(xid);
- if (transmitQ != null) {
- transmitQ.add(new PriorityMessage(msg, 0));
- }
- return xid;
- }
-
- /**
- * This method puts the message in an outgoing priority queue with high
- * priority. It will be served first before normal priority messages. The
- * method should be used for critical messages such as hello, echo reply
- * etc. An unique XID is generated automatically and inserted into the
- * message.
- *
- * @param msg
- * The OF message to be sent
- * @return The XID used
- */
- @Override
- public Integer asyncFastSend(OFMessage msg) {
- return asyncFastSend(msg, getNextXid());
- }
-
- /**
- * This method puts the message in an outgoing priority queue with high
- * priority. It will be served first before normal priority messages. The
- * method should be used for critical messages such as hello, echo reply
- * etc. The specified XID is inserted into the message.
- *
- * @param msg
- * The OF message to be sent
- * @return The XID used
- */
- @Override
- public Integer asyncFastSend(OFMessage msg, int xid) {
- msg.setXid(xid);
- if (transmitQ != null) {
- transmitQ.add(new PriorityMessage(msg, 1));
- }
- return xid;
- }
-
- public void resumeSend() {
- try {
- if (msgReadWriteService != null) {
- msgReadWriteService.resumeSend();
- }
- } catch (Exception e) {
- reportError(e);
- }
- }
-
- /**
- * This method bypasses the transmit queue and sends the message over the
- * socket directly. If the input xid is not null, the specified xid is
- * inserted into the message. Otherwise, an unique xid is generated
- * automatically and inserted into the message.
- *
- * @param msg
- * Message to be sent
- * @param xid
- * Message xid
- */
- private void asyncSendNow(OFMessage msg, Integer xid) {
- if (xid == null) {
- xid = getNextXid();
- }
- msg.setXid(xid);
-
- asyncSendNow(msg);
- }
-
- /**
- * This method bypasses the transmit queue and sends the message over the
- * socket directly.
- *
- * @param msg
- * Message to be sent
- */
- private void asyncSendNow(OFMessage msg) {
- if (msgReadWriteService == null) {
- logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
- return;
- }
-
- try {
- msgReadWriteService.asyncSend(msg);
- } catch (Exception e) {
- reportError(e);
- }
- }
-
- public void handleMessages() {
- List<OFMessage> msgs = null;
-
- try {
- if (msgReadWriteService != null) {
- msgs = msgReadWriteService.readMessages();
- }
- } catch (Exception e) {
- reportError(e);
- }
-
- if (msgs == null) {
- return;
- }
- for (OFMessage msg : msgs) {
- logger.trace("Message received: {}", msg);
- this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
- OFType type = msg.getType();
- switch (type) {
- case HELLO:
- sendFeaturesRequest();
- break;
- case ECHO_REQUEST:
- OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
-
- byte []payload = ((OFEchoRequest)msg).getPayload();
- if (payload != null && payload.length != 0 ) {
- // the response must have the same payload as the request
- echoReply.setPayload(payload);
- echoReply.setLength( (short)(echoReply.getLength() + payload.length) );
- }
-
- // respond immediately
- asyncSendNow(echoReply, msg.getXid());
-
- // send features request if not sent yet
- sendFeaturesRequest();
- break;
- case ECHO_REPLY:
- this.probeSent = false;
- break;
- case FEATURES_REPLY:
- processFeaturesReply((OFFeaturesReply) msg);
- break;
- case GET_CONFIG_REPLY:
- // make sure that the switch can send the whole packet to the
- // controller
- if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
- this.state = SwitchState.OPERATIONAL;
- }
- break;
- case BARRIER_REPLY:
- processBarrierReply((OFBarrierReply) msg);
- break;
- case ERROR:
- processErrorReply((OFError) msg);
- break;
- case PORT_STATUS:
- processPortStatusMsg((OFPortStatus) msg);
- break;
- case STATS_REPLY:
- processStatsReply((OFStatisticsReply) msg);
- break;
- case PACKET_IN:
- break;
- default:
- break;
- } // end of switch
- if (isOperational()) {
- ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
- }
- } // end of for
- }
-
- private void processPortStatusMsg(OFPortStatus msg) {
- OFPhysicalPort port = msg.getDesc();
- if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
- updatePhysicalPort(port);
- } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
- updatePhysicalPort(port);
- } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
- deletePhysicalPort(port);
- }
-
- }
-
- private void startSwitchTimer() {
- this.periodicTimer = new Timer();
- this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
- @Override
- public void run() {
- try {
- Long now = System.currentTimeMillis();
- if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
- if (probeSent) {
- // switch failed to respond to our probe, consider
- // it down
- logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
- .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
- : HexString.toHexString(sid));
- reportSwitchStateChange(false);
- } else {
- // send a probe to see if the switch is still alive
- logger.debug("Send idle probe (Echo Request) to {}", this);
- probeSent = true;
- OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
- asyncFastSend(echo);
- }
- } else {
- if (state == SwitchState.WAIT_FEATURES_REPLY) {
- // send another features request
- OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
- asyncFastSend(request);
- } else {
- if (state == SwitchState.WAIT_CONFIG_REPLY) {
- // send another config request
- OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
- config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
- asyncFastSend(config);
- OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
- asyncFastSend(getConfig);
- }
- }
- }
- } catch (Exception e) {
- reportError(e);
- }
- }
- }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
- }
-
- private void cancelSwitchTimer() {
- if (this.periodicTimer != null) {
- this.periodicTimer.cancel();
- }
- }
-
- private void reportError(Exception e) {
- if (!running) {
- logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
- (isOperational() ? HexString.toHexString(sid) : "unknown"));
- return;
- }
- logger.debug("Caught exception: ", e);
-
- // notify core of this error event and disconnect the switch
- ((Controller) core).takeSwitchEventError(this);
-
- // clean up some internal states immediately
- stopInternal();
- }
-
- private void reportSwitchStateChange(boolean added) {
- if (added) {
- ((Controller) core).takeSwitchEventAdd(this);
- } else {
- ((Controller) core).takeSwitchEventDelete(this);
- }
- }
-
- @Override
- public Long getId() {
- return this.sid;
- }
-
- private void sendFeaturesRequest() {
- if (!isOperational() && (this.state != SwitchState.WAIT_FEATURES_REPLY)) {
- // send feature request
- OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
- asyncFastSend(featureRequest);
- this.state = SwitchState.WAIT_FEATURES_REPLY;
- startSwitchTimer();
- }
- }
-
- private void processFeaturesReply(OFFeaturesReply reply) {
- if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
- this.sid = reply.getDatapathId();
- this.buffers = reply.getBuffers();
- this.capabilities = reply.getCapabilities();
- this.tables = reply.getTables();
- this.actions = reply.getActions();
- // notify core of this error event
- for (OFPhysicalPort port : reply.getPorts()) {
- updatePhysicalPort(port);
- }
- // config the switch to send full data packet
- OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
- config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
- asyncFastSend(config);
- // send config request to make sure the switch can handle the set
- // config
- OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
- asyncFastSend(getConfig);
- this.state = SwitchState.WAIT_CONFIG_REPLY;
- // inform core that a new switch is now operational
- reportSwitchStateChange(true);
- }
- }
-
- private void updatePhysicalPort(OFPhysicalPort port) {
- Short portNumber = port.getPortNumber();
- physicalPorts.put(portNumber, port);
- portBandwidth
- .put(portNumber,
- port.getCurrentFeatures()
- & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
- | OFPortFeatures.OFPPF_100MB_FD.getValue()
- | OFPortFeatures.OFPPF_100MB_HD.getValue()
- | OFPortFeatures.OFPPF_1GB_FD.getValue()
- | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
- .getValue()));
- }
-
- private void deletePhysicalPort(OFPhysicalPort port) {
- Short portNumber = port.getPortNumber();
- physicalPorts.remove(portNumber);
- portBandwidth.remove(portNumber);
- }
-
- @Override
- public boolean isOperational() {
- return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
- }
-
- @Override
- public String toString() {
- try {
- return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
- .toHexString(this.sid) : "unknown"));
- } catch (Exception e) {
- return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
- }
-
- }
-
- @Override
- public Date getConnectedDate() {
- return this.connectedDate;
- }
-
- public String getInstanceName() {
- return instanceName;
- }
-
- @Override
- public Object getStatistics(OFStatisticsRequest req) {
- int xid = getNextXid();
- StatisticsCollector worker = new StatisticsCollector(this, xid, req);
- messageWaitingDone.put(xid, worker);
- Future<Object> submit;
- Object result = null;
- try {
- submit = executor.submit(worker);
- } catch (RejectedExecutionException re) {
- messageWaitingDone.remove(xid);
- return result;
- }
- try {
- result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
- return result;
- } catch (Exception e) {
- logger.warn("Timeout while waiting for {} replies from {}",
- req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
- result = null; // to indicate timeout has occurred
- worker.wakeup();
- return result;
- }
- }
-
- @Override
- public Object syncSend(OFMessage msg) {
- if (!running) {
- logger.debug("Switch is going down, ignore syncSend");
- return null;
- }
- int xid = getNextXid();
- return syncSend(msg, xid);
- }
-
- /*
- * Either a BarrierReply or a OFError is received. If this is a reply for an
- * outstanding sync message, wake up associated task so that it can continue
- */
- private void processBarrierReply(OFBarrierReply msg) {
- Integer xid = msg.getXid();
- SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
- if (worker == null) {
- return;
- }
- worker.wakeup();
- }
-
- private void processErrorReply(OFError errorMsg) {
- OFMessage offendingMsg = errorMsg.getOffendingMsg();
- Integer xid;
- if (offendingMsg != null) {
- xid = offendingMsg.getXid();
- } else {
- xid = errorMsg.getXid();
- }
- /*
- * the error can be a reply to a synchronous message or to a statistic
- * request message
- */
- Callable<?> worker = messageWaitingDone.remove(xid);
- if (worker == null) {
- return;
- }
- if (worker instanceof SynchronousMessage) {
- ((SynchronousMessage) worker).wakeup(errorMsg);
- } else {
- ((StatisticsCollector) worker).wakeup(errorMsg);
- }
- }
-
- private void processStatsReply(OFStatisticsReply reply) {
- Integer xid = reply.getXid();
- StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
- if (worker == null) {
- return;
- }
- if (worker.collect(reply)) {
- // if all the stats records are received (collect() returns true)
- // then we are done.
- messageWaitingDone.remove(xid);
- worker.wakeup();
- }
- }
-
- @Override
- public Map<Short, OFPhysicalPort> getPhysicalPorts() {
- return this.physicalPorts;
- }
-
- @Override
- public OFPhysicalPort getPhysicalPort(Short portNumber) {
- return this.physicalPorts.get(portNumber);
- }
-
- @Override
- public Integer getPortBandwidth(Short portNumber) {
- return this.portBandwidth.get(portNumber);
- }
-
- @Override
- public Set<Short> getPorts() {
- return this.physicalPorts.keySet();
- }
-
- @Override
- public Byte getTables() {
- return this.tables;
- }
-
- @Override
- public Integer getActions() {
- return this.actions;
- }
-
- @Override
- public Integer getCapabilities() {
- return this.capabilities;
- }
-
- @Override
- public Integer getBuffers() {
- return this.buffers;
- }
-
- @Override
- public boolean isPortEnabled(short portNumber) {
- return isPortEnabled(physicalPorts.get(portNumber));
- }
-
- @Override
- public boolean isPortEnabled(OFPhysicalPort port) {
- if (port == null) {
- return false;
- }
- int portConfig = port.getConfig();
- int portState = port.getState();
- if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
- return false;
- }
- if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
- return false;
- }
- if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
- return false;
- }
- return true;
- }
-
- @Override
- public List<OFPhysicalPort> getEnabledPorts() {
- List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
- synchronized (this.physicalPorts) {
- for (OFPhysicalPort port : physicalPorts.values()) {
- if (isPortEnabled(port)) {
- result.add(port);
- }
- }
- }
- return result;
- }
-
- /*
- * Transmit thread polls the message out of the priority queue and invokes
- * messaging service to transmit it over the socket channel
- */
- class PriorityMessageTransmit implements Runnable {
- @Override
- public void run() {
- running = true;
- while (running) {
- try {
- PriorityMessage pmsg = transmitQ.take();
- msgReadWriteService.asyncSend(pmsg.msg);
- /*
- * If syncReply is set to true, wait for the response back.
- */
- if (pmsg.syncReply) {
- syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
- }
- } catch (InterruptedException ie) {
- reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
- } catch (Exception e) {
- reportError(e);
- }
- }
- transmitQ = null;
- }
- }
-
- /*
- * Setup and start the transmit thread
- */
- private void startTransmitThread() {
- this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
- @Override
- public int compare(PriorityMessage p1, PriorityMessage p2) {
- if (p2.priority != p1.priority) {
- return p2.priority - p1.priority;
- } else {
- return (p2.seqNum < p1.seqNum) ? 1 : -1;
- }
- }
- });
- this.transmitThread = new Thread(new PriorityMessageTransmit());
- this.transmitThread.start();
- }
-
- /*
- * Setup communication services
- */
- private void setupCommChannel() throws Exception {
- this.selector = SelectorProvider.provider().openSelector();
- this.socket.configureBlocking(false);
- this.socket.socket().setTcpNoDelay(true);
- this.msgReadWriteService = getMessageReadWriteService();
- }
-
- private void sendFirstHello() {
- try {
- OFMessage msg = factory.getMessage(OFType.HELLO);
- asyncFastSend(msg);
- } catch (Exception e) {
- reportError(e);
- }
- }
-
- private IMessageReadWrite getMessageReadWriteService() throws Exception {
- String str = System.getProperty("secureChannelEnabled");
- return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
- selector) : new MessageReadWriteService(socket, selector);
- }
-
- /**
- * Send Barrier message synchronously. The caller will be blocked until the
- * Barrier reply is received.
- */
- @Override
- public Object syncSendBarrierMessage() {
- OFBarrierRequest barrierMsg = new OFBarrierRequest();
- return syncSend(barrierMsg);
- }
-
- /**
- * Send Barrier message asynchronously. The caller is not blocked. The
- * Barrier message will be sent in a transmit thread which will be blocked
- * until the Barrier reply is received.
- */
- @Override
- public Object asyncSendBarrierMessage() {
- if (transmitQ == null) {
- return Boolean.FALSE;
- }
-
- OFBarrierRequest barrierMsg = new OFBarrierRequest();
- int xid = getNextXid();
-
- barrierMsg.setXid(xid);
- transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
-
- return Boolean.TRUE;
- }
-
- /**
- * This method returns the switch liveness timeout value. If controller did
- * not receive any message from the switch for such a long period,
- * controller will tear down the connection to the switch.
- *
- * @return The timeout value
- */
- private static int getSwitchLivenessTimeout() {
- String timeout = System.getProperty("of.switchLivenessTimeout");
- int rv = 60500;
-
- try {
- if (timeout != null) {
- rv = Integer.parseInt(timeout);
- }
- } catch (Exception e) {
- }
-
- return rv;
- }
-
- /**
- * This method performs synchronous operations for a given message. If
- * syncRequest is set to true, the message will be sent out followed by a
- * Barrier request message. Then it's blocked until the Barrier rely arrives
- * or timeout. If syncRequest is false, it simply skips the message send and
- * just waits for the response back.
- *
- * @param msg
- * Message to be sent
- * @param xid
- * Message XID
- * @param request
- * If set to true, the message the message will be sent out
- * followed by a Barrier request message. If set to false, it
- * simply skips the sending and just waits for the Barrier reply.
- * @return the result
- */
- private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
- SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
- messageWaitingDone.put(xid, worker);
- Object result = null;
- Boolean status = false;
- Future<Object> submit;
- try {
- submit = executor.submit(worker);
- } catch (RejectedExecutionException re) {
- messageWaitingDone.remove(xid);
- return result;
- }
- try {
- result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
- messageWaitingDone.remove(xid);
- if (result == null) {
- // if result is null, then it means the switch can handle this
- // message successfully
- // convert the result into a Boolean with value true
- status = true;
- // logger.debug("Successfully send " +
- // msg.getType().toString());
- result = status;
- } else {
- // if result is not null, this means the switch can't handle
- // this message
- // the result if OFError already
- if (logger.isDebugEnabled()) {
- logger.debug("Send {} failed --> {}", msg.getType(), (result));
- }
- }
- return result;
- } catch (Exception e) {
- logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
- // convert the result into a Boolean with value false
- status = false;
- result = status;
- worker.wakeup();
- return result;
- }
- }
-
- @Override
- public void deleteAllFlows() {
- logger.trace("deleteAllFlows on switch {}", HexString.toHexString(this.sid));
- OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
- OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
- flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
- .setLength((short) OFFlowMod.MINIMUM_LENGTH);
- asyncFastSend(flowMod);
- }
-}