2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11 import java.nio.channels.SelectionKey;
12 import java.nio.channels.Selector;
13 import java.nio.channels.SocketChannel;
14 import java.nio.channels.spi.SelectorProvider;
15 import java.util.ArrayList;
16 import java.util.Comparator;
17 import java.util.Date;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.List;
23 import java.util.Timer;
24 import java.util.TimerTask;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.PriorityBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
38 import org.openflow.protocol.OFBarrierReply;
39 import org.openflow.protocol.OFBarrierRequest;
40 import org.openflow.protocol.OFEchoReply;
41 import org.openflow.protocol.OFError;
42 import org.openflow.protocol.OFFeaturesReply;
43 import org.openflow.protocol.OFFlowMod;
44 import org.openflow.protocol.OFGetConfigReply;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFMessage;
47 import org.openflow.protocol.OFPhysicalPort;
48 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
49 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
50 import org.openflow.protocol.OFPhysicalPort.OFPortState;
51 import org.openflow.protocol.OFPort;
52 import org.openflow.protocol.OFPortStatus;
53 import org.openflow.protocol.OFPortStatus.OFPortReason;
54 import org.openflow.protocol.OFSetConfig;
55 import org.openflow.protocol.OFStatisticsReply;
56 import org.openflow.protocol.OFStatisticsRequest;
57 import org.openflow.protocol.OFType;
58 import org.openflow.protocol.factory.BasicFactory;
59 import org.openflow.util.HexString;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
63 public class SwitchHandler implements ISwitch {
64 private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
65 private static final int SWITCH_LIVENESS_TIMER = 5000;
66 private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
67 private final int MESSAGE_RESPONSE_TIMER = 2000;
69 private final String instanceName;
70 private final ISwitch thisISwitch;
71 private final IController core;
73 private Integer buffers;
74 private Integer capabilities;
76 private Integer actions;
77 private Selector selector;
78 private final SocketChannel socket;
79 private final BasicFactory factory;
80 private final AtomicInteger xid;
81 private SwitchState state;
82 private Timer periodicTimer;
83 private final Map<Short, OFPhysicalPort> physicalPorts;
84 private final Map<Short, Integer> portBandwidth;
85 private final Date connectedDate;
86 private Long lastMsgReceivedTimeStamp;
87 private Boolean probeSent;
88 private final ExecutorService executor;
89 private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
90 private boolean running;
91 private IMessageReadWrite msgReadWriteService;
92 private Thread switchHandlerThread;
93 private Integer responseTimerValue;
94 private PriorityBlockingQueue<PriorityMessage> transmitQ;
95 private Thread transmitThread;
97 private enum SwitchState {
98 NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
102 private SwitchState(int value) {
106 @SuppressWarnings("unused")
/**
 * Creates the handler for one switch connection and initialises its
 * bookkeeping state. The handler starts in NON_OPERATIONAL state.
 *
 * @param core
 *            The controller instance
 * @param sc
 *            The socket channel of the switch connection
 * @param name
 *            The instance name stored for this handler
 */
112 public SwitchHandler(Controller core, SocketChannel sc, String name) {
113 this.instanceName = name;
114 this.thisISwitch = this;
// feature fields stay zero until processFeaturesReply() fills them in
116 this.buffers = (int) 0;
117 this.capabilities = (int) 0;
118 this.tables = (byte) 0;
119 this.actions = (int) 0;
122 this.factory = new BasicFactory();
123 this.connectedDate = new Date();
// the liveness clock starts at connection time
124 this.lastMsgReceivedTimeStamp = connectedDate.getTime();
125 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
126 this.portBandwidth = new HashMap<Short, Integer>();
127 this.state = SwitchState.NON_OPERATIONAL;
128 this.probeSent = false;
// the xid counter is seeded from the socket's hash code
129 this.xid = new AtomicInteger(this.socket.hashCode());
130 this.periodicTimer = null;
// fixed pool of 4 threads that runs the workers submitted by
// getStatistics() and syncMessageInternal()
131 this.executor = Executors.newFixedThreadPool(4);
// outstanding requests keyed by xid; the reply handlers look workers up here
132 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
133 this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
// The response timeout (milliseconds) can be overridden with the system
// property of.messageResponseTimer. Integer.decode() accepts decimal,
// hexadecimal (0x...) and octal (leading 0) notation.
134 String rTimer = System.getProperty("of.messageResponseTimer");
135 if (rTimer != null) {
137 responseTimerValue = Integer.decode(rTimer);
138 } catch (NumberFormatException e) {
139 logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
144 public void start() {
146 startTransmitThread();
149 startHandlerThread();
150 } catch (Exception e) {
155 private void startHandlerThread() {
156 switchHandlerThread = new Thread(new Runnable() {
162 // wait for an incoming connection
164 Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
165 while (selectedKeys.hasNext()) {
166 SelectionKey skey = selectedKeys.next();
167 selectedKeys.remove();
168 if (skey.isValid() && skey.isWritable()) {
171 if (skey.isValid() && skey.isReadable()) {
175 } catch (Exception e) {
181 switchHandlerThread.start();
184 private void stopInternal() {
185 logger.debug("{} receives stop signal",
186 (isOperational() ? HexString.toHexString(sid) : "unknown"));
192 } catch (Exception e) {
196 } catch (Exception e) {
199 msgReadWriteService.stop();
200 } catch (Exception e) {
202 logger.debug("executor shutdown now");
203 executor.shutdownNow();
205 msgReadWriteService = null;
211 if (switchHandlerThread != null) {
212 switchHandlerThread.interrupt();
214 if (transmitThread != null) {
215 transmitThread.interrupt();
220 public int getNextXid() {
221 return this.xid.incrementAndGet();
225 * This method puts the message in an outgoing priority queue with normal
226 * priority. It will be served after high priority messages. The method
227 * should be used for non-critical messages such as statistics request,
228 * discovery packets, etc. A unique XID is generated automatically and
229 * inserted into the message.
232 * The OF message to be sent
233 * @return The XID used
236 public Integer asyncSend(OFMessage msg) {
237 return asyncSend(msg, getNextXid());
240 private Object syncSend(OFMessage msg, int xid) {
241 return syncMessageInternal(msg, xid, true);
245 * This method puts the message in an outgoing priority queue with normal
246 * priority. It will be served after high priority messages. The method
247 * should be used for non-critical messages such as statistics request,
248 * discovery packets, etc. The specified XID is inserted into the message.
251 * The OF message to be sent
253 * The XID to be used in the message
254 * @return The XID used
257 public Integer asyncSend(OFMessage msg, int xid) {
259 if (transmitQ != null) {
260 transmitQ.add(new PriorityMessage(msg, 0));
266 * This method puts the message in an outgoing priority queue with high
267 * priority. It will be served first before normal priority messages. The
268 * method should be used for critical messages such as hello, echo reply
269 * etc. A unique XID is generated automatically and inserted into the
273 * The OF message to be sent
274 * @return The XID used
277 public Integer asyncFastSend(OFMessage msg) {
278 return asyncFastSend(msg, getNextXid());
282 * This method puts the message in an outgoing priority queue with high
283 * priority. It will be served first before normal priority messages. The
284 * method should be used for critical messages such as hello, echo reply
285 * etc. The specified XID is inserted into the message.
288 * The OF message to be sent
289 * @return The XID used
292 public Integer asyncFastSend(OFMessage msg, int xid) {
294 if (transmitQ != null) {
295 transmitQ.add(new PriorityMessage(msg, 1));
300 public void resumeSend() {
302 if (msgReadWriteService != null) {
303 msgReadWriteService.resumeSend();
305 } catch (Exception e) {
311 * This method bypasses the transmit queue and sends the message over the
312 * socket directly. If the input xid is not null, the specified xid is
313 * inserted into the message. Otherwise, a unique xid is generated
314 * automatically and inserted into the message.
321 private void asyncSendNow(OFMessage msg, Integer xid) {
331 * This method bypasses the transmit queue and sends the message over the
337 private void asyncSendNow(OFMessage msg) {
338 if (msgReadWriteService == null) {
339 logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
344 msgReadWriteService.asyncSend(msg);
345 } catch (Exception e) {
350 public void handleMessages() {
351 List<OFMessage> msgs = null;
354 if (msgReadWriteService != null) {
355 msgs = msgReadWriteService.readMessages();
357 } catch (Exception e) {
362 logger.info("{} is down", this);
363 reportSwitchStateChange(false);
366 for (OFMessage msg : msgs) {
367 logger.trace("Message received: {}", msg);
368 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
369 OFType type = msg.getType();
372 // send feature request
373 OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
374 asyncFastSend(featureRequest);
375 this.state = SwitchState.WAIT_FEATURES_REPLY;
379 OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
380 // respond immediately
381 asyncSendNow(echoReply, msg.getXid());
384 this.probeSent = false;
387 processFeaturesReply((OFFeaturesReply) msg);
389 case GET_CONFIG_REPLY:
390 // make sure that the switch can send the whole packet to the
392 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
393 this.state = SwitchState.OPERATIONAL;
397 processBarrierReply((OFBarrierReply) msg);
400 processErrorReply((OFError) msg);
403 processPortStatusMsg((OFPortStatus) msg);
406 processStatsReply((OFStatisticsReply) msg);
413 if (isOperational()) {
414 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
/**
 * Applies a port status notification to the locally cached port tables.
 * MODIFY and ADD both store the port description carried by the message;
 * DELETE removes it.
 *
 * @param msg
 *            The port status message received from the switch
 */
419 private void processPortStatusMsg(OFPortStatus msg) {
420 OFPhysicalPort port = msg.getDesc();
// The reason byte is compared against the enum constant's ordinal(), which
// is only correct if OFPortReason declares its constants in wire-value order.
// NOTE(review): confirm against the OFPortReason declaration.
421 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
422 updatePhysicalPort(port);
423 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
424 updatePhysicalPort(port);
425 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
426 deletePhysicalPort(port);
431 private void startSwitchTimer() {
432 this.periodicTimer = new Timer();
433 this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
437 Long now = System.currentTimeMillis();
438 if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
440 // switch failed to respond to our probe, consider
442 logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
443 .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
444 : HexString.toHexString(sid));
445 reportSwitchStateChange(false);
447 // send a probe to see if the switch is still alive
448 logger.debug("Send idle probe (Echo Request) to {}", this);
450 OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
454 if (state == SwitchState.WAIT_FEATURES_REPLY) {
455 // send another features request
456 OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
457 asyncFastSend(request);
459 if (state == SwitchState.WAIT_CONFIG_REPLY) {
460 // send another config request
461 OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
462 config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
463 asyncFastSend(config);
464 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
465 asyncFastSend(getConfig);
469 } catch (Exception e) {
473 }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
476 private void cancelSwitchTimer() {
477 if (this.periodicTimer != null) {
478 this.periodicTimer.cancel();
482 private void reportError(Exception e) {
484 logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
485 (isOperational() ? HexString.toHexString(sid) : "unknown"));
488 logger.debug("Caught exception: ", e);
490 // notify core of this error event and disconnect the switch
491 ((Controller) core).takeSwitchEventError(this);
493 // clean up some internal states immediately
497 private void reportSwitchStateChange(boolean added) {
499 ((Controller) core).takeSwitchEventAdd(this);
501 ((Controller) core).takeSwitchEventDelete(this);
506 public Long getId() {
/**
 * Handles the features reply. The reply is acted on only while the handler
 * is in WAIT_FEATURES_REPLY state: the switch identity and feature fields
 * are recorded, every advertised port is cached, a SET_CONFIG followed by a
 * GET_CONFIG_REQUEST is queued through asyncFastSend(), the state moves to
 * WAIT_CONFIG_REPLY and reportSwitchStateChange(true) is called.
 *
 * @param reply
 *            The features reply received from the switch
 */
510 private void processFeaturesReply(OFFeaturesReply reply) {
511 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
512 this.sid = reply.getDatapathId();
513 this.buffers = reply.getBuffers();
514 this.capabilities = reply.getCapabilities();
515 this.tables = reply.getTables();
516 this.actions = reply.getActions();
517 // cache every port advertised in the features reply
518 for (OFPhysicalPort port : reply.getPorts()) {
519 updatePhysicalPort(port);
521 // configure the switch to send the full data packet
522 OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
523 config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
524 asyncFastSend(config);
525 // send config request to make sure the switch can handle the set
527 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
528 asyncFastSend(getConfig);
529 this.state = SwitchState.WAIT_CONFIG_REPLY;
530 // inform core that a new switch is now operational
531 reportSwitchStateChange(true);
535 private void updatePhysicalPort(OFPhysicalPort port) {
536 Short portNumber = port.getPortNumber();
537 physicalPorts.put(portNumber, port);
540 port.getCurrentFeatures()
541 & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
542 | OFPortFeatures.OFPPF_100MB_FD.getValue()
543 | OFPortFeatures.OFPPF_100MB_HD.getValue()
544 | OFPortFeatures.OFPPF_1GB_FD.getValue()
545 | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
549 private void deletePhysicalPort(OFPhysicalPort port) {
550 Short portNumber = port.getPortNumber();
551 physicalPorts.remove(portNumber);
552 portBandwidth.remove(portNumber);
556 public boolean isOperational() {
// The switch counts as operational from WAIT_CONFIG_REPLY onwards, i.e. as
// soon as processFeaturesReply() has run, not only once the state has
// reached OPERATIONAL.
557 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
561 public String toString() {
563 return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
564 .toHexString(this.sid) : "unknown"));
565 } catch (Exception e) {
566 return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
572 public Date getConnectedDate() {
573 return this.connectedDate;
576 public String getInstanceName() {
581 public Object getStatistics(OFStatisticsRequest req) {
582 int xid = getNextXid();
583 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
// register the collector under its xid before it is submitted, so that
// processStatsReply()/processErrorReply() can find it when replies arrive
584 messageWaitingDone.put(xid, worker);
585 Future<Object> submit;
586 Object result = null;
588 submit = executor.submit(worker);
589 } catch (RejectedExecutionException re) {
// the executor refused the task: undo the registration
590 messageWaitingDone.remove(xid);
// wait at most responseTimerValue milliseconds for the collector to finish
594 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
// NOTE(review): this catches every Exception (including
// ExecutionException and InterruptedException from Future.get), yet the
// log message always reports a timeout.
596 } catch (Exception e) {
597 logger.warn("Timeout while waiting for {} replies from {}",
598 req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
599 result = null; // to indicate timeout has occurred
606 public Object syncSend(OFMessage msg) {
608 logger.debug("Switch is going down, ignore syncSend");
611 int xid = getNextXid();
612 return syncSend(msg, xid);
616 * Either a BarrierReply or an OFError is received. If this is a reply for an
617 * outstanding sync message, wake up associated task so that it can continue
619 private void processBarrierReply(OFBarrierReply msg) {
620 Integer xid = msg.getXid();
621 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
622 if (worker == null) {
/**
 * Routes an error message to the worker that is waiting on the xid it
 * refers to, and hands the error to that worker through wakeup().
 *
 * @param errorMsg
 *            The error message received from the switch
 */
628 private void processErrorReply(OFError errorMsg) {
629 OFMessage offendingMsg = errorMsg.getOffendingMsg();
// prefer the xid of the offending message when the error carries one;
// otherwise fall back to the xid of the error message itself
631 if (offendingMsg != null) {
632 xid = offendingMsg.getXid();
634 xid = errorMsg.getXid();
637 * the error can be a reply to a synchronous message or to a statistic
// remove() both looks the worker up and unregisters it: an error ends the
// exchange for this xid
640 Callable<?> worker = messageWaitingDone.remove(xid);
641 if (worker == null) {
// the map holds two worker types (see getStatistics() and
// syncMessageInternal()); anything that is not a SynchronousMessage is
// cast to StatisticsCollector
644 if (worker instanceof SynchronousMessage) {
645 ((SynchronousMessage) worker).wakeup(errorMsg);
647 ((StatisticsCollector) worker).wakeup(errorMsg);
/**
 * Feeds a statistics reply to the collector registered under the reply's
 * xid.
 *
 * @param reply
 *            The statistics reply received from the switch
 */
651 private void processStatsReply(OFStatisticsReply reply) {
652 Integer xid = reply.getXid();
// get() rather than remove(): the collector stays registered until
// collect() reports that all records have arrived
// NOTE(review): the cast is unchecked although messageWaitingDone also
// holds SynchronousMessage workers; a stats reply carrying such an xid
// would raise ClassCastException.
653 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
654 if (worker == null) {
657 if (worker.collect(reply)) {
658 // if all the stats records are received (collect() returns true)
660 messageWaitingDone.remove(xid);
666 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
667 return this.physicalPorts;
671 public OFPhysicalPort getPhysicalPort(Short portNumber) {
672 return this.physicalPorts.get(portNumber);
676 public Integer getPortBandwidth(Short portNumber) {
677 return this.portBandwidth.get(portNumber);
681 public Set<Short> getPorts() {
682 return this.physicalPorts.keySet();
686 public Byte getTables() {
691 public Integer getActions() {
696 public Integer getCapabilities() {
697 return this.capabilities;
701 public Integer getBuffers() {
706 public boolean isPortEnabled(short portNumber) {
707 return isPortEnabled(physicalPorts.get(portNumber));
711 public boolean isPortEnabled(OFPhysicalPort port) {
715 int portConfig = port.getConfig();
716 int portState = port.getState();
// config check: the PORT_DOWN bit is set
717 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
// state check: the LINK_DOWN bit is set
720 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
// state check: the bits selected by STP_MASK equal the STP_BLOCK value
723 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
730 public List<OFPhysicalPort> getEnabledPorts() {
731 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
732 synchronized (this.physicalPorts) {
733 for (OFPhysicalPort port : physicalPorts.values()) {
734 if (isPortEnabled(port)) {
743 * Transmit thread polls the message out of the priority queue and invokes
744 * messaging service to transmit it over the socket channel
746 class PriorityMessageTransmit implements Runnable {
// take() blocks until a message is available; the queue's comparator
// decides which message is handed out first
752 PriorityMessage pmsg = transmitQ.take();
753 msgReadWriteService.asyncSend(pmsg.msg);
755 * If syncReply is set to true, wait for the response back.
757 if (pmsg.syncReply) {
// the message has already been sent above, so syncRequest=false is passed
758 syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
// NOTE(review): the caught exception is replaced by a new
// InterruptedException and the thread's interrupt status is not restored.
760 } catch (InterruptedException ie) {
761 reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
762 } catch (Exception e) {
771 * Setup and start the transmit thread
773 private void startTransmitThread() {
// 11 is the initial capacity of the queue. The queue hands out the element
// that sorts lowest under the comparator below.
774 this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
776 public int compare(PriorityMessage p1, PriorityMessage p2) {
// a larger priority value sorts first, so it is dequeued first
777 if (p2.priority != p1.priority) {
778 return p2.priority - p1.priority;
// same priority: the smaller seqNum sorts first
// NOTE(review): this never returns 0, not even when both seqNum values
// are equal, which the Comparator contract expects for equal elements.
780 return (p2.seqNum < p1.seqNum) ? 1 : -1;
784 this.transmitThread = new Thread(new PriorityMessageTransmit());
785 this.transmitThread.start();
789 * Setup communication services
791 private void setupCommChannel() throws Exception {
792 this.selector = SelectorProvider.provider().openSelector();
793 this.socket.configureBlocking(false);
794 this.socket.socket().setTcpNoDelay(true);
795 this.msgReadWriteService = getMessageReadWriteService();
798 private void sendFirstHello() {
800 OFMessage msg = factory.getMessage(OFType.HELLO);
802 } catch (Exception e) {
807 private IMessageReadWrite getMessageReadWriteService() throws Exception {
808 String str = System.getProperty("secureChannelEnabled");
809 return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
810 selector) : new MessageReadWriteService(socket, selector);
814 * Send Barrier message synchronously. The caller will be blocked until the
815 * Barrier reply is received.
818 public Object syncSendBarrierMessage() {
819 OFBarrierRequest barrierMsg = new OFBarrierRequest();
820 return syncSend(barrierMsg);
824 * Send Barrier message asynchronously. The caller is not blocked. The
825 * Barrier message will be sent in a transmit thread which will be blocked
826 * until the Barrier reply is received.
829 public Object asyncSendBarrierMessage() {
830 if (transmitQ == null) {
831 return Boolean.FALSE;
834 OFBarrierRequest barrierMsg = new OFBarrierRequest();
835 int xid = getNextXid();
837 barrierMsg.setXid(xid);
838 transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
844 * This method returns the switch liveness timeout value. If controller did
845 * not receive any message from the switch for such a long period,
846 * controller will tear down the connection to the switch.
848 * @return The timeout value
850 private static int getSwitchLivenessTimeout() {
// optional override; the result is stored in the static final field
// switchLivenessTimeout
851 String timeout = System.getProperty("of.switchLivenessTimeout");
855 if (timeout != null) {
// parsed as a decimal integer; the value is later compared against a
// difference of System.currentTimeMillis() readings, i.e. milliseconds
856 rv = Integer.parseInt(timeout);
858 } catch (Exception e) {
865 * This method performs synchronous operations for a given message. If
866 * syncRequest is set to true, the message will be sent out followed by a
867 * Barrier request message. Then it's blocked until the Barrier reply arrives
868 * or timeout. If syncRequest is false, it simply skips the message send and
869 * just waits for the response back.
876 * If set to true, the message will be sent out
877 * followed by a Barrier request message. If set to false, it
878 * simply skips the sending and just waits for the Barrier reply.
881 private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
882 SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
// register the worker under its xid before it is submitted, so that
// processBarrierReply()/processErrorReply() can find it
883 messageWaitingDone.put(xid, worker);
884 Object result = null;
885 Boolean status = false;
886 Future<Object> submit;
888 submit = executor.submit(worker);
889 } catch (RejectedExecutionException re) {
// the executor refused the task: undo the registration
890 messageWaitingDone.remove(xid);
// wait at most responseTimerValue milliseconds for the worker to finish
894 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
895 messageWaitingDone.remove(xid);
896 if (result == null) {
897 // if result is null, then it means the switch can handle this
898 // message successfully
899 // convert the result into a Boolean with value true
901 // logger.debug("Successfully send " +
902 // msg.getType().toString());
905 // if result is not null, this means the switch can't handle
907 // the result is an OFError already
908 if (logger.isDebugEnabled()) {
909 logger.debug("Send {} failed --> {}", msg.getType(), (result));
// NOTE(review): this catches every Exception (including
// ExecutionException and InterruptedException from Future.get), yet the
// log message always reports a timeout.
913 } catch (Exception e) {
914 logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
915 // convert the result into a Boolean with value false
924 public void deleteAllFlows() {
925 logger.trace("deleteAllFlows on switch {}", HexString.toHexString(this.sid));
926 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
927 OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
928 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
929 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
930 asyncFastSend(flowMod);