Initial opendaylight infrastructure commit!!
[controller.git] / third-party / openflowj / src / main / java / org / openflow / example / SimpleController.java
1 /**
2  *
3  */
4 package org.openflow.example;
5
6 import java.io.IOException;
7 import java.net.InetAddress;
8 import java.nio.channels.SelectionKey;
9 import java.nio.channels.ServerSocketChannel;
10 import java.nio.channels.SocketChannel;
11 import java.util.ArrayList;
12 import java.util.Arrays;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18
19 import org.openflow.example.cli.Options;
20 import org.openflow.example.cli.ParseException;
21 import org.openflow.example.cli.SimpleCLI;
22 import org.openflow.io.OFMessageAsyncStream;
23 import org.openflow.protocol.OFEchoReply;
24 import org.openflow.protocol.OFFlowMod;
25 import org.openflow.protocol.OFMatch;
26 import org.openflow.protocol.OFMessage;
27 import org.openflow.protocol.OFPacketIn;
28 import org.openflow.protocol.OFPacketOut;
29 import org.openflow.protocol.OFPort;
30 import org.openflow.protocol.OFType;
31 import org.openflow.protocol.action.OFAction;
32 import org.openflow.protocol.action.OFActionOutput;
33 import org.openflow.protocol.factory.BasicFactory;
34 import org.openflow.util.LRULinkedHashMap;
35 import org.openflow.util.U16;
36
37 /**
38  * @author Rob Sherwood (rob.sherwood@stanford.edu), David Erickson (daviderickson@cs.stanford.edu)
39  *
40  */
41 public class SimpleController implements SelectListener {
42     protected ExecutorService es;
43     protected BasicFactory factory;
44     protected SelectLoop listenSelectLoop;
45     protected ServerSocketChannel listenSock;
46     protected List<SelectLoop> switchSelectLoops;
47     protected Map<SocketChannel,OFSwitch> switchSockets;
48     protected Integer threadCount;
49     protected int port;
50
51     protected class OFSwitch {
52         protected SocketChannel sock;
53         protected OFMessageAsyncStream stream;
54         protected Map<Integer, Short> macTable =
55             new LRULinkedHashMap<Integer, Short>(64001, 64000);
56
57         public OFSwitch(SocketChannel sock, OFMessageAsyncStream stream) {
58             this.sock = sock;
59             this.stream = stream;
60         }
61
62         public void handlePacketIn(OFPacketIn pi) {
63             // Build the Match
64             OFMatch match = new OFMatch();
65             match.loadFromPacket(pi.getPacketData(), pi.getInPort());
66             byte[] dlDst = match.getDataLayerDestination();
67             Integer dlDstKey = Arrays.hashCode(dlDst);
68             byte[] dlSrc = match.getDataLayerSource();
69             Integer dlSrcKey = Arrays.hashCode(dlSrc);
70             int bufferId = pi.getBufferId();
71
72             // if the src is not multicast, learn it
73             if ((dlSrc[0] & 0x1) == 0) {
74                 if (!macTable.containsKey(dlSrcKey) ||
75                         !macTable.get(dlSrcKey).equals(pi.getInPort())) {
76                     macTable.put(dlSrcKey, pi.getInPort());
77                 }
78             }
79
80             Short outPort = null;
81             // if the destination is not multicast, look it up
82             if ((dlDst[0] & 0x1) == 0) {
83                 outPort = macTable.get(dlDstKey);
84             }
85
86             // push a flow mod if we know where the packet should be going
87             if (outPort != null) {
88                 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
89                 fm.setBufferId(bufferId);
90                 fm.setCommand((short) 0);
91                 fm.setCookie(0);
92                 fm.setFlags((short) 0);
93                 fm.setHardTimeout((short) 0);
94                 fm.setIdleTimeout((short) 5);
95                 match.setInputPort(pi.getInPort());
96                 match.setWildcards(0);
97                 fm.setMatch(match);
98                 fm.setOutPort((short) OFPort.OFPP_NONE.getValue());
99                 fm.setPriority((short) 0);
100                 OFActionOutput action = new OFActionOutput();
101                 action.setMaxLength((short) 0);
102                 action.setPort(outPort);
103                 List<OFAction> actions = new ArrayList<OFAction>();
104                 actions.add(action);
105                 fm.setActions(actions);
106                 fm.setLength(U16.t(OFFlowMod.MINIMUM_LENGTH+OFActionOutput.MINIMUM_LENGTH));
107                 try {
108                     stream.write(fm);
109                 } catch (IOException e) {
110                     e.printStackTrace();
111                 }
112             }
113
114             // Send a packet out
115             if (outPort == null || pi.getBufferId() == 0xffffffff) {
116                 OFPacketOut po = new OFPacketOut();
117                 po.setBufferId(bufferId);
118                 po.setInPort(pi.getInPort());
119
120                 // set actions
121                 OFActionOutput action = new OFActionOutput();
122                 action.setMaxLength((short) 0);
123                 action.setPort((short) ((outPort == null) ? OFPort.OFPP_FLOOD
124                         .getValue() : outPort));
125                 List<OFAction> actions = new ArrayList<OFAction>();
126                 actions.add(action);
127                 po.setActions(actions);
128                 po.setActionsLength((short) OFActionOutput.MINIMUM_LENGTH);
129
130                 // set data if needed
131                 if (bufferId == 0xffffffff) {
132                     byte[] packetData = pi.getPacketData();
133                     po.setLength(U16.t(OFPacketOut.MINIMUM_LENGTH
134                             + po.getActionsLength() + packetData.length));
135                     po.setPacketData(packetData);
136                 } else {
137                     po.setLength(U16.t(OFPacketOut.MINIMUM_LENGTH
138                             + po.getActionsLength()));
139                 }
140                 try {
141                     stream.write(po);
142                 } catch (IOException e) {
143                     e.printStackTrace();
144                 }
145             }
146         }
147
148         public String toString() {
149             InetAddress remote = sock.socket().getInetAddress();
150             return remote.getHostAddress() + ":" + sock.socket().getPort();
151         }
152
153         public OFMessageAsyncStream getStream() {
154             return stream;
155         }
156     }
157
158     public SimpleController(int port) throws IOException{
159         listenSock = ServerSocketChannel.open();
160         listenSock.configureBlocking(false);
161         listenSock.socket().bind(new java.net.InetSocketAddress(port));
162         listenSock.socket().setReuseAddress(true);
163         this.port = port;
164         switchSelectLoops = new ArrayList<SelectLoop>();
165         switchSockets = new ConcurrentHashMap<SocketChannel,OFSwitch>();
166         threadCount = 1;
167         listenSelectLoop = new SelectLoop(this);
168         // register this connection for accepting
169         listenSelectLoop.register(listenSock, SelectionKey.OP_ACCEPT, listenSock);
170
171         this.factory = new BasicFactory();
172     }
173
174     @Override
175     public void handleEvent(SelectionKey key, Object arg) throws IOException {
176         if (arg instanceof ServerSocketChannel)
177             handleListenEvent(key, (ServerSocketChannel)arg);
178         else
179             handleSwitchEvent(key, (SocketChannel) arg);
180     }
181
182     protected void handleListenEvent(SelectionKey key, ServerSocketChannel ssc)
183             throws IOException {
184         SocketChannel sock = listenSock.accept();
185         OFMessageAsyncStream stream = new OFMessageAsyncStream(sock, factory);
186         switchSockets.put(sock, new OFSwitch(sock, stream));
187         System.err
188                 .println("Got new connection from " + switchSockets.get(sock));
189         List<OFMessage> l = new ArrayList<OFMessage>();
190         l.add(factory.getMessage(OFType.HELLO));
191         l.add(factory.getMessage(OFType.FEATURES_REQUEST));
192         stream.write(l);
193
194         int ops = SelectionKey.OP_READ;
195         if (stream.needsFlush())
196             ops |= SelectionKey.OP_WRITE;
197
198         // hash this switch into a thread
199         SelectLoop sl = switchSelectLoops.get(sock.hashCode()
200                 % switchSelectLoops.size());
201         sl.register(sock, ops, sock);
202         // force select to return and re-enter using the new set of keys
203         sl.wakeup();
204     }
205
206     protected void handleSwitchEvent(SelectionKey key, SocketChannel sock) {
207         OFSwitch sw = switchSockets.get(sock);
208         OFMessageAsyncStream stream = sw.getStream();
209         try {
210             if (key.isReadable()) {
211                 List<OFMessage> msgs = stream.read();
212                 if (msgs == null) {
213                     key.cancel();
214                     switchSockets.remove(sock);
215                     return;
216                 }
217
218                 for (OFMessage m : msgs) {
219                     switch (m.getType()) {
220                         case PACKET_IN:
221                             sw.handlePacketIn((OFPacketIn) m);
222                             break;
223                         case HELLO:
224                             System.err.println("GOT HELLO from " + sw);
225                             break;
226                         case ECHO_REQUEST:
227                             OFEchoReply reply = (OFEchoReply) stream
228                                 .getMessageFactory().getMessage(
229                                         OFType.ECHO_REPLY);
230                             reply.setXid(m.getXid());
231                             stream.write(reply);
232                             break;
233                         default:
234                             System.err.println("Unhandled OF message: "
235                                     + m.getType() + " from "
236                                     + sock.socket().getInetAddress());
237                     }
238                 }
239             }
240             if (key.isWritable()) {
241                 stream.flush();
242             }
243
244             /**
245              * Only register for interest in R OR W, not both, causes stream
246              * deadlock after some period of time
247              */
248             if (stream.needsFlush())
249                 key.interestOps(SelectionKey.OP_WRITE);
250             else
251                 key.interestOps(SelectionKey.OP_READ);
252         } catch (IOException e) {
253             // if we have an exception, disconnect the switch
254             key.cancel();
255             switchSockets.remove(sock);
256         }
257     }
258
259     public void run() throws IOException{
260         System.err.println("Starting " + this.getClass().getCanonicalName() + 
261                 " on port " + this.port + " with " + this.threadCount + " threads");
262         // Static number of threads equal to processor cores
263         es = Executors.newFixedThreadPool(threadCount);
264
265         // Launch one select loop per threadCount and start running
266         for (int i = 0; i < threadCount; ++i) {
267             final SelectLoop sl = new SelectLoop(this);
268             switchSelectLoops.add(sl);
269             es.execute(new Runnable() {
270                 @Override
271                 public void run() {
272                     try {
273                         sl.doLoop();
274                     } catch (IOException e) {
275                         e.printStackTrace();
276                     }
277                 }}
278             );
279         }
280
281         // Start the listen loop
282         listenSelectLoop.doLoop();
283     }
284
285     public static void main(String [] args) throws IOException {
286         SimpleCLI cmd = parseArgs(args);
287         int port = Integer.valueOf(cmd.getOptionValue("p"));
288         SimpleController sc = new SimpleController(port);
289         sc.threadCount = Integer.valueOf(cmd.getOptionValue("t"));
290         sc.run();
291     }
292
293     public static SimpleCLI parseArgs(String[] args) {
294         Options options = new Options();
295         options.addOption("h", "help", "print help");
296         // unused?
297         // options.addOption("n", true, "the number of packets to send");
298         options.addOption("p", "port", 6633, "the port to listen on");
299         options.addOption("t", "threads", 1, "the number of threads to run");
300         try {
301             SimpleCLI cmd = SimpleCLI.parse(options, args);
302             if (cmd.hasOption("h")) {
303                 printUsage(options);
304                 System.exit(0);
305             }
306             return cmd;
307         } catch (ParseException e) {
308             System.err.println(e);
309             printUsage(options);
310         }
311
312         System.exit(-1);
313         return null;
314     }
315
316     public static void printUsage(Options options) {
317         SimpleCLI.printHelp("Usage: "
318                 + SimpleController.class.getCanonicalName() + " [options]",
319                 options);
320     }
321 }