4 package org.openflow.codec.example;
6 import java.io.IOException;
7 import java.net.InetAddress;
8 import java.nio.channels.SelectionKey;
9 import java.nio.channels.ServerSocketChannel;
10 import java.nio.channels.SocketChannel;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.List;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
19 import org.openflow.codec.example.cli.Options;
20 import org.openflow.codec.example.cli.ParseException;
21 import org.openflow.codec.example.cli.SimpleCLI;
22 import org.openflow.codec.io.OFMessageAsyncStream;
23 import org.openflow.codec.protocol.OFPEchoReply;
24 import org.openflow.codec.protocol.OFPFlowMod;
25 import org.openflow.codec.protocol.OFPFlowModCommand;
26 import org.openflow.codec.protocol.OFPMatch;
27 import org.openflow.codec.protocol.OFPMessage;
28 import org.openflow.codec.protocol.OFPPacketIn;
29 import org.openflow.codec.protocol.OFPPacketOut;
30 import org.openflow.codec.protocol.OFPPortNo;
31 import org.openflow.codec.protocol.OFPType;
32 import org.openflow.codec.protocol.action.OFPAction;
33 import org.openflow.codec.protocol.action.OFPActionOutput;
34 import org.openflow.codec.protocol.factory.OFPBasicFactoryImpl;
35 import org.openflow.codec.protocol.instruction.OFPInstruction;
36 import org.openflow.codec.protocol.instruction.OFPInstructionActions;
37 import org.openflow.codec.protocol.instruction.OFPInstructionApplyActions;
38 import org.openflow.codec.util.LRULinkedHashMap;
39 import org.openflow.codec.util.U16;
42 * @author Rob Sherwood (rob.sherwood@stanford.edu), David Erickson
43 * (daviderickson@cs.stanford.edu)
// Example OpenFlow controller built on NIO select loops. It implements
// SelectListener so that the select loops can call back into handleEvent().
// NOTE(review): this listing carries embedded line numbers and several original
// lines appear to be missing (for instance no `port` field and no java.util.Map
// import are visible although both are referenced) - confirm against the full file.
46 public class SimpleController implements SelectListener {
// Thread pool that runs the per-switch select loops; created in run().
47 protected ExecutorService es;
// Factory used to create OpenFlow messages; also handed to each switch's stream.
48 protected OFPBasicFactoryImpl factory;
// Select loop with which the listening socket is registered for OP_ACCEPT.
49 protected SelectLoop listenSelectLoop;
// Non-blocking server socket channel bound to the controller port.
50 protected ServerSocketChannel listenSock;
// Select loops created by run(), one per worker thread; accepted switches are hashed onto them.
51 protected List<SelectLoop> switchSelectLoops;
// Connected switches keyed by their socket channel (instantiated as a ConcurrentHashMap).
52 protected Map<SocketChannel, OFSwitch> switchSockets;
// Number of select-loop threads started by run(); main() sets it from the "t" option.
53 protected Integer threadCount;
// Per-connection state for one switch: its socket channel, its message stream
// and a MAC table.
56 protected class OFSwitch {
// Channel this switch is connected on.
57 protected SocketChannel sock;
// Asynchronous OpenFlow message stream used to talk to this switch.
58 protected OFMessageAsyncStream stream;
// Integer -> Short table backed by an LRULinkedHashMap constructed with (64001, 64000).
// NOTE(review): the commented-out learning code in handlePacketIn suggests keys are
// hashes of MAC address bytes and values are switch ports - confirm.
59 protected Map<Integer, Short> macTable = new LRULinkedHashMap<Integer, Short>(64001, 64000);
// Creates the per-switch state for the given channel and stream.
// NOTE(review): the constructor body is not visible in this listing.
61 public OFSwitch(SocketChannel sock, OFMessageAsyncStream stream) {
// Reacts to a PACKET_IN from this switch.
//
// When an output port is known, a FLOW_MOD (command ADD, idle timeout 5, hard
// timeout 0, priority 0) carrying an apply-actions instruction is built. When the
// output port is unknown, or the packet was not buffered (buffer id 0xffffffff),
// a PACKET_OUT is built that outputs to the known port or floods.
//
// NOTE(review): the MAC-learning and lookup code is commented out, and the
// declaration of `outPort`, the `try` openers, the `actions.add(...)` calls and the
// stream writes are not visible in this listing - confirm against the full file.
//
// @param pi the received packet-in message
66 public void handlePacketIn(OFPPacketIn pi) {
68 OFPMatch match = new OFPMatch();
69 // match.loadFromPacket(pi.getPacketData(), pi.getInPort());
70 // byte[] dlDst = match.getDataLayerDestination();
71 // Integer dlDstKey = Arrays.hashCode(dlDst);
72 // byte[] dlSrc = match.getDataLayerSource();
73 // Integer dlSrcKey = Arrays.hashCode(dlSrc);
74 int bufferId = pi.getBufferId();
76 // if the src is not multicast, learn it
77 // if ((dlSrc[0] & 0x1) == 0) {
78 // if (!macTable.containsKey(dlSrcKey) ||
79 // !macTable.get(dlSrcKey).equals(pi.getInPort())) {
80 // macTable.put(dlSrcKey, pi.getInPort());
85 // // if the destination is not multicast, look it up
86 // if ((dlDst[0] & 0x1) == 0) {
87 // outPort = macTable.get(dlDstKey);
90 // push a flow mod if we know where the packet should be going
91 if (outPort != null) {
92 OFPFlowMod fm = (OFPFlowMod) factory.getMessage(OFPType.FLOW_MOD);
93 fm.setBufferId(bufferId);
94 fm.setCommand(OFPFlowModCommand.OFPFC_ADD);
96 fm.setFlags((short) 0);
97 fm.setHardTimeout((short) 0);
98 fm.setIdleTimeout((short) 5);
99 // match.setInputPort(pi.getInPort());
100 // match.setWildcards(0);
102 fm.setOutPort(OFPPortNo.OFPP_ANY.getValue());
103 fm.setPriority((short) 0);
// Single output action towards the known port.
104 OFPActionOutput action = new OFPActionOutput();
105 action.setMaxLength((short) 0);
106 action.setPort(outPort);
107 List<OFPAction> actions = new ArrayList<OFPAction>();
// Wrap the action list in an apply-actions instruction and attach it to the flow mod.
109 OFPInstructionApplyActions instructions = new OFPInstructionApplyActions();
110 instructions.setActions(actions);
111 List<OFPInstruction> instrList = new ArrayList<OFPInstruction>();
112 instrList.add(instructions);
113 fm.setInstructions(instrList);
// Total length = fixed flow-mod part plus the instruction's length.
114 fm.setLength(U16.t(OFPFlowMod.MINIMUM_LENGTH + instructions.getLength()));
117 } catch (IOException e) {
// A packet-out is needed when the destination is unknown or the switch did not
// buffer the packet (buffer id 0xffffffff).
123 if (outPort == null || pi.getBufferId() == 0xffffffff) {
124 OFPPacketOut po = new OFPPacketOut();
125 po.setBufferId(bufferId);
126 // po.setInPort(pi.getInPort());
129 OFPActionOutput action = new OFPActionOutput();
130 action.setMaxLength((short) 0);
// Flood when no output port is known, otherwise send to that port.
131 action.setPort((short) ((outPort == null) ? OFPPortNo.OFPP_FLOOD.getValue() : outPort));
132 List<OFPAction> actions = new ArrayList<OFPAction>();
134 po.setActions(actions);
135 po.setActionsLength((short) OFPActionOutput.MINIMUM_LENGTH);
137 // set data if needed
// Unbuffered packet: the raw packet bytes are attached and counted in the length.
138 if (bufferId == 0xffffffff) {
139 byte[] packetData = pi.getPacketData();
140 po.setLength(U16.t(OFPPacketOut.MINIMUM_LENGTH + po.getActionsLength() + packetData.length));
141 po.setPacketData(packetData);
// Length without packet data (presumably the else branch; the `else` line is not visible).
143 po.setLength(U16.t(OFPPacketOut.MINIMUM_LENGTH + po.getActionsLength()));
147 } catch (IOException e) {
// Describes this switch as "<remote host address>:<remote port>", both taken from
// the underlying socket.
// NOTE(review): getInetAddress() returns null for an unconnected socket, which
// would make this throw - confirm it is only called on connected channels.
153 public String toString() {
154 InetAddress remote = sock.socket().getInetAddress();
155 return remote.getHostAddress() + ":" + sock.socket().getPort();
// Accessor for this switch's message stream.
// NOTE(review): the body is not visible in this listing; presumably it returns the `stream` field.
158 public OFMessageAsyncStream getStream() {
// Creates a controller listening on the given TCP port.
//
// Opens a non-blocking server socket channel, binds it to the port, creates the
// select-loop list and the switch map, registers the listening channel for
// OP_ACCEPT on its own select loop, and creates the message factory.
//
// @param port TCP port to listen on
// @throws IOException if one of the channel/socket operations below fails
163 public SimpleController(int port) throws IOException {
164 listenSock = ServerSocketChannel.open();
165 listenSock.configureBlocking(false);
166 listenSock.socket().bind(new java.net.InetSocketAddress(port));
// NOTE(review): SO_REUSEADDR is set after bind(); the ServerSocket documentation
// states that changing it on an already-bound socket has undefined behaviour, so
// this call likely needs to move before the bind - confirm.
167 listenSock.socket().setReuseAddress(true);
169 switchSelectLoops = new ArrayList<SelectLoop>();
170 switchSockets = new ConcurrentHashMap<SocketChannel, OFSwitch>();
// This controller is the listener that receives the loop's events.
172 listenSelectLoop = new SelectLoop(this);
173 // register this connection for accepting
// The channel itself is passed as the attachment that handleEvent() later receives as `arg`.
174 listenSelectLoop.register(listenSock, SelectionKey.OP_ACCEPT, listenSock);
176 this.factory = new OFPBasicFactoryImpl();
// Select-loop callback: dispatches on the type of the registered attachment.
// A ServerSocketChannel means a new connection on the listening socket; anything
// else is cast to SocketChannel and handled as switch I/O.
// NOTE(review): the `else` between the two calls is not visible in this listing.
//
// @param key selection key that fired
// @param arg attachment supplied at registration time
// @throws IOException propagated from handleListenEvent
180 public void handleEvent(SelectionKey key, Object arg) throws IOException {
181 if (arg instanceof ServerSocketChannel)
182 handleListenEvent(key, (ServerSocketChannel) arg);
184 handleSwitchEvent(key, (SocketChannel) arg);
// Accepts a new switch connection, records it in switchSockets, prepares the
// initial HELLO and FEATURES_REQUEST messages, and registers the new channel with
// one of the per-thread select loops.
// NOTE(review): the statements that write the message list to the stream and wake
// up the chosen select loop are not visible in this listing.
//
// @param key selection key of the listening channel (not used in the visible code)
// @param ssc listening channel from the event (the visible code uses the listenSock field instead)
// @throws IOException if accepting or registering the connection fails
187 protected void handleListenEvent(SelectionKey key, ServerSocketChannel ssc) throws IOException {
// NOTE(review): listenSock is non-blocking, so accept() may return null when no
// connection is pending; the visible code does not check for that.
188 SocketChannel sock = listenSock.accept();
189 OFMessageAsyncStream stream = new OFMessageAsyncStream(sock, factory);
190 switchSockets.put(sock, new OFSwitch(sock, stream));
191 System.err.println("Got new connection from " + switchSockets.get(sock));
192 List<OFPMessage> l = new ArrayList<OFPMessage>();
193 l.add(factory.getMessage(OFPType.HELLO));
194 l.add(factory.getMessage(OFPType.FEATURES_REQUEST));
// Always interested in reads; also in writes while the stream has unflushed data.
197 int ops = SelectionKey.OP_READ;
198 if (stream.needsFlush())
199 ops |= SelectionKey.OP_WRITE;
201 // hash this switch into a thread
// NOTE(review): hashCode() can be negative, which makes the remainder negative and
// get() throw IndexOutOfBoundsException; an empty list (run() not yet called)
// would divide by zero - confirm and consider Math.floorMod.
202 SelectLoop sl = switchSelectLoops.get(sock.hashCode() % switchSelectLoops.size());
203 sl.register(sock, ops, sock);
204 // force select to return and re-enter using the new set of keys
// Handles readiness events on a switch connection.
//
// On readable: reads the pending messages and processes each one by type. The
// visible branches forward PACKET_IN to the switch object, log HELLO, build an
// echo reply that copies the request's xid, and log other types as unhandled.
// After the read/write handling the key's interest set is switched to OP_WRITE or
// OP_READ depending on whether the stream still needs flushing. On IOException
// the switch is removed from switchSockets.
//
// NOTE(review): the `try` opener, the `case` labels, the null/close handling
// around the first remove(), the write-side flush and the `else` before the
// OP_READ line are not visible in this listing.
// NOTE(review): `sw` is null if the channel is no longer in switchSockets, so
// sw.getStream() would throw - confirm whether that can happen.
//
// @param key  selection key that fired for this switch
// @param sock channel of the switch
208 protected void handleSwitchEvent(SelectionKey key, SocketChannel sock) {
209 OFSwitch sw = switchSockets.get(sock);
210 OFMessageAsyncStream stream = sw.getStream();
212 if (key.isReadable()) {
213 List<OFPMessage> msgs = stream.read();
// Presumably reached only when the read indicates a closed connection; the guarding condition is not visible.
216 switchSockets.remove(sock);
220 for (OFPMessage m : msgs) {
221 switch (m.getType()) {
223 sw.handlePacketIn((OFPPacketIn) m);
226 System.err.println("GOT HELLO from " + sw);
// Build an echo reply and copy the transaction id of the message being answered.
229 OFPEchoReply reply = (OFPEchoReply) stream.getMessageFactory().getMessage(OFPType.ECHO_REPLY);
230 reply.setXid(m.getXid());
234 System.err.println("Unhandled OF message: " + m.getType() + " from "
235 + sock.socket().getInetAddress());
239 if (key.isWritable()) {
244 * Only register for interest in R OR W, not both, causes stream
245 * deadlock after some period of time
// interestOps() replaces the whole interest set, so exactly one of WRITE/READ is active afterwards.
247 if (stream.needsFlush())
248 key.interestOps(SelectionKey.OP_WRITE);
250 key.interestOps(SelectionKey.OP_READ);
251 } catch (IOException e) {
252 // if we have an exception, disconnect the switch
254 switchSockets.remove(sock);
// Starts the controller: creates a fixed thread pool of threadCount threads,
// creates one SelectLoop per thread and submits a task for each, then runs the
// listen loop on the calling thread.
// NOTE(review): the body of each submitted Runnable is not visible in this
// listing; presumably it calls sl.doLoop() and handles the IOException.
//
// @throws IOException propagated from the listen loop
258 public void run() throws IOException {
259 System.err.println("Starting " + this.getClass().getCanonicalName() + " on port " + this.port + " with "
260 + this.threadCount + " threads");
// NOTE(review): the next comment mentions processor cores, but the pool size is
// whatever threadCount holds.
261 // Static number of threads equal to processor cores
262 es = Executors.newFixedThreadPool(threadCount);
264 // Launch one select loop per threadCount and start running
265 for (int i = 0; i < threadCount; ++i) {
// Each loop reports its events back to this controller.
266 final SelectLoop sl = new SelectLoop(this);
267 switchSelectLoops.add(sl);
268 es.execute(new Runnable() {
273 } catch (IOException e) {
280 // Start the listen loop
281 listenSelectLoop.doLoop();
// Command-line entry point: parses the arguments, creates a controller on the
// port given by option "p" and sets its thread count from option "t".
// NOTE(review): the statement that starts the controller is not visible in this listing.
//
// @param args command-line arguments
// @throws IOException if the controller cannot be created
284 public static void main(String[] args) throws IOException {
285 SimpleCLI cmd = parseArgs(args);
286 int port = Integer.valueOf(cmd.getOptionValue("p"));
287 SimpleController sc = new SimpleController(port);
288 sc.threadCount = Integer.valueOf(cmd.getOptionValue("t"));
// Declares the supported command-line options ("h" help, "p" port with default
// 6633, "t" threads with default 1) and parses the given arguments against them.
// A ParseException is printed to stderr.
// NOTE(review): the `try` opener, the handling of "h", the return statement and
// whatever follows the exception print are not visible in this listing.
//
// @param args command-line arguments
// @return the parsed command line
292 public static SimpleCLI parseArgs(String[] args) {
293 Options options = new Options();
294 options.addOption("h", "help", "print help");
296 // options.addOption("n", true, "the number of packets to send");
297 options.addOption("p", "port", 6633, "the port to listen on");
298 options.addOption("t", "threads", 1, "the number of threads to run");
300 SimpleCLI cmd = SimpleCLI.parse(options, args);
301 if (cmd.hasOption("h")) {
306 } catch (ParseException e) {
307 System.err.println(e);
// Prints the help text for the given options, headed by
// "Usage: <canonical class name> [options]".
//
// @param options option definitions to describe
315 public static void printUsage(Options options) {
316 SimpleCLI.printHelp("Usage: " + SimpleController.class.getCanonicalName() + " [options]", options);