From: Hsin-Yi Shen Date: Fri, 7 Aug 2015 18:58:20 +0000 (-0700) Subject: Enable cancel monitoring X-Git-Tag: release/beryllium~380^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;ds=sidebyside;h=refs%2Fchanges%2F37%2F24937%2F4;p=ovsdb.git Enable cancel monitoring Enable monitor cancellation rpc method. Also rewrite code of handle new ssl connection to remove thread sleep and improve performance. Signed-off-by: Hsin-Yi Shen Change-Id: Ic5982e28192ed0db1a363889a9eaf3b0817f17b8 --- diff --git a/library/src/main/java/org/opendaylight/ovsdb/lib/OvsdbClient.java b/library/src/main/java/org/opendaylight/ovsdb/lib/OvsdbClient.java index bcea62fa3..bd87919ac 100644 --- a/library/src/main/java/org/opendaylight/ovsdb/lib/OvsdbClient.java +++ b/library/src/main/java/org/opendaylight/ovsdb/lib/OvsdbClient.java @@ -68,6 +68,18 @@ public interface OvsdbClient { List> monitorRequests, MonitorCallBack callback); + /** + * ovsdb monitor operation. + * @param monitorRequests represents what needs to be monitored + * @param monitorHandler A client specified monitor handle. This handle is used to later cancel + * ({@link #cancelMonitor(MonitorHandle)}) the monitor. + * @param callback receives the monitor response + */ + > TableUpdates monitor(DatabaseSchema schema, + List> monitorRequests, + MonitorHandle monitorHandle, + MonitorCallBack callback); + /** * Cancels an existing monitor method. * @param handler Handle identifying a specific monitor request that is being cancelled. diff --git a/library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbClientImpl.java b/library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbClientImpl.java index 2fb2f3a66..cc7472ed6 100644 --- a/library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbClientImpl.java +++ b/library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbClientImpl.java @@ -173,14 +173,64 @@ public class OvsdbClientImpl implements OvsdbClient { return transformingCallback(result, dbSchema); } + @Override + public > TableUpdates monitor(final DatabaseSchema dbSchema, + List> monitorRequest, + final MonitorHandle monitorHandle, + final MonitorCallBack callback) { + + final ImmutableMap> reqMap = Maps.uniqueIndex(monitorRequest, + new Function, String>() { + @Override + public String apply(MonitorRequest input) { + return input.getTableName(); + } + }); + + registerCallback(monitorHandle, callback, dbSchema); + + ListenableFuture monitor = rpc.monitor(new Params() { + @Override + public List params() { + return Lists.newArrayList(dbSchema.getName(), monitorHandle.getId(), reqMap); + } + }); + JsonNode result; + try { + result = monitor.get(); + } catch (InterruptedException | ExecutionException e) { + return null; + } + TableUpdates updates = transformingCallback(result, dbSchema); + return updates; + } + private void registerCallback(MonitorHandle monitorHandle, MonitorCallBack callback, DatabaseSchema schema) { this.monitorCallbacks.put(monitorHandle.getId(), new CallbackContext(callback, schema)); setupUpdateListener(); } @Override - public void cancelMonitor(MonitorHandle handler) { - throw new UnsupportedOperationException("not yet implemented"); + public void cancelMonitor(final MonitorHandle handler) { + ListenableFuture cancelMonitor = rpc.monitor_cancel(new Params() { + @Override + public List params() { + return Lists.newArrayList(handler.getId()); + } + }); + + JsonNode result = null; + try { + result = cancelMonitor.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Exception when canceling monitor handler {}", handler.getId()); + } + + if (result == null) { + LOG.error("Fail to cancel monitor with handler {}", handler.getId()); + } else { + LOG.debug("Successfully cancel monitoring for handler {}", handler.getId()); + } } @Override diff --git a/library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java b/library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java index 44f1ef028..ef2af7530 100644 --- a/library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java +++ b/library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java @@ -28,6 +28,7 @@ import io.netty.handler.ssl.SslHandler; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult.HandshakeStatus; import java.net.InetAddress; import java.util.Arrays; @@ -36,6 +37,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.opendaylight.ovsdb.lib.OvsdbClient; import org.opendaylight.ovsdb.lib.OvsdbConnection; @@ -288,41 +291,91 @@ public class OvsdbConnectionService implements OvsdbConnection { } } - private static ExecutorService executorService = Executors.newFixedThreadPool(10); + private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10); + private static int retryPeriod = 100; // retry after 100 milliseconds private static void handleNewPassiveConnection(final Channel channel) { - executorService.execute(new Runnable() { - @Override - public void run() { - OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE, - Executors.newFixedThreadPool(NUM_THREADS)); + SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl"); + if (sslHandler != null) { + class HandleNewPassiveSslRunner implements Runnable { + public SslHandler sslHandler; + public final Channel channel; + private int retryTimes; - SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl"); - if (sslHandler != null) { - //Wait until ssl handshake is complete - int count = 0; - LOG.debug("Check if ssl handshake is done"); - while (sslHandler.engine().getSession().getCipherSuite() - .equals("SSL_NULL_WITH_NULL_NULL") - && count < 10) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - LOG.error("Exception while checking if ssl handshake is done", e); - } - count++; - } - if (sslHandler.engine().getSession().getCipherSuite() - .equals("SSL_NULL_WITH_NULL_NULL")) { - LOG.debug("Ssl hanshake is not compelete yet"); - return; - } + public HandleNewPassiveSslRunner(Channel channel, SslHandler sslHandler) { + this.channel = channel; + this.sslHandler = sslHandler; + this.retryTimes = 3; } - LOG.debug("Notify listener"); - for (OvsdbConnectionListener listener : connectionListeners) { - listener.connected(client); + @Override + public void run() { + HandshakeStatus status = sslHandler.engine().getHandshakeStatus(); + LOG.debug("Handshake status {}", status); + switch (status) { + case FINISHED: + case NOT_HANDSHAKING: + //Handshake done. Notify listener. + OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE, + Executors.newFixedThreadPool(NUM_THREADS)); + + LOG.debug("Notify listener"); + for (OvsdbConnectionListener listener : connectionListeners) { + listener.connected(client); + } + break; + + case NEED_UNWRAP: + case NEED_TASK: + //Handshake still ongoing. Retry later. + LOG.debug("handshake not done yet {}", status); + executorService.schedule(this, retryPeriod, TimeUnit.MILLISECONDS); + break; + + case NEED_WRAP: + if (sslHandler.engine().getSession().getCipherSuite() + .equals("SSL_NULL_WITH_NULL_NULL")) { + /* peer not authenticated. No need to notify listener in this case. */ + LOG.error("Ssl handshake fail. channel {}", channel); + } else { + /* + * peer is authenticated. Give some time to wait for completion. + * If status is still NEED_WRAP, client might already disconnect. + * This happens when the first time client connects to controller in two-way handshake. + * After obtaining controller certificate, client will disconnect and start + * new connection with controller certificate it obtained. + * In this case no need to do anything for the first connection attempt. Just skip + * since client will reconnect later. + */ + LOG.debug("handshake not done yet {}", status); + if (retryTimes > 0) { + executorService.schedule(this, retryPeriod, TimeUnit.MILLISECONDS); + } else { + LOG.debug("channel closed {}", channel); + } + retryTimes--; + } + break; + + default: + LOG.error("unknown hadshake status {}", status); + } } } - }); + executorService.schedule(new HandleNewPassiveSslRunner(channel, sslHandler), + retryPeriod, TimeUnit.MILLISECONDS); + } else { + executorService.execute(new Runnable() { + @Override + public void run() { + OvsdbClient client = getChannelClient(channel, ConnectionType.PASSIVE, + Executors.newFixedThreadPool(NUM_THREADS)); + + LOG.debug("Notify listener"); + for (OvsdbConnectionListener listener : connectionListeners) { + listener.connected(client); + } + } + }); + } } public static void channelClosed(final OvsdbClient client) { diff --git a/library/src/main/java/org/opendaylight/ovsdb/lib/message/OvsdbRPC.java b/library/src/main/java/org/opendaylight/ovsdb/lib/message/OvsdbRPC.java index adc9a4fae..5927e293d 100644 --- a/library/src/main/java/org/opendaylight/ovsdb/lib/message/OvsdbRPC.java +++ b/library/src/main/java/org/opendaylight/ovsdb/lib/message/OvsdbRPC.java @@ -30,7 +30,7 @@ public interface OvsdbRPC { ListenableFuture cancel(String id); - ListenableFuture monitor_cancel(Object jsonValue); + ListenableFuture monitor_cancel(Params jsonValue); ListenableFuture lock(List id); diff --git a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionInstance.java b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionInstance.java index 36c09022c..547ad2642 100644 --- a/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionInstance.java +++ b/southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionInstance.java @@ -230,4 +230,11 @@ public class OvsdbConnectionInstance implements OvsdbClient { public void setInstanceIdentifier(InstanceIdentifier iid) { this.instanceIdentifier = iid; } + + @Override + public > TableUpdates monitor( + DatabaseSchema schema, List> monitorRequests, + MonitorHandle monitorHandle, MonitorCallBack callback) { + return null; + } }