Enable cancel monitoring 37/24937/4
authorHsin-Yi Shen <syshen66@gmail.com>
Fri, 7 Aug 2015 18:58:20 +0000 (11:58 -0700)
committerHsin-Yi Shen <syshen66@gmail.com>
Tue, 11 Aug 2015 18:44:14 +0000 (18:44 +0000)
Enable monitor cancellation rpc method.
Also rewrite code of handle new ssl connection to remove thread sleep
and improve performance.

Signed-off-by: Hsin-Yi Shen <syshen66@gmail.com>
Change-Id: Ic5982e28192ed0db1a363889a9eaf3b0817f17b8

library/src/main/java/org/opendaylight/ovsdb/lib/OvsdbClient.java
library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbClientImpl.java
library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java
library/src/main/java/org/opendaylight/ovsdb/lib/message/OvsdbRPC.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionInstance.java

index bcea62fa3baf3f674bef516d52adc63ce093773c..bd87919ac3a1b20b1ae830cbbfbe1e246a23f18e 100644 (file)
@@ -68,6 +68,18 @@ public interface OvsdbClient {
                                                     List<MonitorRequest<E>> monitorRequests,
                                                     MonitorCallBack callback);
 
+    /**
+     * ovsdb <a href="http://tools.ietf.org/html/draft-pfaff-ovsdb-proto-04#section-4.1.5">monitor</a> operation.
+     * @param monitorRequests represents what needs to be monitored
+     * @param monitorHandler  A client specified monitor handle. This handle is used to later cancel
+     *                       ({@link #cancelMonitor(MonitorHandle)}) the monitor.
+     * @param callback receives the monitor response
+     */
+    <E extends TableSchema<E>> TableUpdates monitor(DatabaseSchema schema,
+                                                    List<MonitorRequest<E>> monitorRequests,
+                                                    MonitorHandle monitorHandle,
+                                                    MonitorCallBack callback);
+
     /**
      * Cancels an existing monitor method.
      * @param handler Handle identifying a specific monitor request that is being cancelled.
index 2fb2f3a669789392806a1fbe78a05ea2cea29248..cc7472ed671f92a6e1bca1cd2ee5b56df43ba3e3 100644 (file)
@@ -173,14 +173,64 @@ public class OvsdbClientImpl implements OvsdbClient {
         return transformingCallback(result, dbSchema);
     }
 
+    @Override
+    public <E extends TableSchema<E>> TableUpdates monitor(final DatabaseSchema dbSchema,
+                                                           List<MonitorRequest<E>> monitorRequest,
+                                                           final MonitorHandle monitorHandle,
+                                                           final MonitorCallBack callback) {
+
+        final ImmutableMap<String, MonitorRequest<E>> reqMap = Maps.uniqueIndex(monitorRequest,
+                new Function<MonitorRequest<E>, String>() {
+                    @Override
+                    public String apply(MonitorRequest<E> input) {
+                        return input.getTableName();
+                    }
+                });
+
+        registerCallback(monitorHandle, callback, dbSchema);
+
+        ListenableFuture<JsonNode> monitor = rpc.monitor(new Params() {
+            @Override
+            public List<Object> params() {
+                return Lists.<Object>newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap);
+            }
+        });
+        JsonNode result;
+        try {
+            result = monitor.get();
+        } catch (InterruptedException | ExecutionException e) {
+            return null;
+        }
+        TableUpdates updates = transformingCallback(result, dbSchema);
+        return updates;
+    }
+
     private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) {
         this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema));
         setupUpdateListener();
     }
 
     @Override
-    public void cancelMonitor(MonitorHandle handler) {
-        throw new UnsupportedOperationException("not yet implemented");
+    public void cancelMonitor(final MonitorHandle handler) {
+        ListenableFuture<JsonNode> cancelMonitor = rpc.monitor_cancel(new Params() {
+            @Override
+            public List<Object> params() {
+                return Lists.<Object>newArrayList(handler.getId());
+            }
+        });
+
+        JsonNode result = null;
+        try {
+            result = cancelMonitor.get();
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception when canceling monitor handler {}", handler.getId());
+        }
+
+        if (result == null) {
+            LOG.error("Fail to cancel monitor with handler {}", handler.getId());
+        } else {
+            LOG.debug("Successfully cancel monitoring for handler {}", handler.getId());
+        }
     }
 
     @Override
index 44f1ef028f3e2baa077f6a0bf9c5a9f65e92baac..ef2af7530e7a5ec32d49d4efdb5f588b4326c5ac 100644 (file)
@@ -28,6 +28,7 @@ import io.netty.handler.ssl.SslHandler;
 
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 
 import java.net.InetAddress;
 import java.util.Arrays;
@@ -36,6 +37,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.ovsdb.lib.OvsdbClient;
 import org.opendaylight.ovsdb.lib.OvsdbConnection;
@@ -288,41 +291,91 @@ public class OvsdbConnectionService implements OvsdbConnection {
         }
     }
 
-    private static ExecutorService executorService = Executors.newFixedThreadPool(10);
+    private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
+    private static int retryPeriod = 100; // retry after 100 milliseconds
     private static void handleNewPassiveConnection(final Channel channel) {
-        executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-                OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
-                        Executors.newFixedThreadPool(NUM_THREADS));
+        SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
+        if (sslHandler != null) {
+            class HandleNewPassiveSslRunner implements Runnable {
+                public SslHandler sslHandler;
+                public final Channel channel;
+                private int retryTimes;
 
-                SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
-                if (sslHandler != null) {
-                    //Wait until ssl handshake is complete
-                    int count = 0;
-                    LOG.debug("Check if ssl handshake is done");
-                    while (sslHandler.engine().getSession().getCipherSuite()
-                                            .equals("SSL_NULL_WITH_NULL_NULL")
-                                            && count < 10) {
-                        try {
-                            Thread.sleep(100);
-                        } catch (InterruptedException e) {
-                            LOG.error("Exception while checking if ssl handshake is done", e);
-                        }
-                        count++;
-                    }
-                    if (sslHandler.engine().getSession().getCipherSuite()
-                                           .equals("SSL_NULL_WITH_NULL_NULL")) {
-                        LOG.debug("Ssl hanshake is not compelete yet");
-                        return;
-                    }
+                public HandleNewPassiveSslRunner(Channel channel, SslHandler sslHandler) {
+                    this.channel = channel;
+                    this.sslHandler = sslHandler;
+                    this.retryTimes = 3;
                 }
-                LOG.debug("Notify listener");
-                for (OvsdbConnectionListener listener : connectionListeners) {
-                    listener.connected(client);
+                @Override
+                public void run() {
+                    HandshakeStatus status = sslHandler.engine().getHandshakeStatus();
+                    LOG.debug("Handshake status {}", status);
+                    switch (status) {
+                        case FINISHED:
+                        case NOT_HANDSHAKING:
+                            //Handshake done. Notify listener.
+                            OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
+                                                 Executors.newFixedThreadPool(NUM_THREADS));
+
+                            LOG.debug("Notify listener");
+                            for (OvsdbConnectionListener listener : connectionListeners) {
+                                listener.connected(client);
+                            }
+                            break;
+
+                        case NEED_UNWRAP:
+                        case NEED_TASK:
+                            //Handshake still ongoing. Retry later.
+                            LOG.debug("handshake not done yet {}", status);
+                            executorService.schedule(this,  retryPeriod, TimeUnit.MILLISECONDS);
+                            break;
+
+                        case NEED_WRAP:
+                            if (sslHandler.engine().getSession().getCipherSuite()
+                                    .equals("SSL_NULL_WITH_NULL_NULL")) {
+                                /* peer not authenticated. No need to notify listener in this case. */
+                                LOG.error("Ssl handshake fail. channel {}", channel);
+                            } else {
+                                /*
+                                 * peer is authenticated. Give some time to wait for completion.
+                                 * If status is still NEED_WRAP, client might already disconnect.
+                                 * This happens when the first time client connects to controller in two-way handshake.
+                                 * After obtaining controller certificate, client will disconnect and start
+                                 * new connection with controller certificate it obtained.
+                                 * In this case no need to do anything for the first connection attempt. Just skip
+                                 * since client will reconnect later.
+                                 */
+                                LOG.debug("handshake not done yet {}", status);
+                                if (retryTimes > 0) {
+                                    executorService.schedule(this,  retryPeriod, TimeUnit.MILLISECONDS);
+                                } else {
+                                    LOG.debug("channel closed {}", channel);
+                                }
+                                retryTimes--;
+                            }
+                            break;
+
+                        default:
+                            LOG.error("unknown hadshake status {}", status);
+                    }
                 }
             }
