Bug 6185 - southbound system tests failing when 13/42113/3
authorAnil Vishnoi <vishnoianil@gmail.com>
Tue, 19 Jul 2016 19:43:56 +0000 (12:43 -0700)
committerAnil Vishnoi <vishnoianil@gmail.com>
Thu, 21 Jul 2016 00:59:48 +0000 (17:59 -0700)
run with all other compatible OpenDaylight features.

This patch fixes following issues related to library
connection handling
*.* If any non-ovsdb device (e.g. openflow) sends a
connection request to ovsdb, library does not throw
error message, rather it holds the channel and wait
for timeout.
*.* In the above case, it also publishes the connect/disconnect
notification up to the plugin.
*.* HwvtepConnectionInstance and OvsdbConnectionInstance
is implementing OvsdbClient interface from library, that
is not required. Echo service should be done by the library
and not by the plugin logic code.
*.* RPC Future objects were not cleaned by library,
and that's leaking the memory.
*.* In case of no echo response, library dosn't fail
the respective rpc future object.

Change-Id: If7b26594b057465d0c08638857bdcb76857705e1
Signed-off-by: Anil Vishnoi <vishnoianil@gmail.com>
13 files changed:
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionInstance.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/HwvtepConnectionManager.java
hwvtepsouthbound/hwvtepsouthbound-impl/src/main/java/org/opendaylight/ovsdb/hwvtepsouthbound/transact/TransactInvokerImpl.java
library/impl/src/main/java/org/opendaylight/ovsdb/lib/OvsdbClient.java
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbClientImpl.java
library/impl/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java
library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/ExceptionHandler.java
library/impl/src/main/java/org/opendaylight/ovsdb/lib/jsonrpc/JsonRpcEndpoint.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionInstance.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionManager.java
southbound/southbound-impl/src/main/java/org/opendaylight/ovsdb/southbound/ovsdb/transact/TransactInvokerImpl.java
southbound/southbound-impl/src/test/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionInstanceTest.java
southbound/southbound-impl/src/test/java/org/opendaylight/ovsdb/southbound/OvsdbConnectionManagerTest.java

index ef21a294c57b39f205de45b7ffb143d5f3f7eb02..7221a60cdabe6f5565931207fb8f5052ff897f30 100644 (file)
@@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
 
