Migrating ovsdb_plugin's connection service to the newly redesigned Schema independen... 30/8430/2
authorMadhu Venugopal <mavenugo@gmail.com>
Fri, 27 Jun 2014 23:23:31 +0000 (16:23 -0700)
committerMadhu Venugopal <mavenugo@gmail.com>
Fri, 27 Jun 2014 23:43:09 +0000 (16:43 -0700)
This is just step-1. More to follow soon.

Change-Id: Ifdff0f11ea7cabaac93b67a822ac88120f023619
Signed-off-by: Madhu Venugopal <mavenugo@gmail.com>
16 files changed:
integrationtest/pom.xml
integrationtest/src/test/java/org/opendaylight/ovsdb/integrationtest/OvsdbIntegrationTestBase.java
integrationtest/src/test/java/org/opendaylight/ovsdb/integrationtest/plugin/OvsdbPluginIT.java
library/pom.xml
library/src/main/java/org/opendaylight/ovsdb/lib/OvsdbClient.java
library/src/main/java/org/opendaylight/ovsdb/lib/OvsdbConnectionInfo.java
library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbClientImpl.java
library/src/main/java/org/opendaylight/ovsdb/lib/impl/OvsdbConnectionService.java
plugin/pom.xml
plugin/src/main/java/org/opendaylight/ovsdb/plugin/Activator.java
plugin/src/main/java/org/opendaylight/ovsdb/plugin/ConfigurationService.java
plugin/src/main/java/org/opendaylight/ovsdb/plugin/Connection.java
plugin/src/main/java/org/opendaylight/ovsdb/plugin/ConnectionService.java
plugin/src/main/java/org/opendaylight/ovsdb/plugin/InventoryService.java
plugin/src/main/java/org/opendaylight/ovsdb/plugin/InventoryServiceInternal.java
plugin/src/main/java/org/opendaylight/ovsdb/plugin/NodeDB.java

index 11597cefff7d443ff6d1cc74d69878ba435ff582..724cfff111cb1fb22583ba58e7c86fb796339c15 100644 (file)
               <!--  TODO remove this includes section once the plugin migration is complete -->
               <includes>
                 <include>**/*LibraryIT*</include>
+                <include>**/*PluginIT*</include>
               </includes>
             </configuration>
           </execution>
index 9f15548ed690cabf744ac796847ab9e4402fbcb2..76f7647916d65f538e3ca856b54971c80e16f9fd 100644 (file)
@@ -29,14 +29,14 @@ import org.opendaylight.ovsdb.lib.OvsdbConnection;
 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
 
 public abstract class OvsdbIntegrationTestBase {
-    private final static String identifier = "TEST";
-    private final static String SERVER_IPADDRESS = "ovsdbserver.ipaddress";
-    private final static String SERVER_PORT = "ovsdbserver.port";
-    private final static String CONNECTION_TYPE = "ovsdbserver.connection";
-    private final static String CONNECTION_TYPE_ACTIVE = "active";
-    private final static String CONNECTION_TYPE_PASSIVE = "passive";
+    protected final static String IDENTIFIER = "TEST";
+    protected final static String SERVER_IPADDRESS = "ovsdbserver.ipaddress";
+    protected final static String SERVER_PORT = "ovsdbserver.port";
+    protected final static String CONNECTION_TYPE = "ovsdbserver.connection";
+    protected final static String CONNECTION_TYPE_ACTIVE = "active";
+    protected final static String CONNECTION_TYPE_PASSIVE = "passive";
 
-    private final static String DEFAULT_SERVER_PORT = "6640";
+    protected final static String DEFAULT_SERVER_PORT = "6640";
 
     /**
      * Represents the Open Vswitch Schema
@@ -90,7 +90,7 @@ public abstract class OvsdbIntegrationTestBase {
         return null;
     }
 
-    private String usage() {
+    protected String usage() {
         return "Integration Test needs a valid connection configuration as follows :\n" +
                "active connection : mvn -Pintegrationtest -Dovsdbserver.ipaddress=x.x.x.x -Dovsdbserver.port=yyyy verify\n"+
                "passive connection : mvn -Pintegrationtest -Dovsdbserver.connection=passive verify\n";
index fae3bd43af99df7b140119dd99222cb537096ec4..837882781276bb5ab3a3b828484458d896ebb201 100644 (file)
@@ -9,28 +9,37 @@
  */
 package org.opendaylight.ovsdb.integrationtest.plugin;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeNotNull;
 import static org.ops4j.pax.exam.CoreOptions.junitBundles;
 import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
 import static org.ops4j.pax.exam.CoreOptions.options;
 import static org.ops4j.pax.exam.CoreOptions.propagateSystemProperty;
 import static org.ops4j.pax.exam.CoreOptions.systemProperty;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 import javax.inject.Inject;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.opendaylight.controller.sal.connection.ConnectionConstants;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.utils.ServiceHelper;
 import org.opendaylight.ovsdb.integrationtest.ConfigurationBundles;
 import org.opendaylight.ovsdb.integrationtest.OvsdbIntegrationTestBase;
 import org.opendaylight.ovsdb.lib.OvsdbClient;
+import org.opendaylight.ovsdb.plugin.IConnectionServiceInternal;
 import org.opendaylight.ovsdb.plugin.OVSDBConfigService;
 import org.ops4j.pax.exam.Configuration;
 import org.ops4j.pax.exam.Option;
@@ -87,6 +96,29 @@ public class OvsdbPluginIT extends OvsdbIntegrationTestBase {
         }
     }
 
