This is just step-1. More to follow soon.
Change-Id: Ifdff0f11ea7cabaac93b67a822ac88120f023619
Signed-off-by: Madhu Venugopal <mavenugo@gmail.com>
<!-- TODO remove this includes section once the plugin migration is complete -->
<includes>
<include>**/*LibraryIT*</include>
+ <include>**/*PluginIT*</include>
</includes>
</configuration>
</execution>
import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
public abstract class OvsdbIntegrationTestBase {
- private final static String identifier = "TEST";
- private final static String SERVER_IPADDRESS = "ovsdbserver.ipaddress";
- private final static String SERVER_PORT = "ovsdbserver.port";
- private final static String CONNECTION_TYPE = "ovsdbserver.connection";
- private final static String CONNECTION_TYPE_ACTIVE = "active";
- private final static String CONNECTION_TYPE_PASSIVE = "passive";
+ protected final static String IDENTIFIER = "TEST";
+ protected final static String SERVER_IPADDRESS = "ovsdbserver.ipaddress";
+ protected final static String SERVER_PORT = "ovsdbserver.port";
+ protected final static String CONNECTION_TYPE = "ovsdbserver.connection";
+ protected final static String CONNECTION_TYPE_ACTIVE = "active";
+ protected final static String CONNECTION_TYPE_PASSIVE = "passive";
- private final static String DEFAULT_SERVER_PORT = "6640";
+ protected final static String DEFAULT_SERVER_PORT = "6640";
/**
* Represents the Open Vswitch Schema
return null;
}
- private String usage() {
+ protected String usage() {
return "Integration Test needs a valid connection configuration as follows :\n" +
"active connection : mvn -Pintegrationtest -Dovsdbserver.ipaddress=x.x.x.x -Dovsdbserver.port=yyyy verify\n"+
"passive connection : mvn -Pintegrationtest -Dovsdbserver.connection=passive verify\n";
*/
package org.opendaylight.ovsdb.integrationtest.plugin;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeNotNull;
import static org.ops4j.pax.exam.CoreOptions.junitBundles;
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
import static org.ops4j.pax.exam.CoreOptions.options;
import static org.ops4j.pax.exam.CoreOptions.propagateSystemProperty;
import static org.ops4j.pax.exam.CoreOptions.systemProperty;
+import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.opendaylight.controller.sal.connection.ConnectionConstants;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.utils.ServiceHelper;
import org.opendaylight.ovsdb.integrationtest.ConfigurationBundles;
import org.opendaylight.ovsdb.integrationtest.OvsdbIntegrationTestBase;
import org.opendaylight.ovsdb.lib.OvsdbClient;
+import org.opendaylight.ovsdb.plugin.IConnectionServiceInternal;
import org.opendaylight.ovsdb.plugin.OVSDBConfigService;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.Option;
}
}
+ public Node getPluginTestConnection() throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ Properties props = loadProperties();
+ String addressStr = props.getProperty(SERVER_IPADDRESS);
+ String portStr = props.getProperty(SERVER_PORT, DEFAULT_SERVER_PORT);
+ String connectionType = props.getProperty(CONNECTION_TYPE, "active");
+
+ // If the connection type is active, controller connects to the ovsdb-server
+ if (connectionType.equalsIgnoreCase(CONNECTION_TYPE_ACTIVE)) {
+ if (addressStr == null) {
+ fail(usage());
+ }
+
+ Map<ConnectionConstants, String> params = new HashMap<ConnectionConstants, String>();
+ params.put(ConnectionConstants.ADDRESS, addressStr);
+ params.put(ConnectionConstants.PORT, portStr);
+
+ IConnectionServiceInternal connection = (IConnectionServiceInternal)ServiceHelper.getGlobalInstance(IConnectionServiceInternal.class, this);
+ return connection.connect(IDENTIFIER, params);
+ }
+ fail("Connection parameter ("+CONNECTION_TYPE+") must be active");
+ return null;
+ }
+
@Before
public void areWeReady() throws InterruptedException {
assertNotNull(bc);
log.debug("Do some debugging because some bundle is unresolved");
}
- // Assert if true, if false we are good to go!
assertFalse(debugit);
try {
- client = getTestConnection();
+ node = getPluginTestConnection();
} catch (Exception e) {
fail("Exception : "+e.getMessage());
}
@Test
public void tableTest() throws Exception {
- assertNotNull("Invalid Node. Check connection params", client);
- Thread.sleep(3000); // Wait for a few seconds to get the Schema exchange done
- /*
- * TODO : Remove the assumeNotNull once the Plugin is migrated to the new lib
- */
- assumeNotNull(node);
+ Thread.sleep(5000);
+ IConnectionServiceInternal connection = (IConnectionServiceInternal)ServiceHelper.getGlobalInstance(IConnectionServiceInternal.class, this);
+
+ // Check for the ovsdb Connection as seen by the Plugin layer
+ assertNotNull(connection.getNodes());
+ assertTrue(connection.getNodes().size() > 0);
+ assertEquals(Node.fromString("OVS|"+IDENTIFIER), connection.getNodes().get(0));
+
+ System.out.println("Nodes = "+connection.getNodes());
+
List<String> tables = ovsdbConfigService.getTables(node);
System.out.println("Tables = "+tables);
assertNotNull(tables);
org.opendaylight.ovsdb.lib.database,
org.opendaylight.ovsdb.lib.operations,
org.opendaylight.ovsdb.lib.message,
+
+ <!-- TODO : Remove the following temp package switfly once the plugin migration is complete -->
+ org.opendaylight.ovsdb.lib.message.temp,
org.opendaylight.ovsdb.lib.schema,
org.opendaylight.ovsdb.lib.schema.typed</Export-Package>
</instructions>
public OvsdbConnectionInfo getConnectionInfo();
+ public boolean isActive();
+
+ public void disconnect();
+
public DatabaseSchema getDatabaseSchema (String dbName);
/**
package org.opendaylight.ovsdb.lib;
+import io.netty.channel.Channel;
+
import java.net.InetAddress;
+import java.net.InetSocketAddress;
public class OvsdbConnectionInfo {
public enum ConnectionType {
ACTIVE, PASSIVE
}
- InetAddress address;
- int port;
- ConnectionType type;
+ private Channel channel;
+ private ConnectionType type;
- public OvsdbConnectionInfo(InetAddress address, int port, ConnectionType type) {
- this.address = address;
- this.port = port;
+ public OvsdbConnectionInfo(Channel channel, ConnectionType type) {
+ this.channel = channel;
this.type = type;
}
- public InetAddress getAddress() {
- return address;
+ public InetAddress getRemoteAddress() {
+ return ((InetSocketAddress)channel.remoteAddress()).getAddress();
+ }
+ public int getRemotePort() {
+ return ((InetSocketAddress)channel.remoteAddress()).getPort();
+ }
+
+ public InetAddress getLocalAddress() {
+ return ((InetSocketAddress)channel.localAddress()).getAddress();
}
- public int getPort() {
- return port;
+ public int getLocalPort() {
+ return ((InetSocketAddress)channel.localAddress()).getPort();
}
+
public ConnectionType getType() {
return type;
}
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((address == null) ? 0 : address.hashCode());
- result = prime * result + port;
- result = prime * result + ((type == null) ? 0 : type.hashCode());
+ result = prime * result + ((channel == null) ? 0 : getRemoteAddress().hashCode());
+ result = prime * result + ((type == null) ? 0 : getRemotePort());
+ result = prime * result + ((channel == null) ? 0 : getLocalAddress().hashCode());
+ result = prime * result + ((type == null) ? 0 : getLocalPort());
return result;
}
if (getClass() != obj.getClass())
return false;
OvsdbConnectionInfo other = (OvsdbConnectionInfo) obj;
- if (address == null) {
- if (other.address != null)
+ if (channel == null) {
+ if (other.channel != null)
return false;
- } else if (!address.equals(other.address))
+ } else if (!getRemoteAddress().equals(other.getRemoteAddress())) {
+ return false;
+ } else if (!getLocalAddress().equals(other.getLocalAddress())) {
return false;
- if (port != other.port)
+ } else if (getRemotePort() != other.getRemotePort()) {
return false;
+ } else if (getLocalPort() != other.getLocalPort()) {
+ return false;
+ }
if (type != other.type)
return false;
return true;
}
+
@Override
public String toString() {
- return "ConnectionInfo [address=" + address + ", port=" + port
- + ", type=" + type + "]";
+ return "ConnectionInfo [Remote-address=" + this.getRemoteAddress().getHostAddress() +
+ ", Remote-port=" + this.getRemotePort() +
+ ", Local-address" + this.getLocalAddress().getHostAddress() +
+ ", Local-port=" + this.getLocalPort() +
+ ", type=" + type + "]";
}
}
*/
package org.opendaylight.ovsdb.lib.impl;
+import io.netty.channel.Channel;
+
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.opendaylight.ovsdb.lib.MonitorHandle;
import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
+import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
import org.opendaylight.ovsdb.lib.jsonrpc.Params;
import org.opendaylight.ovsdb.lib.message.MonitorRequest;
import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
private Queue<Throwable> exceptions;
private OvsdbRPC.Callback rpcCallback;
private OvsdbConnectionInfo connectionInfo;
+ private Channel channel;
- public OvsdbClientImpl(OvsdbRPC rpc, OvsdbConnectionInfo connectionInfo, ExecutorService executorService) {
+ public OvsdbClientImpl(OvsdbRPC rpc, Channel channel, ConnectionType type, ExecutorService executorService) {
this.rpc = rpc;
this.executorService = executorService;
- this.connectionInfo = connectionInfo;
+ this.channel = channel;
+
+ this.connectionInfo = new OvsdbConnectionInfo(channel, type);
}
OvsdbClientImpl() {
public OvsdbConnectionInfo getConnectionInfo() {
return connectionInfo;
}
+
+ @Override
+ public boolean isActive() {
+ return channel.isActive();
+ }
+
+ @Override
+ public void disconnect() {
+ channel.disconnect();
+ }
}
import io.netty.util.CharsetUtil;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.opendaylight.ovsdb.lib.OvsdbConnection;
-import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo.ConnectionType;
import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
channel.pipeline().addLast(binderHandler);
OvsdbRPC rpc = factory.getClient(channel, OvsdbRPC.class);
- InetSocketAddress channelAddress = (InetSocketAddress)channel.remoteAddress();
- InetAddress address = channelAddress.getAddress();
- int port = channelAddress.getPort();
-
- OvsdbConnectionInfo info = new OvsdbConnectionInfo(address, port, type);
- OvsdbClientImpl client = new OvsdbClientImpl(rpc, info, executorService);
+ OvsdbClientImpl client = new OvsdbClientImpl(rpc, channel, type, executorService);
connections.put(client, channel);
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelConnectionHandler(client));
<id>integrationtest</id>
<activation></activation>
<properties>
- <skip.integrationtest>false</skip.integrationtest>
+ <skip.integrationtest>true</skip.integrationtest>
</properties>
</profile>
</profiles>
import org.opendaylight.controller.sal.utils.GlobalConstants;
import org.opendaylight.controller.sal.utils.INodeConnectorFactory;
import org.opendaylight.controller.sal.utils.INodeFactory;
+import org.opendaylight.ovsdb.lib.OvsdbConnection;
+import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
props.put(GlobalConstants.PROTOCOLPLUGINTYPE.toString(), "OVS");
c.setInterface(
new String[] {IPluginInConnectionService.class.getName(),
- IConnectionServiceInternal.class.getName()}, props);
+ IConnectionServiceInternal.class.getName(),
+ OvsdbConnectionListener.class.getName()}, props);
c.add(createServiceDependency()
.setService(InventoryServiceInternal.class)
.setCallbacks("setInventoryServiceInternal", "unsetInventoryServiceInternal")
.setService(IClusterGlobalServices.class)
.setCallbacks("setClusterServices", "unsetClusterServices")
.setRequired(false));
+ c.add(createServiceDependency()
+ .setService(OvsdbConnection.class)
+ .setCallbacks("setOvsdbConnection", "unsetOvsdbConnection")
+ .setRequired(true));
}
if (imp.equals(InventoryService.class)) {
import org.opendaylight.ovsdb.lib.database.OVSInstance;
import org.opendaylight.ovsdb.lib.database.OvsdbType;
import org.opendaylight.ovsdb.lib.message.TransactBuilder;
-import org.opendaylight.ovsdb.lib.operations.DeleteOperation;
-import org.opendaylight.ovsdb.lib.operations.InsertOperation;
-import org.opendaylight.ovsdb.lib.operations.MutateOperation;
-import org.opendaylight.ovsdb.lib.operations.Operation;
-import org.opendaylight.ovsdb.lib.operations.OperationResult;
-import org.opendaylight.ovsdb.lib.operations.UpdateOperation;
import org.opendaylight.ovsdb.lib.notation.Condition;
import org.opendaylight.ovsdb.lib.notation.Function;
import org.opendaylight.ovsdb.lib.notation.Mutation;
import org.opendaylight.ovsdb.lib.notation.OvsDBMap;
import org.opendaylight.ovsdb.lib.notation.OvsDBSet;
import org.opendaylight.ovsdb.lib.notation.UUID;
+import org.opendaylight.ovsdb.lib.operations.DeleteOperation;
+import org.opendaylight.ovsdb.lib.operations.InsertOperation;
+import org.opendaylight.ovsdb.lib.operations.MutateOperation;
+import org.opendaylight.ovsdb.lib.operations.Operation;
+import org.opendaylight.ovsdb.lib.operations.OperationResult;
+import org.opendaylight.ovsdb.lib.operations.UpdateOperation;
import org.opendaylight.ovsdb.lib.table.Bridge;
import org.opendaylight.ovsdb.lib.table.Controller;
import org.opendaylight.ovsdb.lib.table.IPFIX;
private Connection getConnection (Node node) {
Connection connection = connectionService.getConnection(node);
- if (connection == null || !connection.getChannel().isActive()) {
+ if (connection == null || !connection.getClient().isActive()) {
return null;
}
addBridgeRequest,
updateCfgVerRequest)));
- ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+ ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
List<OperationResult> tr = transResponse.get();
List<Operation> requests = transaction.getRequests();
Status status = new Status(StatusCode.SUCCESS);
transaction.addOperations(new ArrayList<Operation>
(Arrays.asList(addBrMutRequest, addPortRequest, addIntfRequest)));
- ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+ ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
List<OperationResult> tr = transResponse.get();
List<Operation> requests = transaction.getRequests();
Status status = new Status(StatusCode.SUCCESS);
TransactBuilder transaction = new TransactBuilder();
transaction.addOperations(new ArrayList<Operation>(Arrays.asList(delPortRequest)));
- ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+ ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
List<OperationResult> tr = transResponse.get();
List<Operation> requests = transaction.getRequests();
Status status = new Status(StatusCode.SUCCESS);
TransactBuilder transaction = new TransactBuilder();
transaction.addOperations(new ArrayList<Operation>(Arrays.asList(delBrRequest)));
- ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+ ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
List<OperationResult> tr = transResponse.get();
List<Operation> requests = transaction.getRequests();
Status status = new Status(StatusCode.SUCCESS);
transaction.addOperations(new ArrayList<Operation>(
Arrays.asList(updateRequest)));
- ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+ ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
List<OperationResult> tr = transResponse.get();
List<Operation> requests = transaction.getRequests();
Status status = new Status(StatusCode.SUCCESS);
return new StatusWithUuid(StatusCode.NOSERVICE, "Connection to ovsdb-server not available");
}
- ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+ ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
List<OperationResult> tr = transResponse.get();
List<Operation> requests = transaction.getRequests();
StatusWithUuid status = new StatusWithUuid(StatusCode.SUCCESS);
transaction.addOperations(new ArrayList<Operation>(Arrays.asList(delRequest)));
// This executes the transaction.
- ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+ ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
// Pull the responses
List<OperationResult> tr = transResponse.get();
transaction.addOperations(new ArrayList<Operation>(Arrays.asList(delRequest)));
// This executes the transaction.
- ListenableFuture<List<OperationResult>> transResponse = connection.getRpc().transact(transaction);
+ ListenableFuture<List<OperationResult>> transResponse = connection.getClient().transactBuilder().execute();
// Pull the responses
List<OperationResult> tr = transResponse.get();
*/
package org.opendaylight.ovsdb.plugin;
-import io.netty.channel.Channel;
-
import org.opendaylight.controller.sal.core.ConstructionException;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
-import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
+import org.opendaylight.ovsdb.lib.OvsdbClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Connection {
private Node node;
private String identifier;
- private Channel channel;
- private OvsdbRPC rpc;
+ private OvsdbClient client;
public Long getIdCounter() {
return idCounter;
private static final Logger logger = LoggerFactory.getLogger(Connection.class);
- public Connection(String identifier, Channel channel) {
+ public Connection(String identifier, OvsdbClient client) {
super();
this.identifier = identifier;
- this.channel = channel;
+ this.client = client;
this.idCounter = 0L;
try {
node = new Node("OVS", identifier);
this.identifier = identifier;
}
- public Channel getChannel() {
- return this.channel;
+ public OvsdbClient getClient() {
+ return this.client;
}
- public void setChannel(Channel channel) {
- this.channel = channel;
+ public void setClient(OvsdbClient client) {
+ this.client = client;
}
public Node getNode() {
this.node = node;
}
- public OvsdbRPC getRpc() {
- return rpc;
- }
-
- public void setRpc(OvsdbRPC rpc) {
- this.rpc = rpc;
- }
-
- public void sendMessage(String message) {
- channel.writeAndFlush(message);
- this.idCounter++;
- }
-
public Status disconnect() {
- channel.close();
return new Status(StatusCode.SUCCESS);
}
*/
package org.opendaylight.ovsdb.plugin;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.string.StringEncoder;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.CharsetUtil;
import java.net.InetAddress;
-import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import org.opendaylight.controller.sal.utils.ServiceHelper;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
-import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
-import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcDecoder;
-import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcEndpoint;
-import org.opendaylight.ovsdb.lib.jsonrpc.JsonRpcServiceBinderHandler;
-import org.opendaylight.ovsdb.lib.message.MonitorRequestBuilder;
+import org.opendaylight.ovsdb.lib.OvsdbClient;
+import org.opendaylight.ovsdb.lib.OvsdbConnection;
+import org.opendaylight.ovsdb.lib.OvsdbConnectionInfo;
+import org.opendaylight.ovsdb.lib.OvsdbConnectionListener;
import org.opendaylight.ovsdb.lib.message.OvsdbRPC;
-import org.opendaylight.ovsdb.lib.message.temp.TableUpdates;
import org.opendaylight.ovsdb.lib.message.UpdateNotification;
import org.opendaylight.ovsdb.lib.notation.OvsDBSet;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
import org.opendaylight.ovsdb.lib.table.Bridge;
import org.opendaylight.ovsdb.lib.table.Controller;
import org.opendaylight.ovsdb.lib.table.Open_vSwitch;
import org.opendaylight.ovsdb.lib.table.Table;
-import org.opendaylight.ovsdb.lib.table.Tables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ListenableFuture;
* Represents the openflow plugin component in charge of programming the flows
* the flow programming and relay them to functional modules above SAL.
*/
-public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback {
+public class ConnectionService implements IPluginInConnectionService, IConnectionServiceInternal, OvsdbRPC.Callback, OvsdbConnectionListener {
protected static final Logger logger = LoggerFactory.getLogger(ConnectionService.class);
// Properties that can be set in config.ini
private static final Integer defaultOvsdbPort = 6640;
private static final boolean defaultAutoConfigureController = true;
+ private OvsdbConnection connectionLib;
private static Integer ovsdbListenPort = defaultOvsdbPort;
private static boolean autoConfigureController = defaultAutoConfigureController;
private ConcurrentMap<String, Connection> ovsdbConnections;
}
}
+ public void setOvsdbConnection(OvsdbConnection connectionService) {
+ connectionLib = connectionService;
+ }
+
+ public void unsetOvsdbConnection(OvsdbConnection connectionService) {
+ connectionLib = null;
+ }
+
public void init() {
ovsdbConnections = new ConcurrentHashMap<String, Connection>();
int listenPort = defaultOvsdbPort;
* the services provided by the class are registered in the service registry
*/
void start() {
- startOvsdbManager();
}
/**
}
try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(new NioEventLoopGroup());
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.option(ChannelOption.TCP_NODELAY, true);
- bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
-
- bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel channel) throws Exception {
- if (handlers == null) {
- channel.pipeline().addLast(
- //new LoggingHandler(LogLevel.INFO),
- new JsonRpcDecoder(100000),
- new StringEncoder(CharsetUtil.UTF_8));
- } else {
- for (ChannelHandler handler : handlers) {
- channel.pipeline().addLast(handler);
- }
- }
- }
- });
-
- ChannelFuture future = bootstrap.connect(address, port).sync();
- Channel channel = future.channel();
- return handleNewConnection(identifier, channel, this);
+ OvsdbClient client = connectionLib.connect(address, port);
+ return handleNewConnection(identifier, client, this);
} catch (InterruptedException e) {
logger.error("Thread was interrupted during connect", e);
} catch (ExecutionException e) {
public void notifyNodeDisconnectFromMaster(Node arg0) {
}
- private Node handleNewConnection(String identifier, Channel channel, ConnectionService instance) throws InterruptedException, ExecutionException {
- Connection connection = new Connection(identifier, channel);
+ private Node handleNewConnection(String identifier, OvsdbClient client, ConnectionService instance) throws InterruptedException, ExecutionException {
+ Connection connection = new Connection(identifier, client);
Node node = connection.getNode();
-
- ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- objectMapper.setSerializationInclusion(Include.NON_NULL);
-
- JsonRpcEndpoint factory = new JsonRpcEndpoint(objectMapper, channel);
- JsonRpcServiceBinderHandler binderHandler = new JsonRpcServiceBinderHandler(factory);
- binderHandler.setContext(node);
- channel.pipeline().addLast(binderHandler);
-
- OvsdbRPC ovsdb = factory.getClient(node, OvsdbRPC.class);
- connection.setRpc(ovsdb);
- ovsdb.registerCallback(instance);
ovsdbConnections.put(identifier, connection);
-
- ChannelConnectionHandler handler = new ChannelConnectionHandler();
- handler.setNode(node);
- handler.setConnectionService(this);
- ChannelFuture closeFuture = channel.closeFuture();
- closeFuture.addListener(handler);
// Keeping the Initial inventory update(s) on its own thread.
new Thread() {
Connection connection;
}
private void initializeInventoryForNewNode (Connection connection) throws InterruptedException, ExecutionException {
- Channel channel = connection.getChannel();
- InetAddress address = ((InetSocketAddress)channel.remoteAddress()).getAddress();
- int port = ((InetSocketAddress)channel.remoteAddress()).getPort();
+ OvsdbClient client = connection.getClient();
+ InetAddress address = client.getConnectionInfo().getRemoteAddress();
+ int port = client.getConnectionInfo().getRemotePort();
IPAddressProperty addressProp = new IPAddressProperty(address);
L4PortProperty l4Port = new L4PortProperty(port);
Set<Property> props = new HashSet<Property>();
inventoryServiceInternal.addNode(connection.getNode(), props);
List<String> dbNames = Arrays.asList(Open_vSwitch.NAME.getName());
- ListenableFuture<DatabaseSchema> dbSchemaF = null;//TODO : fix it up to new structue : connection.getRpc().get_schema(dbNames);
+ ListenableFuture<DatabaseSchema> dbSchemaF = client.getSchema("Open_vSwitch", true);
DatabaseSchema databaseSchema = dbSchemaF.get();
inventoryServiceInternal.updateDatabaseSchema(connection.getNode(), databaseSchema);
-
+/*
MonitorRequestBuilder monitorReq = null; //ashwin(not sure if we need) : new MonitorRequestBuilder();
for (Table<?> table : Tables.getTables()) {
- if (databaseSchema.getTables().keySet().contains(table.getTableName().getName())) {
+ if (databaseSchema.getTables().contains(table.getTableName().getName())) {
//ashwin(not sure if we need) monitorReq.monitor(table);
} else {
logger.debug("We know about table {} but it is not in the schema of {}", table.getTableName().getName(), connection.getNode().getNodeIDString());
logger.error("Error configuring monitor, error : {}, details : {}",
updates.getError(),
updates.getDetails());
- /* FIXME: This should be cause for alarm */
throw new RuntimeException("Failed to setup a monitor in OVSDB");
}
UpdateNotification monitor = new UpdateNotification();
if (autoConfigureController) {
this.updateOFControllers(connection.getNode());
}
+ */
inventoryServiceInternal.notifyNodeAdded(connection.getNode());
}
- private void startOvsdbManager() {
- new Thread() {
- @Override
- public void run() {
- ovsdbManager();
- }
- }.start();
- }
-
- private void ovsdbManager() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 100)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel channel) throws Exception {
- logger.debug("New Passive channel created : "+ channel.toString());
- InetAddress address = channel.remoteAddress().getAddress();
- int port = channel.remoteAddress().getPort();
- String identifier = address.getHostAddress()+":"+port;
- channel.pipeline().addLast(
- new LoggingHandler(LogLevel.INFO),
- new JsonRpcDecoder(100000),
- new StringEncoder(CharsetUtil.UTF_8));
-
- Node node = handleNewConnection(identifier, channel, ConnectionService.this);
- logger.debug("Connected Node : "+node.toString());
- }
- });
- b.option(ChannelOption.TCP_NODELAY, true);
- b.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(65535, 65535, 65535));
- // Start the server.
- ChannelFuture f = b.bind(ovsdbListenPort).sync();
- serverListenChannel = f.channel();
- // Wait until the server socket is closed.
- serverListenChannel.closeFuture().sync();
- } catch (InterruptedException e) {
- logger.error("Thread interrupted", e);
- } finally {
- // Shut down all event loops to terminate all threads.
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
-
private IClusterGlobalServices clusterServices;
public void setClusterServices(IClusterGlobalServices i) {
}
try {
- controllerIP = ((InetSocketAddress)connection.getChannel().localAddress()).getAddress();
+ controllerIP = connection.getClient().getConnectionInfo().getLocalAddress();
controllers.add(controllerIP);
return controllers;
} catch (Exception e) {
public void stolen(Object context, List<String> ids) {
// TODO Auto-generated method stub
}
+
+ private String getConnectionIdentifier(OvsdbClient client) {
+ OvsdbConnectionInfo info = client.getConnectionInfo();
+ return info.getRemoteAddress().getHostAddress()+":"+info.getRemotePort();
+ }
+
+ @Override
+ public void connected(OvsdbClient client) {
+ logger.info("PLUGIN RECEIVED NOW CONNECTION FROM LIBRARY : "+ client.getConnectionInfo().toString());
+ String identifier = getConnectionIdentifier(client);
+ try {
+ Node node = handleNewConnection(identifier, client, ConnectionService.this);
+ } catch (InterruptedException | ExecutionException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void disconnected(OvsdbClient client) {
+ logger.info("PLUGIN RECEIVED CONNECTION DISCONNECT FROM LIBRARY : "+ client.getConnectionInfo().toString());
+ }
}
import org.opendaylight.controller.sal.inventory.IPluginOutInventoryService;
import org.opendaylight.controller.sal.utils.HexEncode;
import org.opendaylight.controller.sal.utils.ServiceHelper;
-import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
import org.opendaylight.ovsdb.lib.message.temp.TableUpdate;
import org.opendaylight.ovsdb.lib.message.temp.TableUpdate.Row;
import org.opendaylight.ovsdb.lib.message.temp.TableUpdates;
import org.opendaylight.ovsdb.lib.notation.OvsDBSet;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
import org.opendaylight.ovsdb.lib.table.Bridge;
import org.opendaylight.ovsdb.lib.table.Table;
import org.slf4j.Logger;
import org.opendaylight.controller.sal.core.Property;
import org.opendaylight.controller.sal.core.UpdateType;
import org.opendaylight.controller.sal.inventory.IPluginInInventoryService;
-import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
import org.opendaylight.ovsdb.lib.message.temp.TableUpdates;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
import org.opendaylight.ovsdb.lib.table.Table;
public interface InventoryServiceInternal extends IPluginInInventoryService {
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import com.google.common.collect.Maps;
-
import org.apache.commons.collections.MapUtils;
-import org.opendaylight.ovsdb.lib.database.DatabaseSchema;
+import org.opendaylight.ovsdb.lib.schema.DatabaseSchema;
import org.opendaylight.ovsdb.lib.table.Table;
+import com.google.common.collect.Maps;
+
public class NodeDB {
private DatabaseSchema schema;
ConcurrentMap<String, ConcurrentMap<String, Table<?>>> cache = Maps.newConcurrentMap();
}
public void printTableCache() {
- MapUtils.debugPrint(System.out, null, schema.getTables());
+ System.out.println(schema.getTables());
MapUtils.debugPrint(System.out, null, cache);
}
}