-public class HwvtepConnectionInstance implements OvsdbClient{
+public class HwvtepConnectionInstance {
     private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionInstance.class);
     private ConnectionInfo connectionInfo;
     private OvsdbClient client;
@@ -67,9 +67,8 @@ public class HwvtepConnectionInstance implements OvsdbClient{
     private HwvtepGlobalAugmentation initialCreatedData = null;
     private HwvtepDeviceInfo deviceInfo;
 
-
-    HwvtepConnectionInstance (ConnectionInfo key,OvsdbClient client,
-                    InstanceIdentifier<Node> iid, TransactionInvoker txInvoker) {
+    HwvtepConnectionInstance (ConnectionInfo key, OvsdbClient client,
+                              InstanceIdentifier<Node> iid, TransactionInvoker txInvoker) {
         this.connectionInfo = key;
         this.client = client;
         this.instanceIdentifier = iid;
@@ -171,7 +170,6 @@ public class HwvtepConnectionInstance implements OvsdbClient{
         return client.monitor(schema, monitorRequests, callback);
     }
 
-    @Override
     public <E extends TableSchema<E>> TableUpdates monitor(DatabaseSchema schema,
                     List<MonitorRequest> monitorRequests, MonitorHandle monitorHandle, MonitorCallBack callback) {
         return null;
@@ -193,19 +191,6 @@ public class HwvtepConnectionInstance implements OvsdbClient{
         return client.unLock(lockId);
     }
 
-    public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
-        client.startEchoService(callbackFilters);
-    }
-
-    public void stopEchoService() {
-        client.stopEchoService();
-    }
-
-    @Override
-    public ListenableFuture<List<String>> echo() {
-        return client.echo();
-    }
-
     public OvsdbConnectionInfo getConnectionInfo() {
         return client.getConnectionInfo();
     }
@@ -303,4 +288,7 @@ public class HwvtepConnectionInstance implements OvsdbClient{
         return this.deviceInfo;
     }
 
+    public OvsdbClient getOvsdbClient() {
+        return client;
+    }
 }
index 7c99add047b64d75dd33469dd1d17d28acfec54c..98a6661381781531e7ba7a3f0d4890ffdb8ab5e3 100644 (file)
@@ -17,6 +17,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -70,6 +72,7 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo
     private Map<ConnectionInfo, HwvtepConnectionInstance> clients = new ConcurrentHashMap<>();
     private static final Logger LOG = LoggerFactory.getLogger(HwvtepConnectionManager.class);
     private static final String ENTITY_TYPE = "hwvtep";
+    private static final int DB_FETCH_TIMEOUT = 1000;
 
     private DataBroker db;
     private TransactionInvoker txInvoker;
@@ -94,29 +97,30 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo
             hwvtepDeviceEntityOwnershipListener.close();
         }
 
-        for (OvsdbClient client: clients.values()) {
+        for (HwvtepConnectionInstance client: clients.values()) {
             client.disconnect();
         }
     }
 
     @Override
-    public void connected(@Nonnull final OvsdbClient client) {
+    public void connected(@Nonnull final OvsdbClient externalClient) {
         LOG.info("Library connected {} from {}:{} to {}:{}",
-                client.getConnectionInfo().getType(),
-                client.getConnectionInfo().getRemoteAddress(),
-                client.getConnectionInfo().getRemotePort(),
-                client.getConnectionInfo().getLocalAddress(),
-                client.getConnectionInfo().getLocalPort());
+                externalClient.getConnectionInfo().getType(),
+                externalClient.getConnectionInfo().getRemoteAddress(),
+                externalClient.getConnectionInfo().getRemotePort(),
+                externalClient.getConnectionInfo().getLocalAddress(),
+                externalClient.getConnectionInfo().getLocalPort());
         List<String> databases = new ArrayList<>();
         try {
-            databases = client.getDatabases().get();
-        } catch (InterruptedException | ExecutionException e) {
-            LOG.warn("Unable to fetch database list");
-        }
-
-        if(databases.contains(HwvtepSchemaConstants.HARDWARE_VTEP)) {
-            HwvtepConnectionInstance hwClient = connectedButCallBacksNotRegistered(client);
-            registerEntityForOwnership(hwClient);
+            databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
+            if(databases.contains(HwvtepSchemaConstants.HARDWARE_VTEP)) {
+                HwvtepConnectionInstance hwClient = connectedButCallBacksNotRegistered(externalClient);
+                registerEntityForOwnership(hwClient);
+            }
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            LOG.warn("Unable to fetch Database list from device {}. Disconnecting from the device.",
+                    externalClient.getConnectionInfo().getRemoteAddress(), e);
+            externalClient.disconnect();
         }
     }
 
@@ -275,7 +279,7 @@ public class HwvtepConnectionManager implements OvsdbConnectionListener, AutoClo
     }
 
     public OvsdbClient getClient(ConnectionInfo connectionInfo) {
-        return getConnectionInstance(connectionInfo);
+        return getConnectionInstance(connectionInfo).getOvsdbClient();
     }
 
     private void registerEntityForOwnership(HwvtepConnectionInstance hwvtepConnectionInstance) {
index 4402c889c136b4a84b5676294c62a39b59de4d7e..5ea0be4dc646e5af86b9ccd7530a6348c18c986e 100644 (file)
@@ -31,7 +31,7 @@ public class TransactInvokerImpl implements TransactInvoker {
 
     @Override
     public void invoke(TransactCommand command) {
-        TransactionBuilder tb = new TransactionBuilder(connectionInstance, dbSchema);
+        TransactionBuilder tb = new TransactionBuilder(connectionInstance.getOvsdbClient(), dbSchema);
         command.execute(tb);
         ListenableFuture<List<OperationResult>> result = tb.execute();
         LOG.debug("invoke: command: {}, tb: {}", command, tb);
index 4efce0d87d5d633e34c6c3c6859cda1d276363cc..107d75681a7f932e39a82e019d7735d738109c0c 100644 (file)
@@ -159,4 +159,7 @@ public interface OvsdbClient {
     <T extends TypedBaseTable<?>> T getTypedRowWrapper(final Class<T> klazz,
                                                        final Row<GenericTableSchema> row);
 
+    boolean isConnectionPublished();
+
+    void setConnectionPublished(boolean status);
 }
index 807b9ea05acfae353e4908fb4a26342281a7453d..e5ab1b0857cb9d1d79e4ba058df2dec10a939a1a 100644 (file)
@@ -68,10 +68,12 @@ public class OvsdbClientImpl implements OvsdbClient {
     private OvsdbRPC.Callback rpcCallback;
     private OvsdbConnectionInfo connectionInfo;
     private Channel channel;
+    private boolean isConnectionPublished;
+
     private static final ThreadFactory threadFactorySSL =
-        new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-SSL-%d").build();
+        new ThreadFactoryBuilder().setNameFormat("OVSDBPassiveConnSSL-%d").build();
     private static final ThreadFactory threadFactoryNonSSL =
-        new ThreadFactoryBuilder().setNameFormat("OVSDB-PassiveConnection-Non-SSL-%d").build();
+        new ThreadFactoryBuilder().setNameFormat("OVSDBPassiveConnNonSSL-%d").build();
 
     public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type,
         SocketConnectionType socketConnType) {
@@ -103,7 +105,7 @@ public class OvsdbClientImpl implements OvsdbClient {
             }
         } else if (type == ConnectionType.ACTIVE) {
             ThreadFactory threadFactorySSL =
-                new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConnection-" + executorNameArgs + "-%d")
+                new ThreadFactoryBuilder().setNameFormat("OVSDB-ActiveConn-" + executorNameArgs + "-%d")
                     .build();
             return threadFactorySSL;
         }
@@ -474,4 +476,14 @@ public class OvsdbClientImpl implements OvsdbClient {
         channel.disconnect();
         executorService.shutdown();
     }
+
+    @Override
+    public boolean isConnectionPublished() {
+        return isConnectionPublished;
+    }
+
+    @Override
+    public void setConnectionPublished(boolean connectionPublished) {
+        isConnectionPublished = connectionPublished;
+    }
 }
index 674e7af680fb4f6728f1a57f269654300180cf4a..7bda83eb8e36aae627c7fb446a6463d0d492f63b 100644 (file)
@@ -13,6 +13,9 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
@@ -40,10 +43,12 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
@@ -80,9 +85,15 @@ import org.slf4j.LoggerFactory;
 public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionService.class);
 
-    private static ThreadFactory threadFactory = new ThreadFactoryBuilder()
-            .setNameFormat("OVSDB-PassiveConnectionService-%d").build();
-    private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10, threadFactory);
+    private static ThreadFactory passiveConnectionThreadFactory = new ThreadFactoryBuilder()
+            .setNameFormat("OVSDBPassiveConnServ-%d").build();
+    private static ScheduledExecutorService executorService
+            = Executors.newScheduledThreadPool(10, passiveConnectionThreadFactory);
+
+    private static ThreadFactory connectionNotifierThreadFactory = new ThreadFactoryBuilder()
+            .setNameFormat("OVSDBConnNotifSer-%d").build();
+    private static ExecutorService connectionNotifierService
+            = Executors.newCachedThreadPool(connectionNotifierThreadFactory);
 
     private static Set<OvsdbConnectionListener> connectionListeners = Sets.newHashSet();
     private static Map<OvsdbClient, Channel> connections = Maps.newHashMap();
@@ -309,15 +320,33 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     }
 
     private static void handleNewPassiveConnection(OvsdbClient client) {
-        List<OvsdbClient> clientsFromSameNode = getPassiveClientsFromSameNode(client);
-        if (clientsFromSameNode.size() == 0) {
-            notifyListenerForPassiveConnection(client);
-        } else {
-            stalePassiveConnectionService.handleNewPassiveConnection(client, clientsFromSameNode);
-        }
+        ListenableFuture<List<String>> echoFuture = client.echo();
+        LOG.debug("Send echo message to probe the OVSDB switch {}",client.getConnectionInfo());
+        Futures.addCallback(echoFuture, new FutureCallback<List<String>>() {
+            @Override
+            public void onSuccess(@Nullable List<String> result) {
+                LOG.debug("Probe was successful to OVSDB switch {}",client.getConnectionInfo());
+                List<OvsdbClient> clientsFromSameNode = getPassiveClientsFromSameNode(client);
+                if (clientsFromSameNode.size() == 0) {
+                    notifyListenerForPassiveConnection(client);
+                } else {
+                    stalePassiveConnectionService.handleNewPassiveConnection(client, clientsFromSameNode);
+                }
+            }
+
+            @Override
+            public void onFailure(Throwable failureException) {
+                LOG.error("Probe failed to OVSDB switch. Disconnecting the channel {}", client.getConnectionInfo());
+                client.disconnect();
+            }
+        }, connectionNotifierService);
     }
 
     private static void handleNewPassiveConnection(final Channel channel) {
+        if (!channel.isOpen()) {
+            LOG.warn("Channel {} is not open, skipped further processing of the connection.",channel);
+            return;
+        }
         SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
         if (sslHandler != null) {
             class HandleNewPassiveSslRunner implements Runnable {
@@ -414,10 +443,12 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
     public static void channelClosed(final OvsdbClient client) {
         LOG.info("Connection closed {}", client.getConnectionInfo().toString());
         connections.remove(client);
-        for (OvsdbConnectionListener listener : connectionListeners) {
-            listener.disconnected(client);
-            stalePassiveConnectionService.clientDisconnected(client);
+        if (client.isConnectionPublished()) {
+            for (OvsdbConnectionListener listener : connectionListeners) {
+                listener.disconnected(client);
+            }
         }
+        stalePassiveConnectionService.clientDisconnected(client);
     }
 
     @Override
@@ -454,9 +485,16 @@ public class OvsdbConnectionService implements AutoCloseable, OvsdbConnection {
         return passiveClients;
     }
 
-    public static void notifyListenerForPassiveConnection(OvsdbClient client) {
-        for (OvsdbConnectionListener listener : connectionListeners) {
-            listener.connected(client);
+    public static void notifyListenerForPassiveConnection(final OvsdbClient client) {
+        client.setConnectionPublished(true);
+        for (final OvsdbConnectionListener listener : connectionListeners) {
+            connectionNotifierService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    LOG.trace("Connection {} notified to listener {}", client.getConnectionInfo(), listener);
+                    listener.connected(client);
+                }
+            });
         }
     }
 }
index a68ae4f40003a61bbd96891896b69acaac875d92..6cf9e0e33c56b8432c70cf2cea85f6639e0d3642 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.ovsdb.lib.jsonrpc;
 
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.DecoderException;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
@@ -27,22 +28,27 @@ public class ExceptionHandler extends ChannelDuplexHandler {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-        if ((cause instanceof InvalidEncodingException)
-                || (cause instanceof TooLongFrameException)) {
-            ctx.channel().disconnect();
-        }
+        if (ctx.channel().isActive()) {
+            LOG.error("Exception occurred while processing connection pipeline", cause);
+            if ((cause instanceof InvalidEncodingException)
+                    || (cause instanceof TooLongFrameException || (cause instanceof DecoderException))) {
+                LOG.info("Disconnecting channel to ovsdb {}", ctx.channel());
+                ctx.channel().disconnect();
+                return;
+            }
+
         /* In cases where a connection is quickly established and the closed
-        Catch the IOException and close the channel
+        Catch the IOException and close the channel. Similarly if the peer is
+        powered off, Catch the read time out exception and close the channel
          */
-        if (cause instanceof IOException) {
-            ctx.channel().close();
-        }
-        /* In cases where the peer is power off
-         * Catch the read time out exception and close the channel
-         */
-        if (cause instanceof ReadTimeoutException) {
-            LOG.debug("Read timeout exception: close connection {}", ctx.channel());
-            ctx.channel().close();
+            if ((cause instanceof IOException ) || (cause instanceof ReadTimeoutException)) {
+                LOG.info("Closing channel to ovsdb {}", ctx.channel());
+                ctx.channel().close();
+                return;
+            }
+
+            LOG.error("Exception was not handled by the exception handler, re-throwing it for next handler");
+            ctx.fireExceptionCaught(cause);
         }
     }
 
index 30964ce53399073c32a1506e184365a46a9c4315..087967a070f45f3ad2b3783cd5cfb7f958cfe39f 100644 (file)
@@ -19,6 +19,7 @@ import com.google.common.reflect.Reflection;
 import com.google.common.reflect.TypeToken;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.channel.Channel;
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
@@ -26,6 +27,11 @@ import java.lang.reflect.Method;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
 import org.opendaylight.ovsdb.lib.error.UnexpectedResultException;
 import org.opendaylight.ovsdb.lib.error.UnsupportedArgumentException;
 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
@@ -35,6 +41,12 @@ import org.slf4j.LoggerFactory;
 public class JsonRpcEndpoint {
 
     private static final Logger LOG = LoggerFactory.getLogger(JsonRpcEndpoint.class);
+    private static final int REAPER_INTERVAL = 300;
+    private static final int REAPER_THREADS = 3;
+    private static final ThreadFactory futureReaperThreadFactory = new ThreadFactoryBuilder()
+            .setNameFormat("OVSDB-Lib-Future-Reaper-%d").build();
+    private static final ScheduledExecutorService futureReaperService
+            = Executors.newScheduledThreadPool(REAPER_THREADS, futureReaperThreadFactory);
 
     public class CallContext {
         Method method;
@@ -108,6 +120,15 @@ public class JsonRpcEndpoint {
 
                 SettableFuture<Object> sf = SettableFuture.create();
                 methodContext.put(request.getId(), new CallContext(request, method, sf));
+                futureReaperService.schedule(new Runnable() {
+                    @Override
+                    public void run() {
+                        if (sf.isDone() || sf.isCancelled()) {
+                            return;
+                        }
+                        methodContext.remove(request.getId()).getFuture().cancel(false);
+                    }
+                },REAPER_INTERVAL, TimeUnit.MILLISECONDS);
 
                 nettyChannel.writeAndFlush(requestString);
 
@@ -120,7 +141,7 @@ public class JsonRpcEndpoint {
     public void processResult(JsonNode response) throws NoSuchMethodException {
 
         LOG.trace("Response : {}", response.toString());
-        CallContext returnCtxt = methodContext.get(response.get("id").asText());
+        CallContext returnCtxt = methodContext.remove(response.get("id").asText());
         if (returnCtxt == null) {
             return;
         }
index d29fabe972dec308e42381dd3d750f48e4281f76..8cb9917453ba342e82f069f8b84cf99338357f64 100644 (file)
@@ -23,7 +23,6 @@ import org.opendaylight.controller.md.sal.binding.api.DataTreeModification;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
-import org.opendaylight.ovsdb.lib.EchoServiceCallbackFilters;
 import org.opendaylight.ovsdb.lib.LockAquisitionCallback;
 import org.opendaylight.ovsdb.lib.LockStolenCallback;
 import org.opendaylight.ovsdb.lib.MonitorCallBack;
@@ -65,7 +64,7 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class OvsdbConnectionInstance implements OvsdbClient {
+public class OvsdbConnectionInstance {
     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionInstance.class);
     private OvsdbClient client;
     private ConnectionInfo connectionInfo;
@@ -78,8 +77,8 @@ public class OvsdbConnectionInstance implements OvsdbClient {
     private EntityOwnershipCandidateRegistration deviceOwnershipCandidateRegistration;
     private OvsdbNodeAugmentation initialCreateData = null;
 
-    OvsdbConnectionInstance(ConnectionInfo key,OvsdbClient client,TransactionInvoker txInvoker,
-            InstanceIdentifier<Node> iid) {
+    OvsdbConnectionInstance(ConnectionInfo key, OvsdbClient client, TransactionInvoker txInvoker,
+                            InstanceIdentifier<Node> iid) {
         this.connectionInfo = key;
         this.client = client;
         this.txInvoker = txInvoker;
@@ -182,7 +181,7 @@ public class OvsdbConnectionInstance implements OvsdbClient {
                     this.initialCreateData.getConnectionInfo().getRemotePort());
         for ( Map.Entry<DatabaseSchema,TransactInvoker> entry: transactInvokers.entrySet()) {
 
-            TransactionBuilder transaction = new TransactionBuilder(this, entry.getKey());
+            TransactionBuilder transaction = new TransactionBuilder(this.client, entry.getKey());
 
             // OpenVSwitchPart
             OpenVSwitch ovs = TyperUtils.getTypedRowWrapper(transaction.getDatabaseSchema(), OpenVSwitch.class);
@@ -260,7 +259,6 @@ public class OvsdbConnectionInstance implements OvsdbClient {
         return client.transactBuilder(dbSchema);
     }
 
-    @Override
     public <E extends TableSchema<E>> TableUpdates monitor(
             DatabaseSchema schema, List<MonitorRequest> monitorRequests,
             MonitorHandle monitorHandle, MonitorCallBack callbackArgument) {
@@ -290,14 +288,6 @@ public class OvsdbConnectionInstance implements OvsdbClient {
         return client.unLock(lockId);
     }
 
-    public void startEchoService(EchoServiceCallbackFilters callbackFilters) {
-        client.startEchoService(callbackFilters);
-    }
-
-    public void stopEchoService() {
-        client.stopEchoService();
-    }
-
     public boolean isActive() {
         return client.isActive();
     }
@@ -393,8 +383,7 @@ public class OvsdbConnectionInstance implements OvsdbClient {
         this.initialCreateData = ovsdbNodeCreateData;
     }
 
-    @Override
-    public ListenableFuture<List<String>> echo() {
-        return client.echo();
+    public OvsdbClient getOvsdbClient() {
+        return client;
     }
 }
index 929b658e7aad0fadf8aade9a30fb7bb1ab3df45d..73f0004939c223fde1c867a081da21ee427f8a86 100644 (file)
@@ -71,6 +71,7 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
             new ConcurrentHashMap<>();
     private static final Logger LOG = LoggerFactory.getLogger(OvsdbConnectionManager.class);
     private static final String ENTITY_TYPE = "ovsdb";
+    private static final int DB_FETCH_TIMEOUT = 1000;
 
     private DataBroker db;
     private TransactionInvoker txInvoker;
@@ -104,7 +105,7 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
                 externalClient.getConnectionInfo().getLocalPort());
         List<String> databases = new ArrayList<>();
         try {
-            databases = externalClient.getDatabases().get(1000, TimeUnit.MILLISECONDS);
+            databases = externalClient.getDatabases().get(DB_FETCH_TIMEOUT, TimeUnit.MILLISECONDS);
             if (databases.contains(SouthboundConstants.OPEN_V_SWITCH)) {
                 OvsdbConnectionInstance client = connectedButCallBacksNotRegistered(externalClient);
                 // Register Cluster Ownership for ConnectionInfo
@@ -246,7 +247,7 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
             ovsdbDeviceEntityOwnershipListener.close();
         }
 
-        for (OvsdbClient client: clients.values()) {
+        for (OvsdbConnectionInstance client: clients.values()) {
             client.disconnect();
         }
     }
@@ -324,15 +325,15 @@ public class OvsdbConnectionManager implements OvsdbConnectionListener, AutoClos
     }
 
     public OvsdbClient getClient(ConnectionInfo connectionInfo) {
-        return getConnectionInstance(connectionInfo);
+        return getConnectionInstance(connectionInfo).getOvsdbClient();
     }
 
     public OvsdbClient getClient(OvsdbBridgeAttributes mn) {
-        return getConnectionInstance(mn);
+        return getConnectionInstance(mn).getOvsdbClient();
     }
 
     public OvsdbClient getClient(Node node) {
-        return getConnectionInstance(node);
+        return getConnectionInstance(node).getOvsdbClient();
     }
 
     public Boolean getHasDeviceOwnership(ConnectionInfo connectionInfo) {
index 80462a686d9768553070c91c6ec7eccd2a704bb1..95ef4028de444dc844e1f84e3498ebdca05049ab 100644 (file)
@@ -37,7 +37,7 @@ public class TransactInvokerImpl implements TransactInvoker {
     @Override
     public void invoke(TransactCommand command, BridgeOperationalState state,
                        AsyncDataChangeEvent<InstanceIdentifier<?>, DataObject> events) {
-        TransactionBuilder tb = new TransactionBuilder(connectionInstance, dbSchema);
+        TransactionBuilder tb = new TransactionBuilder(connectionInstance.getOvsdbClient(), dbSchema);
         command.execute(tb, state, events);
         invoke(command, tb);
     }
@@ -45,7 +45,7 @@ public class TransactInvokerImpl implements TransactInvoker {
     @Override
     public void invoke(TransactCommand command, BridgeOperationalState state,
                        Collection<DataTreeModification<Node>> modifications) {
-        TransactionBuilder tb = new TransactionBuilder(connectionInstance, dbSchema);
+        TransactionBuilder tb = new TransactionBuilder(connectionInstance.getOvsdbClient(), dbSchema);
         command.execute(tb, state, modifications);
         invoke(command, tb);
     }
index 16d2abde0bc84f29a24a6462c6e92c31d2655fa5..62e87e54fd3eb5cbd258911029d544877c956c02 100644 (file)
@@ -268,17 +268,6 @@ public class OvsdbConnectionInstanceTest {
                 ovsdbConnectionInstance.unLock(anyString()));
         verify(client).unLock(anyString());
 
-        // test startEchoService()
-        EchoServiceCallbackFilters echoServiceCallbackFilters = mock(EchoServiceCallbackFilters.class);
-        doNothing().when(client).startEchoService(any(EchoServiceCallbackFilters.class));
-        ovsdbConnectionInstance.startEchoService(echoServiceCallbackFilters);
-        verify(client).startEchoService(any(EchoServiceCallbackFilters.class));
-
-        // test stopEchoService()
-        doNothing().when(client).stopEchoService();
-        ovsdbConnectionInstance.stopEchoService();
-        verify(client).stopEchoService();
-
         // test isActive()
         when(client.isActive()).thenReturn(true);
         assertEquals("Error, does not match isActive()", true, ovsdbConnectionInstance.isActive());
index 9a67daa54c23e15a470b3f7dd2541bcd38f0c4b8..c466c22743e119ef3d18c3b1a0042cddd91047ff 100644 (file)
@@ -206,7 +206,7 @@ public class OvsdbConnectionManagerTest {
         suppress(MemberMatcher.method(OvsdbConnectionManager.class, "unregisterEntityForOwnership",
                 OvsdbConnectionInstance.class));
         ovsdbConnManager.disconnect(ovsdbNode);
-        verify((OvsdbClient)ovsdbConnectionInstance).disconnect();
+        verify(ovsdbConnectionInstance).disconnect();
     }
 
     @Test
@@ -289,26 +289,28 @@ public class OvsdbConnectionManagerTest {
 
     @Test
     public void testGetClient() {
-        OvsdbClient ovsdbClient = mock(OvsdbConnectionInstance.class);
+        OvsdbConnectionInstance ovsdbClient = mock(OvsdbConnectionInstance.class);
+        OvsdbClient client = mock(OvsdbClient.class);
+        when(ovsdbClient.getOvsdbClient()).thenReturn(client);
 
         //Test getClient(ConnectionInfo connectionInfo)
         ConnectionInfo key = mock(ConnectionInfo.class);
         suppress(MemberMatcher.method(OvsdbConnectionManager.class, "getConnectionInstance", ConnectionInfo.class));
         when(ovsdbConnManager.getConnectionInstance(key)).thenReturn((OvsdbConnectionInstance)ovsdbClient);
-        assertEquals("Error getting correct OvsdbClient object", ovsdbClient, ovsdbConnManager.getClient(key));
+        assertEquals("Error getting correct OvsdbClient object", ovsdbClient.getOvsdbClient(), ovsdbConnManager.getClient(key));
 
         //Test getClient(OvsdbBridgeAttributes mn)
         OvsdbBridgeAttributes mn = mock(OvsdbBridgeAttributes.class);
         suppress(MemberMatcher.method(OvsdbConnectionManager.class, "getConnectionInstance",
                 OvsdbBridgeAttributes.class));
         when(ovsdbConnManager.getConnectionInstance(mn)).thenReturn((OvsdbConnectionInstance)ovsdbClient);
-        assertEquals("Error getting correct OvsdbClient object", ovsdbClient, ovsdbConnManager.getClient(mn));
+        assertEquals("Error getting correct OvsdbClient object", ovsdbClient.getOvsdbClient(), ovsdbConnManager.getClient(mn));
 
         //Test getClient(Node node)
         Node node = mock(Node.class);
         suppress(MemberMatcher.method(OvsdbConnectionManager.class, "getConnectionInstance", Node.class));
         when(ovsdbConnManager.getConnectionInstance(node)).thenReturn((OvsdbConnectionInstance)ovsdbClient);
-        assertEquals("Error getting correct OvsdbClient object", ovsdbClient, ovsdbConnManager.getClient(node));
+        assertEquals("Error getting correct OvsdbClient object", ovsdbClient.getOvsdbClient(), ovsdbConnManager.getClient(node));
     }
 
     @SuppressWarnings("unchecked")