<artifactId>restconf-client-impl</artifactId>
</dependency>
+ <!-- clustering -->
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>remoterpc-routingtable.implementation</artifactId>
+ <version>${mdsal.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>sal-remoterpc-connector</artifactId>
+ <version>${mdsal.version}</version>
+ </dependency>
+
<!-- config-->
<dependency>
<groupId>org.opendaylight.controller</groupId>
--- /dev/null
+<snapshot>
+ <configuration>
+ <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">prefix:remote-zeromq-rpc-server</type>
+ <name>remoter</name>
+ <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">5666</port>
+ <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+ <name>dom-broker</name>
+ </dom-broker>
+ </module>
+ </modules>
+ <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+ </services>
+ </data>
+ </configuration>
+
+ <required-capabilities>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom?module=opendaylight-md-sal-dom&revision=2013-10-28</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl?module=opendaylight-sal-dom-broker-impl&revision=2013-10-28</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:common?module=opendaylight-md-sal-common&revision=2013-10-28</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc?module=odl-sal-dom-rpc-remote-cfg&revision=2013-10-28</capability>
+ </required-capabilities>
+</snapshot>
+
<!-- Compability Packages -->
<module>compatibility</module>
- <!-- Clustering
- <module>remoterpc-routingtable/implementation</module>
+ <!-- Clustering -->
+ <module>remoterpc-routingtable/implementation</module>
<module>sal-remoterpc-connector/implementation</module>
- <module>clustered-data-store/implementation</module>
+ <!--module>clustered-data-store/implementation</module>
-->
</modules>
<parent>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-parent</artifactId>
- <version>1.0-SNAPSHOT</version>
+ <version>1.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
</parent>
<scm>
</scm>
<artifactId>remoterpc-routingtable.implementation</artifactId>
- <version>0.4.1-SNAPSHOT</version>
<packaging>bundle</packaging>
<build>
org.eclipse.osgi.framework.console,
org.osgi.framework,
javax.transaction,
- com.google.common.base
+ com.google.common.base,
+ com.google.common.collect
</Import-Package>
<Bundle-Activator>
org.opendaylight.controller.sal.connector.remoterpc.impl.Activator
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-connector-api</artifactId>
- <version>1.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>clustering.services</artifactId>
- <version>0.5.0-SNAPSHOT</version>
+ <version>${clustering.services.version}</version>
</dependency>
import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
"2013-07-09", "context-reference");
private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
-
+
private final String identifier;
private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
}
}
- private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
+ private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
private final RoutedRpcStrategy strategy;
private final Set<QName> supportedRpcs;
+ private final RpcRoutingContext identifier;
private RpcImplementation defaultDelegate;
private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
- private SchemaAwareRpcBroker router;
+ private final SchemaAwareRpcBroker router;
public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
super();
this.strategy = strategy;
supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
+ identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier());
this.router = router;
}
@Override
- public QName getIdentifier() {
- return strategy.getIdentifier();
+ public RpcRoutingContext getIdentifier() {
+ return identifier;
}
@Override
RoutedRpcRegistration {
private final QName type;
- private RoutedRpcSelector router;
+ private final RoutedRpcSelector router;
public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
super(implementation);
routeListener.getInstance().onRouteChange(change);
} catch (Exception e) {
LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
-
+
}
}
-
+
}
-
+
private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
}
}
}
-
+
@Override
public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
L listener) {
- return routeChangeListeners.registerWithType(listener);
+ ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
+ RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
+ try {
+ listener.onRouteChange(initial);
+ } catch (Exception e) {
+ LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener);
+ }
+ return reg;
+ }
+
+ private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
+ FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
+
+
+ ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
+ ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
+ for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
+ final RpcRoutingContext context = routedRpcSelector.getIdentifier();
+ final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
+ announcements.put(context, paths);
+ }
+ return RoutingUtils.change(announcements.build(), removals.build());
}
}
<groupId>org.opendaylight.controller</groupId>
<artifactId>sal-parent</artifactId>
<relativePath>../..</relativePath>
- <version>1.0-SNAPSHOT</version>
+ <version>1.1-SNAPSHOT</version>
</parent>
<artifactId>sal-remoterpc-connector</artifactId>
<properties>
<zeromq.version>0.3.1</zeromq.version>
- <jackson.version>2.3.0</jackson.version>
<stax.version>1.0.1</stax.version>
+ <yang.jmx.plugin>0.2.4-SNAPSHOT</yang.jmx.plugin>
</properties>
<dependencies>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>remoterpc-routingtable.implementation</artifactId>
- <!-- TODO: fix the version. Why is it not MD Sal project version?-->
- <version>0.4.1-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<!-- AD Sal -->
<instructions>
<Import-Package>
*,
+ com.google.common.collect,
!org.codehaus.enunciate.jaxrs
</Import-Package>
<Export-Package>
<dependency>
<groupId>org.opendaylight.controller</groupId>
<artifactId>yang-jmx-generator-plugin</artifactId>
- <version>0.2.3-SNAPSHOT</version>
+ <version>${yang.jmx.plugin}</version>
</dependency>
<dependency>
<groupId>org.opendaylight.yangtools</groupId>
<artifactId>maven-sal-api-gen-plugin</artifactId>
- <version>${yangtools.version}</version>
+ <version>${yangtools.version}</version>
<type>jar</type>
</dependency>
</dependencies>
import org.osgi.framework.BundleContext;
/**
-*
-*/
-public final class ZeroMQServerModule extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModule
- {
+ *
+ */
+public final class ZeroMQServerModule
+ extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModule {
+
+ private static final Integer ZEROMQ_ROUTER_PORT = 5554;
+ private BundleContext bundleContext;
+
+ public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+ super(identifier, dependencyResolver);
+ }
- private static final Integer ZEROMQ_ROUTER_PORT = 5554;
- private BundleContext bundleContext;
+ public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+ org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
+ ZeroMQServerModule oldModule, java.lang.AutoCloseable oldInstance) {
- public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
- super(identifier, dependencyResolver);
- }
+ super(identifier, dependencyResolver, oldModule, oldInstance);
+ }
- public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
- ZeroMQServerModule oldModule, java.lang.AutoCloseable oldInstance) {
+ @Override
+ protected void customValidation() {
+ // Add custom validation for module attributes here.
+ }
- super(identifier, dependencyResolver, oldModule, oldInstance);
- }
+ @Override
+ public java.lang.AutoCloseable createInstance() {
- @Override
- protected void customValidation(){
- // Add custom validation for module attributes here.
- }
+ Broker broker = getDomBrokerDependency();
- @Override
- public java.lang.AutoCloseable createInstance() {
-
- Broker broker = getDomBrokerDependency();
+ final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT;
- final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT;
+ ServerImpl serverImpl = new ServerImpl(port);
- ServerImpl serverImpl = new ServerImpl(port);
-
- ClientImpl clientImpl = new ClientImpl();
+ ClientImpl clientImpl = new ClientImpl();
RoutingTableProvider provider = new RoutingTableProvider(bundleContext);//,serverImpl);
-
- facade.setRoutingTableProvider(provider );
+ RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl);
+ facade.setRoutingTableProvider(provider);
facade.setContext(bundleContext);
facade.setRpcProvisionRegistry((RpcProvisionRegistry) broker);
import org.zeromq.ZMQ;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Message response = handler.handle(request);
CompositeNode payload = null;
- if ( response != null )
- payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+ if ( response != null ) {
+ _logger.info("Received response [{}]", response);
+
+ Object rawPayload = response.getPayload();
+ switch (response.getType()) {
+ case ERROR:
+ if ( rawPayload instanceof List )
+ errors = (List) rawPayload;
+ break;
+
+ case RESPONSE:
+ payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
+ break;
+
+ default:
+ errors.add(
+ RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
+ );
+ break;
+
+ }
+ }
return Rpcs.getRpcResult(true, payload, errors);
} catch (Exception e){
//otherwise first create the bridge and then send request
if ( connectedServers.containsKey(remoteServerAddress) )
return sendMessage(request, remoteServerAddress);
+
else{
workerPool.execute(new Worker(remoteServerAddress));
connectedServers.put(remoteServerAddress, remoteServerAddress);
ZMQ.Socket socket = context.socket(ZMQ.REQ);
try {
- socket.connect( INPROC_PROTOCOL_PREFIX + address);
+ String inProcessSocketAddress = INPROC_PROTOCOL_PREFIX + address;
+ socket.connect( inProcessSocketAddress );
+ _logger.debug("Sending request [{}]", request);
socket.send(Message.serialize(request));
- _logger.debug("Request sent. Waiting for reply...");
+ _logger.info("Request sent. Waiting for reply...");
byte[] reply = socket.recv(0);
- _logger.debug("Response received");
+ _logger.info("Response received");
response = (Message) Message.deserialize(reply);
+ _logger.debug("Response [{}]", response);
} finally {
socket.close();
}
*/
private class Worker implements Runnable {
private String name;
- private String remoteServer; //<servername:rpc-port>
+ private String remoteServer; //<serverip:rpc-port>
public Worker(String address){
this.name = DEFAULT_NAME + "[" + address + "]";
for (RpcRoutingContext context : changes.keySet()){
routeId = new RouteIdentifierImpl();
routeId.setType(context.getRpc());
- routeId.setContext(context.getContext());
+ //routeId.setContext(context.getContext());
for (InstanceIdentifier instanceId : changes.get(context)){
routeId.setRoute(instanceId);
package org.opendaylight.controller.sal.connector.remoterpc;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
import org.slf4j.Logger;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.Collection;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
while (!Thread.currentThread().isInterrupted()) {
- Message request = parseMessage(socket);
- _logger.debug("Received rpc request [{}]", request);
+ MessageHandler handler = new MessageHandler(socket);
+ handler.receiveMessage();
- if (request != null) {
- // Call broker to process the message then reply
+ if (handler.hasMessageForBroker()) {
+
+ Message request = handler.getMessage();
Future<RpcResult<CompositeNode>> rpc = null;
RpcResult<CompositeNode> result = null;
result = (rpc != null) ? rpc.get() : null;
- } catch (Exception e) {
- _logger.debug("Broker threw [{}]", e);
- }
-
- CompositeNode payload = (result != null) ? result.getResult() : null;
-
- Message response = new Message.MessageBuilder()
- .type(Message.MessageType.RESPONSE)
- .sender(serverAddress)
- .route(request.getRoute())
- .payload(XmlUtils.compositeNodeToXml(payload))
- .build();
+ handler.sendResponse(result);
- _logger.debug("Sending rpc response [{}]", response);
-
- try {
- socket.send(Message.serialize(response));
} catch (Exception e) {
- _logger.debug("rpc response send failed for message [{}]", response);
- _logger.debug("{}", e);
+ _logger.debug("Broker threw [{}]", e);
+ handler.sendError(e.getMessage());
}
}
+
}
} catch (Exception e) {
printException(e);
}
}
- /**
- * @param socket
- * @return
- */
- private Message parseMessage(ZMQ.Socket socket) throws Exception {
- byte[] bytes = socket.recv(); //this blocks
- _logger.debug("Received bytes:[{}]", bytes.length);
- return (Message) Message.deserialize(bytes);
- }
-
private void printException(Exception e) {
try (StringWriter s = new StringWriter();
PrintWriter p = new PrintWriter(s)) {
super.afterExecute(r, null);
}
}
+
+ class MessageHandler{
+ private ZMQ.Socket socket;
+ private Message message; //parsed message received on zmq server port
+ private boolean messageForBroker = false; //if the message is valid and not a "ping" message
+
+ public MessageHandler(ZMQ.Socket socket){
+ this.socket = socket;
+ }
+
+ void receiveMessage(){
+ byte[] bytes = socket.recv(); //this blocks
+ _logger.debug("Received bytes:[{}]", bytes.length);
+
+ Object objectRecvd = null;
+ try{
+ objectRecvd = Message.deserialize(bytes);
+ }catch (Exception e){
+ sendError(e.getMessage());
+ return;
+ }
+
+ if (!(objectRecvd instanceof Message)) {
+ sendError("Invalid message received");
+ return;
+ }
+
+ message = (Message) objectRecvd;
+
+ _logger.info("Received request [{}]", message);
+
+ if (Message.MessageType.PING == message.getType()){
+ sendPong();
+ return;
+ }
+
+ messageForBroker = true;
+ }
+
+ boolean hasMessageForBroker(){
+ return messageForBroker;
+ }
+
+ Message getMessage(){
+ return message;
+ }
+
+ void sendResponse(RpcResult<CompositeNode> result){
+ CompositeNode payload = (result != null) ? result.getResult() : null;
+
+ String recipient = null;
+ RpcRouter.RouteIdentifier routeId = null;
+
+ if (message != null) {
+ recipient = message.getSender();
+ routeId = message.getRoute();
+ }
+
+ Message response = new Message.MessageBuilder()
+ .type(Message.MessageType.RESPONSE)
+ .sender(serverAddress)
+ .recipient(recipient)
+ .route(routeId)
+ .payload(XmlUtils.compositeNodeToXml(payload))
+ .build();
+
+ send(response);
+ }
+
+ private void sendError(String msg){
+ Message errorResponse = new Message.MessageBuilder()
+ .type(Message.MessageType.ERROR)
+ .sender(serverAddress)
+ .payload(msg)
+ .build();
+
+ send(errorResponse);
+ }
+
+ private void sendPong(){
+ Message pong = new Message.MessageBuilder()
+ .type(Message.MessageType.PONG)
+ .sender(serverAddress)
+ .build();
+
+ send(pong);
+ }
+
+ private void send(Message msg){
+ byte[] serializedMessage = null;
+ try {
+ serializedMessage = Message.serialize(msg);
+ } catch (Exception e) {
+ _logger.debug("Unexpected error during serialization of response [{}]", msg);
+ return;
+ }
+
+ if (serializedMessage != null)
+ if (socket.send(serializedMessage))
+ _logger.info("Response sent [{}]", msg);
+ else _logger.debug("Failed to send serialized message");
+ }
+ }
}
public class Message implements Serializable {
public static enum MessageType {
- ANNOUNCE((byte) 0), //TODO: Remove announce, add rpc registration and deregistration
- HEARTBEAT((byte) 1),
+ PING((byte) 0),
+ PONG((byte) 1),
REQUEST((byte) 2),
RESPONSE((byte) 3),
ERROR((byte)4);
public void setRecipient(String recipient) {
this.recipient = recipient;
}
+
@Override
public String toString() {
return "Message{" +
*/
package org.opendaylight.controller.sal.connector.remoterpc.dto;
-import java.io.Serializable;
-import java.net.URI;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import java.io.Serializable;
+
public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
private QName context;
public void setRoute(InstanceIdentifier route) {
this.route = route;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RouteIdentifierImpl that = (RouteIdentifierImpl) o;
+
+ if (context == null){
+ if (that.getContext() != null) return false;
+ }else
+ if (!context.equals(that.context)) return false;
+
+ if (route == null){
+ if (that.getRoute() != null) return false;
+ }else
+ if (!route.equals(that.route)) return false;
+
+ if (type == null){
+ if (that.getType() != null) return false;
+ }else
+ if (!type.equals(that.type)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int prime = 31;
+ int result = 0;
+ result = prime * result + (context == null ? 0:context.hashCode());
+ result = prime * result + (type == null ? 0:type.hashCode());
+ result = prime * result + (route == null ? 0:route.hashCode());
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "RouteIdentifierImpl{" +
+ "context=" + context +
+ ", type=" + type +
+ ", route=" + route +
+ '}';
+ }
}
@Test
public void toXml() throws FileNotFoundException {
- InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+ //InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+ InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/AddFlow.xml");
StringWriter writer = new StringWriter();
CompositeNode data = loadCompositeNode(xmlStream);
_logger.info("Parsed xml [{}]", writer.toString());
}
- //Note to self: Stolen from TestUtils
- ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java
// Figure out how to include TestUtils through pom ...was getting errors
private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException {
if (xmlInputStream == null) {
};
}
+ public static Runnable sendAMessage(final ZMQ.Context context, final String serverAddress, final Message msg)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ return new Runnable() {
+ @Override
+ public void run() {
+ final ZMQ.Socket socket = context.socket(ZMQ.REQ);
+ try {
+
+ socket.connect(serverAddress);
+ System.out.println(Thread.currentThread().getName() + " Sending message");
+ try {
+ socket.send(Message.serialize(msg));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ byte[] bytes = socket.recv();
+ Message response = null;
+ try {
+ response = (Message) Message.deserialize(bytes);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ System.out.println(Thread.currentThread().getName() + " Got response " + response);
+ } catch (Exception x) {
+ x.printStackTrace();
+ } finally {
+ socket.close();
+ }
+ }
+ };
+ }
+
public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
throws IOException, ClassNotFoundException, InterruptedException {
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc.utils;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.zeromq.ZMQ;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class RemoteServerTestClient {
+
+
+
+ public static void main(String args[]) throws Exception{
+ String serverAddress = "tcp://10.195.128.108:5666";
+ ZMQ.Context ctx = ZMQ.context(1);
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ RemoteServerTestClient client = new RemoteServerTestClient();
+ executor.execute(
+ MessagingUtil.sendAMessage(ctx, serverAddress, client.createPingMessage(serverAddress))
+ );
+ MessagingUtil.sendAMessage(ctx, serverAddress, client.createPingMessage(serverAddress));
+
+ Thread.sleep(5000);
+ MessagingUtil.closeZmqContext(ctx);
+ executor.shutdown();
+ }
+
+ public Message createPingMessage(String serverAddress){
+ Message ping = new Message.MessageBuilder()
+ .type(Message.MessageType.PING)
+ .sender("localhost:5444")
+ .recipient(serverAddress)
+ .build();
+
+ return ping;
+ }
+ public Message createAddFlowMessage(String serverAddress ){
+
+ RpcRouter.RouteIdentifier routeIdentifier = getAddFlowRpcIdentifier();
+
+ Message addFlow = new Message.MessageBuilder()
+ .type(Message.MessageType.REQUEST)
+ .sender("localhost:5444")
+ .recipient(serverAddress)
+ .route(routeIdentifier)
+ .payload(getAddFlowPayload(1,1))
+ .build();
+
+ return addFlow;
+ }
+
+ private RpcRouter.RouteIdentifier getAddFlowRpcIdentifier(){
+ throw new UnsupportedOperationException();
+ }
+
+ private CompositeNode getAddFlowPayload(int flowId, int tableId){
+ final String xml =
+ "<flow xmlns=\"urn:opendaylight:flow:inventory\">"
+ + "<priority>5</priority>"
+ + "<flow-name>Foo</flow-name>"
+ + "<match>"
+ + "<ethernet-match>"
+ + "<ethernet-type>"
+ + "<type>2048</type>"
+ + "</ethernet-type>"
+ + "</ethernet-match>"
+ + "<ipv4-destination>10.0.10.2/24</ipv4-destination>"
+ + "</match>"
+ + "<id>" + flowId + "</id>"
+ + "<table_id>" + tableId + "</table_id>"
+ + "<instructions>"
+ + "<instruction>"
+ + "<order>0</order>"
+ + "<apply-actions>"
+ + "<action>"
+ + "<order>0</order>"
+ + "<dec-nw-ttl/>"
+ + "</action>"
+ + "</apply-actions>"
+ + "</instruction>"
+ + "</instructions>"
+ + "</flow>";
+
+ return XmlUtils.xmlToCompositeNode(xml);
+ }
+}
--- /dev/null
+<add-flow xmlns="urn:opendaylight:flow:service">
+ <input>
+ <transaction-uri>BA-7</transaction-uri>
+ <table_id>4</table_id>
+ <priority>5</priority>
+ <node>
+ /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]
+ </node>
+ <match>
+ <ipv4-destination>10.0.10.2/24</ipv4-destination>
+ <ethernet-match>
+ <ethernet-type>
+ <type>2048</type>
+ </ethernet-type>
+ </ethernet-match>
+ </match>
+ <instructions>
+ <instruction>
+ <order>0</order>
+ <apply-actions>
+ <action>
+ <order>0</order>
+ <dec-nw-ttl/>
+ </action>
+ </apply-actions>
+ </instruction>
+ </instructions>
+ <flow-table>
+ /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]
+ </flow-table>
+ <flow-ref>
+ /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)flow[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]
+ </flow-ref>
+ <flow-name>Foo</flow-name>
+ </input>
+</add-flow>
\ No newline at end of file
for (String container : containerList) {
Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
.get(container);
- Edge edge = edgePropsMap.get(connector).getLeft();
- if (edge.getTailNodeConnector().equals(connector)) {
- ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
- .get(container);
- if (update.type == UpdateType.ADDED) {
- topologServiceShimListener
- .edgeOverUtilized(edge);
- } else {
- topologServiceShimListener
- .edgeUtilBackToNormal(edge);
+ // the edgePropsMap for a particular container may not have
+ // the connector.
+ // so check for null
+ Pair<Edge, Set<Property>> edgeProp = edgePropsMap.get(connector);
+ if(edgeProp != null) {
+ Edge edge = edgeProp.getLeft();
+ if (edge.getTailNodeConnector().equals(connector)) {
+ ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
+ .get(container);
+ if (update.type == UpdateType.ADDED) {
+ topologServiceShimListener
+ .edgeOverUtilized(edge);
+ } else {
+ topologServiceShimListener
+ .edgeUtilBackToNormal(edge);
+ }
}
}
}