01420d124bc860c104548e44889db873174a82d4
[openflowjava.git] / third-party / openflow-codec / src / main / java / org / openflow / codec / example / SimpleController.java
1 /**
2  *
3  */
4 package org.openflow.codec.example;
5
6 import java.io.IOException;
7 import java.net.InetAddress;
8 import java.nio.channels.SelectionKey;
9 import java.nio.channels.ServerSocketChannel;
10 import java.nio.channels.SocketChannel;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18
19 import org.openflow.codec.example.cli.Options;
20 import org.openflow.codec.example.cli.ParseException;
21 import org.openflow.codec.example.cli.SimpleCLI;
22 import org.openflow.codec.io.OFMessageAsyncStream;
23 import org.openflow.codec.protocol.OFPEchoReply;
24 import org.openflow.codec.protocol.OFPFlowMod;
25 import org.openflow.codec.protocol.OFPFlowModCommand;
26 import org.openflow.codec.protocol.OFPMatch;
27 import org.openflow.codec.protocol.OFPMessage;
28 import org.openflow.codec.protocol.OFPPacketIn;
29 import org.openflow.codec.protocol.OFPPacketOut;
30 import org.openflow.codec.protocol.OFPPortNo;
31 import org.openflow.codec.protocol.OFPType;
32 import org.openflow.codec.protocol.action.OFPAction;
33 import org.openflow.codec.protocol.action.OFPActionOutput;
34 import org.openflow.codec.protocol.factory.OFPBasicFactoryImpl;
35 import org.openflow.codec.protocol.instruction.OFPInstruction;
36 import org.openflow.codec.protocol.instruction.OFPInstructionActions;
37 import org.openflow.codec.protocol.instruction.OFPInstructionApplyActions;
38 import org.openflow.codec.util.LRULinkedHashMap;
39 import org.openflow.codec.util.U16;
40
41 /**
42  * @author Rob Sherwood (rob.sherwood@stanford.edu), David Erickson
43  *         (daviderickson@cs.stanford.edu)
44  *
45  */
46 public class SimpleController implements SelectListener {
47     protected ExecutorService es;
48     protected OFPBasicFactoryImpl factory;
49     protected SelectLoop listenSelectLoop;
50     protected ServerSocketChannel listenSock;
51     protected List<SelectLoop> switchSelectLoops;
52     protected Map<SocketChannel, OFSwitch> switchSockets;
53     protected Integer threadCount;
54     protected int port;
55
56     protected class OFSwitch {
57         protected SocketChannel sock;
58         protected OFMessageAsyncStream stream;
59         protected Map<Integer, Short> macTable = new LRULinkedHashMap<Integer, Short>(64001, 64000);
60
61         public OFSwitch(SocketChannel sock, OFMessageAsyncStream stream) {
62             this.sock = sock;
63             this.stream = stream;
64         }
65
66         public void handlePacketIn(OFPPacketIn pi) {
67             // Build the Match
68             OFPMatch match = new OFPMatch();
69             // match.loadFromPacket(pi.getPacketData(), pi.getInPort());
70             // byte[] dlDst = match.getDataLayerDestination();
71             // Integer dlDstKey = Arrays.hashCode(dlDst);
72             // byte[] dlSrc = match.getDataLayerSource();
73             // Integer dlSrcKey = Arrays.hashCode(dlSrc);
74             int bufferId = pi.getBufferId();
75
76             // if the src is not multicast, learn it
77             // if ((dlSrc[0] & 0x1) == 0) {
78             // if (!macTable.containsKey(dlSrcKey) ||
79             // !macTable.get(dlSrcKey).equals(pi.getInPort())) {
80             // macTable.put(dlSrcKey, pi.getInPort());
81             // }
82             // }
83             //
84             Short outPort = null;
85             // // if the destination is not multicast, look it up
86             // if ((dlDst[0] & 0x1) == 0) {
87             // outPort = macTable.get(dlDstKey);
88             // }
89
90             // push a flow mod if we know where the packet should be going
91             if (outPort != null) {
92                 OFPFlowMod fm = (OFPFlowMod) factory.getMessage(OFPType.FLOW_MOD);
93                 fm.setBufferId(bufferId);
94                 fm.setCommand(OFPFlowModCommand.OFPFC_ADD);
95                 fm.setCookie(0);
96                 fm.setFlags((short) 0);
97                 fm.setHardTimeout((short) 0);
98                 fm.setIdleTimeout((short) 5);
99                 // match.setInputPort(pi.getInPort());
100                 // match.setWildcards(0);
101                 fm.setMatch(match);
102                 fm.setOutPort(OFPPortNo.OFPP_ANY.getValue());
103                 fm.setPriority((short) 0);
104                 OFPActionOutput action = new OFPActionOutput();
105                 action.setMaxLength((short) 0);
106                 action.setPort(outPort);
107                 List<OFPAction> actions = new ArrayList<OFPAction>();
108                 actions.add(action);
109                 OFPInstructionApplyActions instructions = new OFPInstructionApplyActions();
110                 instructions.setActions(actions);
111                 List<OFPInstruction> instrList = new ArrayList<OFPInstruction>();
112                 instrList.add(instructions);
113                 fm.setInstructions(instrList);
114                 fm.setLength(U16.t(OFPFlowMod.MINIMUM_LENGTH + instructions.getLength()));
115                 try {
116                     stream.write(fm);
117                 } catch (IOException e) {
118                     e.printStackTrace();
119                 }
120             }
121
122             // Send a packet out
123             if (outPort == null || pi.getBufferId() == 0xffffffff) {
124                 OFPPacketOut po = new OFPPacketOut();
125                 po.setBufferId(bufferId);
126                 // po.setInPort(pi.getInPort());
127
128                 // set actions
129                 OFPActionOutput action = new OFPActionOutput();
130                 action.setMaxLength((short) 0);
131                 action.setPort((short) ((outPort == null) ? OFPPortNo.OFPP_FLOOD.getValue() : outPort));
132                 List<OFPAction> actions = new ArrayList<OFPAction>();
133                 actions.add(action);
134                 po.setActions(actions);
135                 po.setActionsLength((short) OFPActionOutput.MINIMUM_LENGTH);
136
137                 // set data if needed
138                 if (bufferId == 0xffffffff) {
139                     byte[] packetData = pi.getPacketData();
140                     po.setLength(U16.t(OFPPacketOut.MINIMUM_LENGTH + po.getActionsLength() + packetData.length));
141                     po.setPacketData(packetData);
142                 } else {
143                     po.setLength(U16.t(OFPPacketOut.MINIMUM_LENGTH + po.getActionsLength()));
144                 }
145                 try {
146                     stream.write(po);
147                 } catch (IOException e) {
148                     e.printStackTrace();
149                 }
150             }
151         }
152
153         public String toString() {
154             InetAddress remote = sock.socket().getInetAddress();
155             return remote.getHostAddress() + ":" + sock.socket().getPort();
156         }
157
158         public OFMessageAsyncStream getStream() {
159             return stream;
160         }
161     }
162
163     public SimpleController(int port) throws IOException {
164         listenSock = ServerSocketChannel.open();
165         listenSock.configureBlocking(false);
166         listenSock.socket().bind(new java.net.InetSocketAddress(port));
167         listenSock.socket().setReuseAddress(true);
168         this.port = port;
169         switchSelectLoops = new ArrayList<SelectLoop>();
170         switchSockets = new ConcurrentHashMap<SocketChannel, OFSwitch>();
171         threadCount = 1;
172         listenSelectLoop = new SelectLoop(this);
173         // register this connection for accepting
174         listenSelectLoop.register(listenSock, SelectionKey.OP_ACCEPT, listenSock);
175
176         this.factory = new OFPBasicFactoryImpl();
177     }
178
179     @Override
180     public void handleEvent(SelectionKey key, Object arg) throws IOException {
181         if (arg instanceof ServerSocketChannel)
182             handleListenEvent(key, (ServerSocketChannel) arg);
183         else
184             handleSwitchEvent(key, (SocketChannel) arg);
185     }
186
187     protected void handleListenEvent(SelectionKey key, ServerSocketChannel ssc) throws IOException {
188         SocketChannel sock = listenSock.accept();
189         OFMessageAsyncStream stream = new OFMessageAsyncStream(sock, factory);
190         switchSockets.put(sock, new OFSwitch(sock, stream));
191         System.err.println("Got new connection from " + switchSockets.get(sock));
192         List<OFPMessage> l = new ArrayList<OFPMessage>();
193         l.add(factory.getMessage(OFPType.HELLO));
194         l.add(factory.getMessage(OFPType.FEATURES_REQUEST));
195         stream.write(l);
196
197         int ops = SelectionKey.OP_READ;
198         if (stream.needsFlush())
199             ops |= SelectionKey.OP_WRITE;
200
201         // hash this switch into a thread
202         SelectLoop sl = switchSelectLoops.get(sock.hashCode() % switchSelectLoops.size());
203         sl.register(sock, ops, sock);
204         // force select to return and re-enter using the new set of keys
205         sl.wakeup();
206     }
207
208     protected void handleSwitchEvent(SelectionKey key, SocketChannel sock) {
209         OFSwitch sw = switchSockets.get(sock);
210         OFMessageAsyncStream stream = sw.getStream();
211         try {
212             if (key.isReadable()) {
213                 List<OFPMessage> msgs = stream.read();
214                 if (msgs == null) {
215                     key.cancel();
216                     switchSockets.remove(sock);
217                     return;
218                 }
219
220                 for (OFPMessage m : msgs) {
221                     switch (m.getType()) {
222                     case PACKET_IN:
223                         sw.handlePacketIn((OFPPacketIn) m);
224                         break;
225                     case HELLO:
226                         System.err.println("GOT HELLO from " + sw);
227                         break;
228                     case ECHO_REQUEST:
229                         OFPEchoReply reply = (OFPEchoReply) stream.getMessageFactory().getMessage(OFPType.ECHO_REPLY);
230                         reply.setXid(m.getXid());
231                         stream.write(reply);
232                         break;
233                     default:
234                         System.err.println("Unhandled OF message: " + m.getType() + " from "
235                                 + sock.socket().getInetAddress());
236                     }
237                 }
238             }
239             if (key.isWritable()) {
240                 stream.flush();
241             }
242
243             /**
244              * Only register for interest in R OR W, not both, causes stream
245              * deadlock after some period of time
246              */
247             if (stream.needsFlush())
248                 key.interestOps(SelectionKey.OP_WRITE);
249             else
250                 key.interestOps(SelectionKey.OP_READ);
251         } catch (IOException e) {
252             // if we have an exception, disconnect the switch
253             key.cancel();
254             switchSockets.remove(sock);
255         }
256     }
257
258     public void run() throws IOException {
259         System.err.println("Starting " + this.getClass().getCanonicalName() + " on port " + this.port + " with "
260                 + this.threadCount + " threads");
261         // Static number of threads equal to processor cores
262         es = Executors.newFixedThreadPool(threadCount);
263
264         // Launch one select loop per threadCount and start running
265         for (int i = 0; i < threadCount; ++i) {
266             final SelectLoop sl = new SelectLoop(this);
267             switchSelectLoops.add(sl);
268             es.execute(new Runnable() {
269                 @Override
270                 public void run() {
271                     try {
272                         sl.doLoop();
273                     } catch (IOException e) {
274                         e.printStackTrace();
275                     }
276                 }
277             });
278         }
279
280         // Start the listen loop
281         listenSelectLoop.doLoop();
282     }
283
284     public static void main(String[] args) throws IOException {
285         SimpleCLI cmd = parseArgs(args);
286         int port = Integer.valueOf(cmd.getOptionValue("p"));
287         SimpleController sc = new SimpleController(port);
288         sc.threadCount = Integer.valueOf(cmd.getOptionValue("t"));
289         sc.run();
290     }
291
292     public static SimpleCLI parseArgs(String[] args) {
293         Options options = new Options();
294         options.addOption("h", "help", "print help");
295         // unused?
296         // options.addOption("n", true, "the number of packets to send");
297         options.addOption("p", "port", 6633, "the port to listen on");
298         options.addOption("t", "threads", 1, "the number of threads to run");
299         try {
300             SimpleCLI cmd = SimpleCLI.parse(options, args);
301             if (cmd.hasOption("h")) {
302                 printUsage(options);
303                 System.exit(0);
304             }
305             return cmd;
306         } catch (ParseException e) {
307             System.err.println(e);
308             printUsage(options);
309         }
310
311         System.exit(-1);
312         return null;
313     }
314
315     public static void printUsage(Options options) {
316         SimpleCLI.printHelp("Usage: " + SimpleController.class.getCanonicalName() + " [options]", options);
317     }
318 }