4 package org.openflow.example;
6 import java.io.IOException;
7 import java.net.InetAddress;
8 import java.nio.channels.SelectionKey;
9 import java.nio.channels.ServerSocketChannel;
10 import java.nio.channels.SocketChannel;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.List;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
19 import org.openflow.example.cli.Options;
20 import org.openflow.example.cli.ParseException;
21 import org.openflow.example.cli.SimpleCLI;
22 import org.openflow.io.OFMessageAsyncStream;
23 import org.openflow.protocol.OFEchoReply;
24 import org.openflow.protocol.OFFlowMod;
25 import org.openflow.protocol.OFMatch;
26 import org.openflow.protocol.OFMessage;
27 import org.openflow.protocol.OFPacketIn;
28 import org.openflow.protocol.OFPacketOut;
29 import org.openflow.protocol.OFPort;
30 import org.openflow.protocol.OFType;
31 import org.openflow.protocol.action.OFAction;
32 import org.openflow.protocol.action.OFActionOutput;
33 import org.openflow.protocol.factory.BasicFactory;
34 import org.openflow.util.LRULinkedHashMap;
35 import org.openflow.util.U16;
38 * @author Rob Sherwood (rob.sherwood@stanford.edu), David Erickson (daviderickson@cs.stanford.edu)
41 public class SimpleController implements SelectListener {
42 protected ExecutorService es;
43 protected BasicFactory factory;
44 protected SelectLoop listenSelectLoop;
45 protected ServerSocketChannel listenSock;
46 protected List<SelectLoop> switchSelectLoops;
47 protected Map<SocketChannel,OFSwitch> switchSockets;
48 protected Integer threadCount;
51 protected class OFSwitch {
52 protected SocketChannel sock;
53 protected OFMessageAsyncStream stream;
54 protected Map<Integer, Short> macTable =
55 new LRULinkedHashMap<Integer, Short>(64001, 64000);
57 public OFSwitch(SocketChannel sock, OFMessageAsyncStream stream) {
62 public void handlePacketIn(OFPacketIn pi) {
64 OFMatch match = new OFMatch();
65 match.loadFromPacket(pi.getPacketData(), pi.getInPort());
66 byte[] dlDst = match.getDataLayerDestination();
67 Integer dlDstKey = Arrays.hashCode(dlDst);
68 byte[] dlSrc = match.getDataLayerSource();
69 Integer dlSrcKey = Arrays.hashCode(dlSrc);
70 int bufferId = pi.getBufferId();
72 // if the src is not multicast, learn it
73 if ((dlSrc[0] & 0x1) == 0) {
74 if (!macTable.containsKey(dlSrcKey) ||
75 !macTable.get(dlSrcKey).equals(pi.getInPort())) {
76 macTable.put(dlSrcKey, pi.getInPort());
81 // if the destination is not multicast, look it up
82 if ((dlDst[0] & 0x1) == 0) {
83 outPort = macTable.get(dlDstKey);
86 // push a flow mod if we know where the packet should be going
87 if (outPort != null) {
88 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
89 fm.setBufferId(bufferId);
90 fm.setCommand((short) 0);
92 fm.setFlags((short) 0);
93 fm.setHardTimeout((short) 0);
94 fm.setIdleTimeout((short) 5);
95 match.setInputPort(pi.getInPort());
96 match.setWildcards(0);
98 fm.setOutPort((short) OFPort.OFPP_NONE.getValue());
99 fm.setPriority((short) 0);
100 OFActionOutput action = new OFActionOutput();
101 action.setMaxLength((short) 0);
102 action.setPort(outPort);
103 List<OFAction> actions = new ArrayList<OFAction>();
105 fm.setActions(actions);
106 fm.setLength(U16.t(OFFlowMod.MINIMUM_LENGTH+OFActionOutput.MINIMUM_LENGTH));
109 } catch (IOException e) {
115 if (outPort == null || pi.getBufferId() == 0xffffffff) {
116 OFPacketOut po = new OFPacketOut();
117 po.setBufferId(bufferId);
118 po.setInPort(pi.getInPort());
121 OFActionOutput action = new OFActionOutput();
122 action.setMaxLength((short) 0);
123 action.setPort((short) ((outPort == null) ? OFPort.OFPP_FLOOD
124 .getValue() : outPort));
125 List<OFAction> actions = new ArrayList<OFAction>();
127 po.setActions(actions);
128 po.setActionsLength((short) OFActionOutput.MINIMUM_LENGTH);
130 // set data if needed
131 if (bufferId == 0xffffffff) {
132 byte[] packetData = pi.getPacketData();
133 po.setLength(U16.t(OFPacketOut.MINIMUM_LENGTH
134 + po.getActionsLength() + packetData.length));
135 po.setPacketData(packetData);
137 po.setLength(U16.t(OFPacketOut.MINIMUM_LENGTH
138 + po.getActionsLength()));
142 } catch (IOException e) {
148 public String toString() {
149 InetAddress remote = sock.socket().getInetAddress();
150 return remote.getHostAddress() + ":" + sock.socket().getPort();
153 public OFMessageAsyncStream getStream() {
158 public SimpleController(int port) throws IOException{
159 listenSock = ServerSocketChannel.open();
160 listenSock.configureBlocking(false);
161 listenSock.socket().bind(new java.net.InetSocketAddress(port));
162 listenSock.socket().setReuseAddress(true);
164 switchSelectLoops = new ArrayList<SelectLoop>();
165 switchSockets = new ConcurrentHashMap<SocketChannel,OFSwitch>();
167 listenSelectLoop = new SelectLoop(this);
168 // register this connection for accepting
169 listenSelectLoop.register(listenSock, SelectionKey.OP_ACCEPT, listenSock);
171 this.factory = new BasicFactory();
175 public void handleEvent(SelectionKey key, Object arg) throws IOException {
176 if (arg instanceof ServerSocketChannel)
177 handleListenEvent(key, (ServerSocketChannel)arg);
179 handleSwitchEvent(key, (SocketChannel) arg);
182 protected void handleListenEvent(SelectionKey key, ServerSocketChannel ssc)
184 SocketChannel sock = listenSock.accept();
185 OFMessageAsyncStream stream = new OFMessageAsyncStream(sock, factory);
186 switchSockets.put(sock, new OFSwitch(sock, stream));
188 .println("Got new connection from " + switchSockets.get(sock));
189 List<OFMessage> l = new ArrayList<OFMessage>();
190 l.add(factory.getMessage(OFType.HELLO));
191 l.add(factory.getMessage(OFType.FEATURES_REQUEST));
194 int ops = SelectionKey.OP_READ;
195 if (stream.needsFlush())
196 ops |= SelectionKey.OP_WRITE;
198 // hash this switch into a thread
199 SelectLoop sl = switchSelectLoops.get(sock.hashCode()
200 % switchSelectLoops.size());
201 sl.register(sock, ops, sock);
202 // force select to return and re-enter using the new set of keys
206 protected void handleSwitchEvent(SelectionKey key, SocketChannel sock) {
207 OFSwitch sw = switchSockets.get(sock);
208 OFMessageAsyncStream stream = sw.getStream();
210 if (key.isReadable()) {
211 List<OFMessage> msgs = stream.read();
214 switchSockets.remove(sock);
218 for (OFMessage m : msgs) {
219 switch (m.getType()) {
221 sw.handlePacketIn((OFPacketIn) m);
224 System.err.println("GOT HELLO from " + sw);
227 OFEchoReply reply = (OFEchoReply) stream
228 .getMessageFactory().getMessage(
230 reply.setXid(m.getXid());
234 System.err.println("Unhandled OF message: "
235 + m.getType() + " from "
236 + sock.socket().getInetAddress());
240 if (key.isWritable()) {
245 * Only register for interest in R OR W, not both, causes stream
246 * deadlock after some period of time
248 if (stream.needsFlush())
249 key.interestOps(SelectionKey.OP_WRITE);
251 key.interestOps(SelectionKey.OP_READ);
252 } catch (IOException e) {
253 // if we have an exception, disconnect the switch
255 switchSockets.remove(sock);
259 public void run() throws IOException{
260 System.err.println("Starting " + this.getClass().getCanonicalName() +
261 " on port " + this.port + " with " + this.threadCount + " threads");
262 // Static number of threads equal to processor cores
263 es = Executors.newFixedThreadPool(threadCount);
265 // Launch one select loop per threadCount and start running
266 for (int i = 0; i < threadCount; ++i) {
267 final SelectLoop sl = new SelectLoop(this);
268 switchSelectLoops.add(sl);
269 es.execute(new Runnable() {
274 } catch (IOException e) {
281 // Start the listen loop
282 listenSelectLoop.doLoop();
285 public static void main(String [] args) throws IOException {
286 SimpleCLI cmd = parseArgs(args);
287 int port = Integer.valueOf(cmd.getOptionValue("p"));
288 SimpleController sc = new SimpleController(port);
289 sc.threadCount = Integer.valueOf(cmd.getOptionValue("t"));
293 public static SimpleCLI parseArgs(String[] args) {
294 Options options = new Options();
295 options.addOption("h", "help", "print help");
297 // options.addOption("n", true, "the number of packets to send");
298 options.addOption("p", "port", 6633, "the port to listen on");
299 options.addOption("t", "threads", 1, "the number of threads to run");
301 SimpleCLI cmd = SimpleCLI.parse(options, args);
302 if (cmd.hasOption("h")) {
307 } catch (ParseException e) {
308 System.err.println(e);
316 public static void printUsage(Options options) {
317 SimpleCLI.printHelp("Usage: "
318 + SimpleController.class.getCanonicalName() + " [options]",