-        });
+            executorService.schedule(new HandleNewPassiveSslRunner(channel, sslHandler),
+                    retryPeriod, TimeUnit.MILLISECONDS);
+        } else {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE,
+                            Executors.newFixedThreadPool(NUM_THREADS));
+
+                    LOG.debug("Notify listener");
+                    for (OvsdbConnectionListener listener : connectionListeners) {
+                        listener.connected(client);
+                    }
+                }
+            });
+        }
     }
 
     public static void channelClosed(final OvsdbClient client) {
index adc9a4fae88172d8612f66edf4a580c29a442ec5..5927e293d50f89468173517aaf125c68bb8c61d9 100644 (file)
@@ -30,7 +30,7 @@ public interface OvsdbRPC {
 
     ListenableFuture<Response> cancel(String id);
 
-    ListenableFuture<Object> monitor_cancel(Object jsonValue);
+    ListenableFuture<JsonNode> monitor_cancel(Params jsonValue);
 
     ListenableFuture<Object> lock(List<String> id);
 
index 36c09022c0c957df9d18a367e6d98f5cefce92a7..547ad2642c06230b60491193d71e22fcbcacef5a 100644 (file)
@@ -230,4 +230,11 @@ public class OvsdbConnectionInstance implements OvsdbClient {
     public void setInstanceIdentifier(InstanceIdentifier<Node> iid) {
         this.instanceIdentifier = iid;
     }
+
+    @Override
+    public <E extends TableSchema<E>> TableUpdates monitor(
+            DatabaseSchema schema, List<MonitorRequest<E>> monitorRequests,
+            MonitorHandle monitorHandle, MonitorCallBack callback) {
+        return null;
+    }
 }