return transformingCallback(result, dbSchema);
}
+ @Override
+ public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
+ List<MonitorRequest<E>> monitorRequest,
+ final MonitorHandle monitorHandle,
+ final MonitorCallBack callback) {
+
+ final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
+ new Function<MonitorRequest<E>, String>() {
+ @Override
+ public String apply(MonitorRequest<E> input) {
+ return input.getTableName();
+ }
+ });
+
+ registerCallback(monitorHandle, callback, dbSchema);
+
+ ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
+ @Override
+ public List<Object> params() {
+ return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
+ }
+ });
+ JsonNode result;
+ try {
+ result = monitor.get();
+ } catch (InterruptedException | ExecutionException e) {
+ return null;
+ }
+ TableUpdates updates = transformingCallback(result, dbSchema);
+ return updates;
+ }
+
private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
setupUpdateListener();
}
@Override
- public void cancelMonitor(MonitorHandle handler) {
- throw new UnsupportedOperationException("not yet implemented");
+ public void cancelMonitor(final MonitorHandle handler) {
+ ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(new Params() {
+ @Override
+ public List<Object> params() {
+ return Lists.<Object>newArrayList(handler.getId());
+ }
+ });
+
+ JsonNode result = null;
+ try {
+ result = cancelMonitor.get();
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Exception when canceling monitor handler {}", handler.getId());
+ }
+
+ if (result == null) {
+ LOG.error("Fail to cancel monitor with handler {}", handler.getId());
+ } else {
+ LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
+ }
}
@Override
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.opendaylight.ovsdb.lib.OvsdbConnection;
}
}
- private static ExecutorService executorService = Executors.newFixedThreadPool(10);
+ private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
+ private static int retryPeriod = 100; // retry after 100 milliseconds
private static void handleNewPassiveConnection(final Channel channel) {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
- Executors.newFixedThreadPool(NUM_THREADS));
+ SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
+ if (sslHandler != null) {
+ class HandleNewPassiveSslRunner implements Runnable {
+ public SslHandler sslHandler;
+ public final Channel channel;
+ private int retryTimes;
- SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
- if (sslHandler != null) {
- //Wait until ssl handshake is complete
- int count = 0;
- LOG.debug("Check if ssl handshake is done");
- while (sslHandler.engine().getSession().getCipherSuite()
- .equals("SSL_NULL_WITH_NULL_NULL")
- && count < 10) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- LOG.error("Exception while checking if ssl handshake is done", e);
- }
- count++;
- }
- if (sslHandler.engine().getSession().getCipherSuite()
- .equals("SSL_NULL_WITH_NULL_NULL")) {
- LOG.debug("Ssl hanshake is not compelete yet");
- return;
- }
+ public HandleNewPassiveSslRunner(Channel channel, SslHandler sslHandler) {
+ this.channel = channel;
+ this.sslHandler = sslHandler;
+ this.retryTimes = 3;
}
- LOG.debug("Notify listener");
- for (OvsdbConnectionListener listener : connectionListeners) {
- listener.connected(client);
+ @Override
+ public void run() {
+ HandshakeStatus status = sslHandler.engine().getHandshakeStatus();
+ LOG.debug("Handshake status {}", status);
+ switch (status) {
+ case FINISHED:
+ case NOT_HANDSHAKING:
+ //Handshake done. Notify listener.
+ OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
+ Executors.newFixedThreadPool(NUM_THREADS));
+
+ LOG.debug("Notify listener");
+ for (OvsdbConnectionListener listener : connectionListeners) {
+ listener.connected(client);
+ }
+ break;
+
+ case NEED_UNWRAP:
+ case NEED_TASK:
+ //Handshake still ongoing. Retry later.
+ LOG.debug("handshake not done yet {}", status);
+ executorService.schedule(this, retryPeriod, TimeUnit.MILLISECONDS);
+ break;
+
+ case NEED_WRAP:
+ if (sslHandler.engine().getSession().getCipherSuite()
+ .equals("SSL_NULL_WITH_NULL_NULL")) {
+ /* peer not authenticated. No need to notify listener in this case. */
+ LOG.error("Ssl handshake fail. channel {}", channel);
+ } else {
+ /*
+ * peer is authenticated. Give some time to wait for completion.
+ * If status is still NEED_WRAP, client might already disconnect.
+ * This happens when the first time client connects to controller in two-way handshake.
+ * After obtaining controller certificate, client will disconnect and start
+ * new connection with controller certificate it obtained.
+ * In this case no need to do anything for the first connection attempt. Just skip
+ * since client will reconnect later.
+ */
+ LOG.debug("handshake not done yet {}", status);
+ if (retryTimes > 0) {
+ executorService.schedule(this, retryPeriod, TimeUnit.MILLISECONDS);
+ } else {
+ LOG.debug("channel closed {}", channel);
+ }
+ retryTimes--;
+ }
+ break;
+
+ default:
+ LOG.error("unknown hadshake status {}", status);
+ }
}
}
- });
+ executorService.schedule(new HandleNewPassiveSslRunner(channel, sslHandler),
+ retryPeriod, TimeUnit.MILLISECONDS);
+ } else {
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
+ Executors.newFixedThreadPool(NUM_THREADS));
+
+ LOG.debug("Notify listener");
+ for (OvsdbConnectionListener listener : connectionListeners) {
+ listener.connected(client);
+ }
+ }
+ });
+ }
}
public static void channelClosed(final OvsdbClient client) {