Merge "Bug 2347: DOMConcurrentDataCommitCoordinator uses wrong phase name"
[controller.git] / third-party / openflowj / src / main / java / org / openflow / example / SelectLoop.java
1 package org.openflow.example;
2
3 import java.io.IOException;
4 import java.nio.channels.CancelledKeyException;
5 import java.nio.channels.ClosedChannelException;
6 import java.nio.channels.SelectableChannel;
7 import java.nio.channels.SelectionKey;
8 import java.nio.channels.Selector;
9 import java.nio.channels.spi.SelectorProvider;
10 import java.util.Iterator;
11 import java.util.Queue;
12 import java.util.concurrent.ConcurrentLinkedQueue;
13
14 /***
15  * Dirt simple SelectLoop for simple java controller
16  */
17
18
19 public class SelectLoop {
20     protected SelectListener callback;
21     protected boolean dontStop;
22     protected Object registrationLock;
23     protected int registrationRequests = 0;
24     protected Queue<Object[]> registrationQueue;
25     protected Selector selector;
26     protected long timeout;
27
28     public SelectLoop(SelectListener cb) throws IOException {
29         callback = cb;
30         dontStop = true;
31         selector = SelectorProvider.provider().openSelector();
32         registrationLock = new Object();
33         registrationQueue = new ConcurrentLinkedQueue<Object[]>();
34         timeout = 0;
35     }
36
37     /**
38      * Initializes this SelectLoop
39      * @param cb the callback to call when select returns
40      * @param timeout the timeout value in milliseconds that select will be
41      *        called with
42      * @throws IOException
43      */
44     public SelectLoop(SelectListener cb, long timeout) throws IOException {
45         callback = cb;
46         dontStop = true;
47         selector = SelectorProvider.provider().openSelector();
48         registrationLock = new Object();
49         registrationQueue = new ConcurrentLinkedQueue<Object[]>();
50         this.timeout = timeout;
51     }
52
53     public void register(SelectableChannel ch, int ops, Object arg)
54             throws ClosedChannelException {
55         registrationQueue.add(new Object[] {ch, ops, arg});
56     }
57
58     /**
59      * Registers the supplied SelectableChannel with this SelectLoop. Note this
60      * method blocks until registration proceeds.  It is advised that
61      * SelectLoop is intialized with a timeout value when using this method.
62      * @param ch the channel
63      * @param ops interest ops
64      * @param arg argument that will be returned with the SelectListener
65      * @return
66      * @throws ClosedChannelException
67      */
68     public synchronized SelectionKey registerBlocking(SelectableChannel ch, int ops, Object arg)
69             throws ClosedChannelException {
70         synchronized (registrationLock) {
71             registrationRequests++;
72         }
73         selector.wakeup();
74         SelectionKey key = ch.register(selector, ops, arg);
75         synchronized (registrationLock) {
76             registrationRequests--;
77             registrationLock.notifyAll();
78         }
79         return key;
80     }
81
82     /****
83      * Main top-level IO loop this dispatches all IO events and timer events
84      * together I believe this is fairly efficient
85      */
86     public void doLoop() throws IOException {
87         int nEvents;
88         processRegistrationQueue();
89
90         while (dontStop) {
91             nEvents = selector.select(timeout);
92             if (nEvents > 0) {
93                 for (Iterator<SelectionKey> i = selector.selectedKeys()
94                         .iterator(); i.hasNext();) {
95                     SelectionKey sk = i.next();
96                     i.remove();
97
98                     if (!sk.isValid())
99                         continue;
100
101                     Object arg = sk.attachment();
102                     callback.handleEvent(sk, arg);
103                 }
104             }
105
106             if (this.registrationQueue.size() > 0)
107                 processRegistrationQueue();
108
109             if (registrationRequests > 0) {
110                 synchronized (registrationLock) {
111                     while (registrationRequests > 0) {
112                         try {
113                             registrationLock.wait();
114                         } catch (InterruptedException e) {
115                             e.printStackTrace();
116                         }
117                     }
118                 }
119             }
120         }
121     }
122
123     protected void processRegistrationQueue() {
124         // add any elements in queue
125         for (Iterator<Object[]> it = registrationQueue.iterator(); it.hasNext();) {
126             Object[] args = it.next();
127             SelectableChannel ch = (SelectableChannel) args[0];
128             try {
129                 ch.register(selector, (Integer) args[1], args[2]);
130             } catch (CancelledKeyException cke) {
131                 continue;
132             } catch (ClosedChannelException e) {
133             }
134             it.remove();
135         }
136     }
137
138     /**
139      * Force this select loop to return immediately and re-enter select, useful
140      * for example if a new item has been added to the select loop while it
141      * was already blocked.
142      */
143     public void wakeup() {
144        if (selector != null) {
145            selector.wakeup();
146        }
147     }
148
149     /**
150      * Shuts down this select loop, may return before it has fully shutdown
151      */
152     public void shutdown() {
153         this.dontStop = false;
154         wakeup();
155     }
156 }