+    public Node getPluginTestConnection() throws IOException, InterruptedException, ExecutionException, TimeoutException {
+        Properties props = loadProperties();
+        String addressStr = props.getProperty(SERVER_IPADDRESS);
+        String portStr = props.getProperty(SERVER_PORT, DEFAULT_SERVER_PORT);
+        String connectionType = props.getProperty(CONNECTION_TYPE, "active");
+
+        // If the connection type is active, controller connects to the ovsdb-server
+        if (connectionType.equalsIgnoreCase(CONNECTION_TYPE_ACTIVE)) {
+            if (addressStr == null) {
+                fail(usage());
+            }
+
+            Map<ConnectionConstants, String> params = new HashMap<ConnectionConstants, String>();
+            params.put(ConnectionConstants.ADDRESS, addressStr);
+            params.put(ConnectionConstants.PORT, portStr);
+
+            IConnectionServiceInternal connection = (IConnectionServiceInternal)ServiceHelper.getGlobalInstance(IConnectionServiceInternal.class, this);
+            return connection.connect(IDENTIFIER, params);
+        }
+        fail("Connection parameter ("+CONNECTION_TYPE+") must be active");
+        return null;
+    }
+
     @Before
     public void areWeReady() throws InterruptedException {
         assertNotNull(bc);
@@ -104,10 +136,9 @@ public class OvsdbPluginIT extends OvsdbIntegrationTestBase {
             log.debug("Do some debugging because some bundle is unresolved");
         }
 
-        // Assert if true, if false we are good to go!
         assertFalse(debugit);
         try {
-            client = getTestConnection();
+            node = getPluginTestConnection();
         } catch (Exception e) {
             fail("Exception : "+e.getMessage());
         }
@@ -116,12 +147,16 @@ public class OvsdbPluginIT extends OvsdbIntegrationTestBase {
 
     @Test
     public void tableTest() throws Exception {
-        assertNotNull("Invalid Node. Check connection params", client);
-        Thread.sleep(3000); // Wait for a few seconds to get the Schema exchange done
-        /*
-         * TODO : Remove the assumeNotNull once the Plugin is migrated to the new lib
-         */
-        assumeNotNull(node);
+        Thread.sleep(5000);
+        IConnectionServiceInternal connection = (IConnectionServiceInternal)ServiceHelper.getGlobalInstance(IConnectionServiceInternal.class, this);
+
+        // Check for the ovsdb Connection as seen by the Plugin layer
+        assertNotNull(connection.getNodes());
+        assertTrue(connection.getNodes().size() > 0);
+        assertEquals(Node.fromString("OVS|"+IDENTIFIER), connection.getNodes().get(0));
+
+        System.out.println("Nodes = "+connection.getNodes());
+
         List<String> tables = ovsdbConfigService.getTables(node);
         System.out.println("Tables = "+tables);
         assertNotNull(tables);
index f69ca33ff97ee8dde7c67b7b97cb9d404d632f41..2a6863d6692a0837b3dfa49fcdb4572dbdf74f02 100755 (executable)
                 org.opendaylight.ovsdb.lib.database,
                 org.opendaylight.ovsdb.lib.operations,
                 org.opendaylight.ovsdb.lib.message,
+
+                <!-- TODO : Remove the following temp package switfly once the plugin migration is complete -->
+                org.opendaylight.ovsdb.lib.message.temp,
                 org.opendaylight.ovsdb.lib.schema,
                 org.opendaylight.ovsdb.lib.schema.typed</Export-Package>
           </instructions>
index 80e572a7473e76ef1684cad3d6f4f731ff49ccff..d6a9831f0e0ca8aa1cb7b5464a32100401447629 100644 (file)
@@ -113,6 +113,10 @@ public interface OvsdbClient {
 
     public OvsdbConnectionInfo getConnectionInfo();
 
+    public boolean isActive();
+
+    public void disconnect();
+
     public DatabaseSchema getDatabaseSchema (String dbName);
 
     /**
index 97133d80c704b4fd00077083120ffc231303081f..9b82c690c428ebf8af992c2e9c740d88ff72ee2d 100644 (file)
 
 package org.opendaylight.ovsdb.lib;
 
+import io.netty.channel.Channel;
+
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 public class OvsdbConnectionInfo {
     public enum ConnectionType {
         ACTIVE, PASSIVE
     }
 
-    InetAddress address;
-    int port;
-    ConnectionType type;
+    private Channel channel;
+    private ConnectionType type;
 
-    public OvsdbConnectionInfo(InetAddress address, int port, ConnectionType type) {
-        this.address = address;
-        this.port = port;
+    public OvsdbConnectionInfo(Channel channel, ConnectionType type) {
+        this.channel = channel;
         this.type = type;
     }
 
-    public InetAddress getAddress() {
-        return address;
+    public InetAddress getRemoteAddress() {
+        return ((InetSocketAddress)channel.remoteAddress()).getAddress();
+    }
+    public int getRemotePort() {
+        return ((InetSocketAddress)channel.remoteAddress()).getPort();
+    }
+
+    public InetAddress getLocalAddress() {
+        return ((InetSocketAddress)channel.localAddress()).getAddress();
     }
-    public int getPort() {
-        return port;
+    public int getLocalPort() {
+        return ((InetSocketAddress)channel.localAddress()).getPort();
     }
+
     public ConnectionType getType() {
         return type;
     }
@@ -40,9 +49,10 @@ public class OvsdbConnectionInfo {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + ((address == null) ? 0 : address.hashCode());
-        result = prime * result + port;
-        result = prime * result + ((type == null) ? 0 : type.hashCode());
+        result = prime * result + ((channel == null) ? 0 : getRemoteAddress().hashCode());
+        result = prime * result + ((type == null) ? 0 : getRemotePort());
+        result = prime * result + ((channel == null) ? 0 : getLocalAddress().hashCode());
+        result = prime * result + ((type == null) ? 0 : getLocalPort());
         return result;
     }
 
@@ -55,20 +65,29 @@ public class OvsdbConnectionInfo {
         if (getClass() != obj.getClass())
             return false;
         OvsdbConnectionInfo other = (OvsdbConnectionInfo) obj;
-        if (address == null) {
-            if (other.address != null)
+        if (channel == null) {
+            if (other.channel != null)
                 return false;
-        } else if (!address.equals(other.address))
+        } else if (!getRemoteAddress().equals(other.getRemoteAddress())) {
+            return false;
+        } else if (!getLocalAddress().equals(other.getLocalAddress())) {
             return false;
-        if (port != other.port)
+        } else if (getRemotePort() != other.getRemotePort()) {
             return false;
+        } else if (getLocalPort() != other.getLocalPort()) {
+            return false;
+        }
         if (type != other.type)
             return false;
         return true;
     }
+
     @Override
     public String toString() {
-        return "ConnectionInfo [address=" + address + ", port=" + port
-                + ", type=" + type + "]";
+        return "ConnectionInfo [Remote-address=" + this.getRemoteAddress().getHostAddress() +
+                             ", Remote-port=" + this.getRemotePort() +
+                             ", Local-address" + this.getLocalAddress().getHostAddress() +
+                             ", Local-port=" + this.getLocalPort() +
+                             ", type=" + type + "]";
     }
 }
index b2fde021da533615281556765b96921e9c507f31..59c12192205005536ee661f34134f956551a2bf7 100644 (file)
@@ -9,6 +9,8 @@
  */
 package org.opendaylight.ovsdb.lib.impl;
 
+import io.netty.channel.Channel;
+
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -24,6 +26,7 @@ import org.opendaylight.ovsdb.lib.MonitorCallBack;
 import org.opendaylight.ovsdb.lib.MonitorHandle;
 import org.opendaylight.ovsdb.lib.OvsdbClient;
 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
+import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
 import org.opendaylight.ovsdb.lib.jsonrpc.Params;
 import org.opendaylight.ovsdb.lib.message.MonitorRequest;
 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
@@ -65,11 +68,14 @@ public class OvsdbClientImpl implements OvsdbClient {
     private Queue<Throwable> exceptions;
     private OvsdbRPC.Callback rpcCallback;
     private OvsdbConnectionInfo connectionInfo;
+    private Channel channel;
 
-    public OvsdbClientImpl(OvsdbRPC rpc, OvsdbConnectionInfo connectionInfo, ExecutorService executorService) {
+    public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type, ExecutorService executorService) {
         this.rpc = rpc;
         this.executorService = executorService;
-        this.connectionInfo = connectionInfo;
+        this.channel = channel;
+
+        this.connectionInfo = new OvsdbConnectionInfo(channel, type);
     }
 
     OvsdbClientImpl() {
@@ -377,4 +383,14 @@ public class OvsdbClientImpl implements OvsdbClient {
     public OvsdbConnectionInfo getConnectionInfo() {
         return connectionInfo;
     }
+
+    @Override
+    public boolean isActive() {
+        return channel.isActive();
+    }
+
+    @Override
+    public void disconnect() {
+        channel.disconnect();
+    }
 }
index 93bfae95319126bb08d447334d03f52ac672d38e..be854bea0f635f5c2a9ca6244144e25218112df2 100644 (file)
@@ -28,7 +28,6 @@ import io.netty.handler.logging.LoggingHandler;
 import io.netty.util.CharsetUtil;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -36,7 +35,6 @@ import java.util.concurrent.Executors;
 
 import org.opendaylight.ovsdb.lib.OvsdbClient;
 import org.opendaylight.ovsdb.lib.OvsdbConnection;
-import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
 import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
 import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
 import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
@@ -142,12 +140,7 @@ public class OvsdbConnectionService implements OvsdbConnection {
         channel.pipeline().addLast(binderHandler);
 
         OvsdbRPC rpc = factory.getClient(channel, OvsdbRPC.class);
-        InetSocketAddress channelAddress = (InetSocketAddress)channel.remoteAddress();
-        InetAddress address = channelAddress.getAddress();
-        int port = channelAddress.getPort();
-
-        OvsdbConnectionInfo info = new OvsdbConnectionInfo(address, port, type);
-        OvsdbClientImpl client = new OvsdbClientImpl(rpc, info, executorService);
+        OvsdbClientImpl client = new OvsdbClientImpl(rpc, channel, type, executorService);
         connections.put(client, channel);
         ChannelFuture closeFuture = channel.closeFuture();
         closeFuture.addListener(new ChannelConnectionHandler(client));
index 623afc06b3fb4ae3b75cbdbf359a4f7998a89807..948d94b647c5ec1535447d16f509cd155d5a565e 100755 (executable)
       <id>integrationtest</id>
       <activation></activation>
       <properties>
-        <skip.integrationtest>false</skip.integrationtest>
+        <skip.integrationtest>true</skip.integrationtest>
       </properties>
     </profile>
   </profiles>
index 5df971af70ef8174182293d688c8c44abf22e4f3..65a2a48819da3dd964b2c3d24c21c65b0b1dcc8f 100644 (file)
@@ -24,6 +24,8 @@ import org.opendaylight.controller.sal.networkconfig.bridgedomain.IPluginInBridg
 import org.opendaylight.controller.sal.utils.GlobalConstants;
 import org.opendaylight.controller.sal.utils.INodeConnectorFactory;
 import org.opendaylight.controller.sal.utils.INodeFactory;
+import org.opendaylight.ovsdb.lib.OvsdbConnection;
+import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,7 +95,8 @@ public class Activator extends ComponentActivatorAbstractBase {
             props.put(GlobalConstants.PROTOCOLPLUGINTYPE.toString(), "OVS");
             c.setInterface(
                     new String[] {IPluginInConnectionService.class.getName(),
-                                  IConnectionServiceInternal.class.getName()}, props);
+                                  IConnectionServiceInternal.class.getName(),
+                                  OvsdbConnectionListener.class.getName()}, props);
             c.add(createServiceDependency()
                     .setService(InventoryServiceInternal.class)
                     .setCallbacks("setInventoryServiceInternal", "unsetInventoryServiceInternal")
@@ -102,6 +105,10 @@ public class Activator extends ComponentActivatorAbstractBase {
                     .setService(IClusterGlobalServices.class)
                     .setCallbacks("setClusterServices", "unsetClusterServices")
                     .setRequired(false));
+            c.add(createServiceDependency()
+                    .setService(OvsdbConnection.class)
+                    .setCallbacks("setOvsdbConnection", "unsetOvsdbConnection")
+                    .setRequired(true));
         }
 
         if (imp.equals(InventoryService.class)) {
index 85b795ebae78dfe32c37c4707c157671f583db9b..5d8e6c6619ebd00242abbd43a17d0d77fabb0cdf 100644 (file)
@@ -32,12 +32,6 @@ import org.opendaylight.controller.sal.utils.StatusCode;
 import org.opendaylight.ovsdb.lib.database.OVSInstance;
 import org.opendaylight.ovsdb.lib.database.OvsdbType;
 import org.opendaylight.ovsdb.lib.message.TransactBuilder;
-import org.opendaylight.ovsdb.lib.operations.DeleteOperation;
-import org.opendaylight.ovsdb.lib.operations.InsertOperation;
-import org.opendaylight.ovsdb.lib.operations.MutateOperation;
-import org.opendaylight.ovsdb.lib.operations.Operation;
-import org.opendaylight.ovsdb.lib.operations.OperationResult;
-import org.opendaylight.ovsdb.lib.operations.UpdateOperation;
 import org.opendaylight.ovsdb.lib.notation.Condition;
 import org.opendaylight.ovsdb.lib.notation.Function;
 import org.opendaylight.ovsdb.lib.notation.Mutation;
@@ -45,6 +39,12 @@ import org.opendaylight.ovsdb.lib.notation.Mutator;
 import org.opendaylight.ovsdb.lib.notation.OvsDBMap;
 import org.opendaylight.ovsdb.lib.notation.OvsDBSet;
 import org.opendaylight.ovsdb.lib.notation.UUID;
+import org.opendaylight.ovsdb.lib.operations.DeleteOperation;
+import org.opendaylight.ovsdb.lib.operations.InsertOperation;
+import org.opendaylight.ovsdb.lib.operations.MutateOperation;
+import org.opendaylight.ovsdb.lib.operations.Operation;
+import org.opendaylight.ovsdb.lib.operations.OperationResult;
+import org.opendaylight.ovsdb.lib.operations.UpdateOperation;
 import org.opendaylight.ovsdb.lib.table.Bridge;
 import org.opendaylight.ovsdb.lib.table.Controller;
 import org.opendaylight.ovsdb.lib.table.IPFIX;
@@ -136,7 +136,7 @@ public class ConfigurationService implements IPluginInBridgeDomainConfigService,
 
     private Connection getConnection (Node node) {
         Connection connection = connectionService.getConnection(node);
-        if (connection == null || !connection.getChannel().isActive()) {
+        if (connection == null || !connection.getClient().isActive()) {
             return null;
         }
 
@@ -234,7 +234,7 @@ public class ConfigurationService implements IPluginInBridgeDomainConfigService,
                                                     addBridgeRequest,
                                                     updateCfgVerRequest)));
 
-            ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+            ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
             List<OperationResult> tr = transResponse.get();
             List<Operation> requests = transaction.getRequests();
             Status status = new Status(StatusCode.SUCCESS);
@@ -360,7 +360,7 @@ public class ConfigurationService implements IPluginInBridgeDomainConfigService,
                     transaction.addOperations(new ArrayList<Operation>
                             (Arrays.asList(addBrMutRequest, addPortRequest, addIntfRequest)));
 
-                    ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+                    ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
                     List<OperationResult> tr = transResponse.get();
                     List<Operation> requests = transaction.getRequests();
                     Status status = new Status(StatusCode.SUCCESS);
@@ -523,7 +523,7 @@ public class ConfigurationService implements IPluginInBridgeDomainConfigService,
             TransactBuilder transaction = new TransactBuilder();
             transaction.addOperations(new ArrayList<Operation>(Arrays.asList(delPortRequest)));
 
-            ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+            ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
             List<OperationResult> tr = transResponse.get();
             List<Operation> requests = transaction.getRequests();
             Status status = new Status(StatusCode.SUCCESS);
@@ -619,7 +619,7 @@ public class ConfigurationService implements IPluginInBridgeDomainConfigService,
             TransactBuilder transaction = new TransactBuilder();
             transaction.addOperations(new ArrayList<Operation>(Arrays.asList(delBrRequest)));
 
-            ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+            ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
             List<OperationResult> tr = transResponse.get();
             List<Operation> requests = transaction.getRequests();
             Status status = new Status(StatusCode.SUCCESS);
@@ -770,7 +770,7 @@ public class ConfigurationService implements IPluginInBridgeDomainConfigService,
             transaction.addOperations(new ArrayList<Operation>(
                                       Arrays.asList(updateRequest)));
 
-            ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+            ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
             List<OperationResult> tr = transResponse.get();
             List<Operation> requests = transaction.getRequests();
             Status status = new Status(StatusCode.SUCCESS);
@@ -1486,7 +1486,7 @@ public class ConfigurationService implements IPluginInBridgeDomainConfigService,
                 return new StatusWithUuid(StatusCode.NOSERVICE, "Connection to ovsdb-server not available");
             }
 
-            ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+            ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
             List<OperationResult> tr = transResponse.get();
             List<Operation> requests = transaction.getRequests();
             StatusWithUuid status = new StatusWithUuid(StatusCode.SUCCESS);
@@ -1688,7 +1688,7 @@ public class ConfigurationService implements IPluginInBridgeDomainConfigService,
             transaction.addOperations(new ArrayList<Operation>(Arrays.asList(delRequest)));
 
             // This executes the transaction.
-            ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+            ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
 
             // Pull the responses
             List<OperationResult> tr = transResponse.get();
@@ -1752,7 +1752,7 @@ public class ConfigurationService implements IPluginInBridgeDomainConfigService,
             transaction.addOperations(new ArrayList<Operation>(Arrays.asList(delRequest)));
 
             // This executes the transaction.
-            ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+            ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
 
             // Pull the responses
             List<OperationResult> tr = transResponse.get();
index 6c551d76a62e05942a9441c0e2c636bc8eebd23f..4088af1b8b1a710711b617dbd340f8b98141ecdb 100644 (file)
@@ -9,21 +9,18 @@
  */
 package org.opendaylight.ovsdb.plugin;
 
-import io.netty.channel.Channel;
-
 import org.opendaylight.controller.sal.core.ConstructionException;
 import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
-import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
+import org.opendaylight.ovsdb.lib.OvsdbClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class Connection {
     private Node node;
     private String identifier;
-    private Channel channel;
-    private OvsdbRPC rpc;
+    private OvsdbClient client;
 
     public Long getIdCounter() {
         return idCounter;
@@ -37,12 +34,12 @@ public class Connection {
 
     private static final Logger logger = LoggerFactory.getLogger(Connection.class);
 
-    public Connection(String identifier, Channel channel) {
+    public Connection(String identifier, OvsdbClient client) {
 
         super();
 
         this.identifier = identifier;
-        this.channel = channel;
+        this.client = client;
         this.idCounter = 0L;
         try {
             node = new Node("OVS", identifier);
@@ -59,12 +56,12 @@ public class Connection {
         this.identifier = identifier;
     }
 
-    public Channel getChannel() {
-        return this.channel;
+    public OvsdbClient getClient() {
+        return this.client;
     }
 
-    public void setChannel(Channel channel) {
-        this.channel = channel;
+    public void setClient(OvsdbClient client) {
+        this.client = client;
     }
 
     public Node getNode() {
@@ -75,21 +72,7 @@ public class Connection {
         this.node = node;
     }
 
-    public OvsdbRPC getRpc() {
-        return rpc;
-    }
-
-    public void setRpc(OvsdbRPC rpc) {
-        this.rpc = rpc;
-    }
-
-    public void sendMessage(String message) {
-        channel.writeAndFlush(message);
-        this.idCounter++;
-    }
-
     public Status disconnect() {
-        channel.close();
         return new Status(StatusCode.SUCCESS);
     }
 
index 534a94605cdcb393544991a12c664a83fd019616..55f8717047e89c61dc8eeaf418a27565be1a8839 100644 (file)
@@ -9,26 +9,10 @@
  */
 package org.opendaylight.ovsdb.plugin;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.string.StringEncoder;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.CharsetUtil;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,26 +32,21 @@ import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.utils.ServiceHelper;
 import org.opendaylight.controller.sal.utils.Status;
 import org.opendaylight.controller.sal.utils.StatusCode;
-import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
-import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
-import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
-import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
-import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
+import org.opendaylight.ovsdb.lib.OvsdbClient;
+import org.opendaylight.ovsdb.lib.OvsdbConnection;
+import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
+import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
 import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
-import org.opendaylight.ovsdb.lib.message.temp.TableUpdates;
 import org.opendaylight.ovsdb.lib.message.UpdateNotification;
 import org.opendaylight.ovsdb.lib.notation.OvsDBSet;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
 import org.opendaylight.ovsdb.lib.table.Bridge;
 import org.opendaylight.ovsdb.lib.table.Controller;
 import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
 import org.opendaylight.ovsdb.lib.table.Table;
-import org.opendaylight.ovsdb.lib.table.Tables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.util.concurrent.ListenableFuture;
 
 
@@ -75,7 +54,7 @@ import com.google.common.util.concurrent.ListenableFuture;
  * Represents the openflow plugin component in charge of programming the flows
  * the flow programming and relay them to functional modules above SAL.
  */
-public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
+public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback, OvsdbConnectionListener {
     protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
 
     // Properties that can be set in config.ini
@@ -87,6 +66,7 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
     private static final Integer defaultOvsdbPort = 6640;
     private static final boolean defaultAutoConfigureController = true;
 
+    private OvsdbConnection connectionLib;
     private static Integer ovsdbListenPort = defaultOvsdbPort;
     private static boolean autoConfigureController = defaultAutoConfigureController;
     private ConcurrentMap<String, Connection> ovsdbConnections;
@@ -108,6 +88,14 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
         }
     }
 
+    public void setOvsdbConnection(OvsdbConnection connectionService) {
+        connectionLib = connectionService;
+    }
+
+    public void unsetOvsdbConnection(OvsdbConnection connectionService) {
+        connectionLib = null;
+    }
+
     public void init() {
         ovsdbConnections = new ConcurrentHashMap<String, Connection>();
         int listenPort = defaultOvsdbPort;
@@ -135,7 +123,6 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
      * the services provided by the class are registered in the service registry
      */
     void start() {
-        startOvsdbManager();
     }
 
     /**
@@ -182,31 +169,8 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
         }
 
         try {
-            Bootstrap bootstrap = new Bootstrap();
-            bootstrap.group(new NioEventLoopGroup());
-            bootstrap.channel(NioSocketChannel.class);
-            bootstrap.option(ChannelOption.TCP_NODELAY, true);
-            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
-
-            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
-                @Override
-                public void initChannel(SocketChannel channel) throws Exception {
-                    if (handlers == null) {
-                        channel.pipeline().addLast(
-                                //new LoggingHandler(LogLevel.INFO),
-                                new JsonRpcDecoder(100000),
-                                new StringEncoder(CharsetUtil.UTF_8));
-                    } else {
-                        for (ChannelHandler handler : handlers) {
-                            channel.pipeline().addLast(handler);
-                        }
-                    }
-                }
-            });
-
-            ChannelFuture future = bootstrap.connect(address, port).sync();
-            Channel channel = future.channel();
-            return handleNewConnection(identifier, channel, this);
+            OvsdbClient client = connectionLib.connect(address, port);
+            return handleNewConnection(identifier, client, this);
         } catch (InterruptedException e) {
             logger.error("Thread was interrupted during connect", e);
         } catch (ExecutionException e) {
@@ -246,29 +210,10 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
     public void notifyNodeDisconnectFromMaster(Node arg0) {
     }
 
-    private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
-        Connection connection = new Connection(identifier, channel);
+    private Node handleNewConnection(String identifier, OvsdbClient client, ConnectionService instance) throws InterruptedException, ExecutionException {
+        Connection connection = new Connection(identifier, client);
         Node node = connection.getNode();
-
-        ObjectMapper objectMapper = new ObjectMapper();
-        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-        objectMapper.setSerializationInclusion(Include.NON_NULL);
-
-        JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
-        JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
-        binderHandler.setContext(node);
-        channel.pipeline().addLast(binderHandler);
-
-        OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
-        connection.setRpc(ovsdb);
-        ovsdb.registerCallback(instance);
         ovsdbConnections.put(identifier, connection);
-
-        ChannelConnectionHandler handler = new ChannelConnectionHandler();
-        handler.setNode(node);
-        handler.setConnectionService(this);
-        ChannelFuture closeFuture = channel.closeFuture();
-        closeFuture.addListener(handler);
         // Keeping the Initial inventory update(s) on its own thread.
         new Thread() {
             Connection connection;
@@ -299,9 +244,9 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
     }
 
     private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
-        Channel channel = connection.getChannel();
-        InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
-        int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
+        OvsdbClient client = connection.getClient();
+        InetAddress address = client.getConnectionInfo().getRemoteAddress();
+        int port = client.getConnectionInfo().getRemotePort();
         IPAddressProperty addressProp = new IPAddressProperty(address);
         L4PortProperty l4Port = new L4PortProperty(port);
         Set<Property> props = new HashSet<Property>();
@@ -310,13 +255,13 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
         inventoryServiceInternal.addNode(connection.getNode(), props);
 
         List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
-        ListenableFuture<DatabaseSchema> dbSchemaF = null;//TODO : fix it up to new structue : connection.getRpc().get_schema(dbNames);
+        ListenableFuture<DatabaseSchema> dbSchemaF = client.getSchema("Open_vSwitch", true);
         DatabaseSchema databaseSchema = dbSchemaF.get();
         inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
-
+/*
         MonitorRequestBuilder monitorReq = null; //ashwin(not sure if we need) : new MonitorRequestBuilder();
         for (Table<?> table : Tables.getTables()) {
-            if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
+            if (databaseSchema.getTables().contains(table.getTableName().getName())) {
                 //ashwin(not sure if we need) monitorReq.monitor(table);
             } else {
                 logger.debug("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
@@ -329,7 +274,6 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
             logger.error("Error configuring monitor, error : {}, details : {}",
                     updates.getError(),
                     updates.getDetails());
-            /* FIXME: This should be cause for alarm */
             throw new RuntimeException("Failed to setup a monitor in OVSDB");
         }
         UpdateNotification monitor = new UpdateNotification();
@@ -338,59 +282,10 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
         if (autoConfigureController) {
             this.updateOFControllers(connection.getNode());
         }
+        */
         inventoryServiceInternal.notifyNodeAdded(connection.getNode());
     }
 
-    private void startOvsdbManager() {
-        new Thread() {
-            @Override
-            public void run() {
-                ovsdbManager();
-            }
-        }.start();
-    }
-
-    private void ovsdbManager() {
-        EventLoopGroup bossGroup = new NioEventLoopGroup();
-        EventLoopGroup workerGroup = new NioEventLoopGroup();
-        try {
-            ServerBootstrap b = new ServerBootstrap();
-            b.group(bossGroup, workerGroup)
-             .channel(NioServerSocketChannel.class)
-             .option(ChannelOption.SO_BACKLOG, 100)
-             .handler(new LoggingHandler(LogLevel.INFO))
-             .childHandler(new ChannelInitializer<SocketChannel>() {
-                 @Override
-                 public void initChannel(SocketChannel channel) throws Exception {
-                     logger.debug("New Passive channel created : "+ channel.toString());
-                     InetAddress address = channel.remoteAddress().getAddress();
-                     int port = channel.remoteAddress().getPort();
-                     String identifier = address.getHostAddress()+":"+port;
-                     channel.pipeline().addLast(
-                             new LoggingHandler(LogLevel.INFO),
-                             new JsonRpcDecoder(100000),
-                             new StringEncoder(CharsetUtil.UTF_8));
-
-                     Node node = handleNewConnection(identifier, channel, ConnectionService.this);
-                     logger.debug("Connected Node : "+node.toString());
-                 }
-             });
-            b.option(ChannelOption.TCP_NODELAY, true);
-            b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
-            // Start the server.
-            ChannelFuture f = b.bind(ovsdbListenPort).sync();
-            serverListenChannel =  f.channel();
-            // Wait until the server socket is closed.
-            serverListenChannel.closeFuture().sync();
-        } catch (InterruptedException e) {
-            logger.error("Thread interrupted", e);
-        } finally {
-            // Shut down all event loops to terminate all threads.
-            bossGroup.shutdownGracefully();
-            workerGroup.shutdownGracefully();
-        }
-    }
-
     private IClusterGlobalServices clusterServices;
 
     public void setClusterServices(IClusterGlobalServices i) {
@@ -451,7 +346,7 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
         }
 
         try {
-            controllerIP = ((InetSocketAddress)connection.getChannel().localAddress()).getAddress();
+            controllerIP = connection.getClient().getConnectionInfo().getLocalAddress();
             controllers.add(controllerIP);
             return controllers;
         } catch (Exception e) {
@@ -541,4 +436,26 @@ public class ConnectionService implements IPluginInConnectionService, IConnectio
     public void stolen(Object context, List<String> ids) {
         // TODO Auto-generated method stub
     }
+
+    private String getConnectionIdentifier(OvsdbClient client) {
+        OvsdbConnectionInfo info = client.getConnectionInfo();
+        return info.getRemoteAddress().getHostAddress()+":"+info.getRemotePort();
+    }
+
+    @Override
+    public void connected(OvsdbClient client) {
+        logger.info("PLUGIN RECEIVED NOW CONNECTION FROM LIBRARY :  "+ client.getConnectionInfo().toString());
+        String identifier = getConnectionIdentifier(client);
+        try {
+            Node node = handleNewConnection(identifier, client, ConnectionService.this);
+        } catch (InterruptedException | ExecutionException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void disconnected(OvsdbClient client) {
+        logger.info("PLUGIN RECEIVED CONNECTION DISCONNECT FROM LIBRARY :  "+ client.getConnectionInfo().toString());
+    }
 }
index 28ae17b68b4f4b4c3659f471e1dc45ed3cad5a4a..05da8671bf86beb1bb513d70b9a2d9eb0b85e5ad 100644 (file)
@@ -32,11 +32,11 @@ import org.opendaylight.controller.sal.inventory.IPluginInInventoryService;
 import org.opendaylight.controller.sal.inventory.IPluginOutInventoryService;
 import org.opendaylight.controller.sal.utils.HexEncode;
 import org.opendaylight.controller.sal.utils.ServiceHelper;
-import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
 import org.opendaylight.ovsdb.lib.message.temp.TableUpdate;
 import org.opendaylight.ovsdb.lib.message.temp.TableUpdate.Row;
 import org.opendaylight.ovsdb.lib.message.temp.TableUpdates;
 import org.opendaylight.ovsdb.lib.notation.OvsDBSet;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
 import org.opendaylight.ovsdb.lib.table.Bridge;
 import org.opendaylight.ovsdb.lib.table.Table;
 import org.slf4j.Logger;
index c06ac5fad4c049d1356e872d9be5673fb34bdfa7..aad9e3eef830cda1c54ff450f25184fc1cf475a0 100644 (file)
@@ -16,8 +16,8 @@ import org.opendaylight.controller.sal.core.Node;
 import org.opendaylight.controller.sal.core.Property;
 import org.opendaylight.controller.sal.core.UpdateType;
 import org.opendaylight.controller.sal.inventory.IPluginInInventoryService;
-import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
 import org.opendaylight.ovsdb.lib.message.temp.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
 import org.opendaylight.ovsdb.lib.table.Table;
 
 public interface InventoryServiceInternal extends IPluginInInventoryService {
index b97a6e369dd6492fe6e7c1b864e6b7b7384f54f2..ae868033fb5e0d2111882c2671feb2b229f51367 100644 (file)
@@ -12,12 +12,12 @@ package org.opendaylight.ovsdb.plugin;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
-import com.google.common.collect.Maps;
-
 import org.apache.commons.collections.MapUtils;
-import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
 import org.opendaylight.ovsdb.lib.table.Table;
 
+import com.google.common.collect.Maps;
+
 public class NodeDB {
     private DatabaseSchema schema;
     ConcurrentMap<String, ConcurrentMap<String, Table<?>>> cache = Maps.newConcurrentMap();
@@ -67,7 +67,7 @@ public class NodeDB {
     }
 
     public void printTableCache() {
-        MapUtils.debugPrint(System.out, null, schema.getTables());
+        System.out.println(schema.getTables());
         MapUtils.debugPrint(System.out, null, cache);
     }
 }