Merge "Get rid of netconf operation filters"
authorTony Tkacik <ttkacik@cisco.com>
Wed, 5 Mar 2014 09:25:17 +0000 (09:25 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Wed, 5 Mar 2014 09:25:17 +0000 (09:25 +0000)
19 files changed:
opendaylight/distribution/opendaylight/pom.xml
opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/02-clustering.xml [new file with mode: 0644]
opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/03-toaster-sample.xml [moved from opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/02-toaster-sample.xml with 100% similarity]
opendaylight/md-sal/pom.xml
opendaylight/md-sal/remoterpc-routingtable/implementation/pom.xml
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/Message.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SerilizationTest.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/MessagingUtil.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/RemoteServerTestClient.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml [new file with mode: 0644]
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/TopologyServiceShim.java

index 79b9ccea832a26a5b295ef469d768f77717a0583..38dc454ac3c98b9614bda2dc30c6f6e0e8ceec4a 100644 (file)
           <artifactId>restconf-client-impl</artifactId>
         </dependency>
 
+        <!-- clustering -->
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>remoterpc-routingtable.implementation</artifactId>
+          <version>${mdsal.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.opendaylight.controller</groupId>
+          <artifactId>sal-remoterpc-connector</artifactId>
+          <version>${mdsal.version}</version>
+        </dependency>
+
         <!-- config-->
         <dependency>
           <groupId>org.opendaylight.controller</groupId>
diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/02-clustering.xml b/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/02-clustering.xml
new file mode 100644 (file)
index 0000000..7853b86
--- /dev/null
@@ -0,0 +1,27 @@
+<snapshot>
+    <configuration>
+        <data xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+            <modules xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+                <module>
+                   <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">prefix:remote-zeromq-rpc-server</type>
+                   <name>remoter</name>
+                   <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">5666</port>
+                   <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">
+                       <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+                       <name>dom-broker</name>
+                   </dom-broker>
+               </module>
+            </modules>
+            <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
+            </services>
+        </data>
+    </configuration>
+
+    <required-capabilities>
+       <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom?module=opendaylight-md-sal-dom&amp;revision=2013-10-28</capability>
+        <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:impl?module=opendaylight-sal-dom-broker-impl&amp;revision=2013-10-28</capability>
+        <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:common?module=opendaylight-md-sal-common&amp;revision=2013-10-28</capability>
+        <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc?module=odl-sal-dom-rpc-remote-cfg&amp;revision=2013-10-28</capability>
+    </required-capabilities>
+</snapshot>
+
index 6cc06ba70502b2e4fed1d315120326cd6552315d..d05e8b0e61ed90876a430c6163fd66ea634111fd 100644 (file)
         <!-- Compability Packages -->
         <module>compatibility</module>
 
-       <!-- Clustering
-       <module>remoterpc-routingtable/implementation</module>
+       <!-- Clustering -->
+             <module>remoterpc-routingtable/implementation</module>
         <module>sal-remoterpc-connector/implementation</module>
-        <module>clustered-data-store/implementation</module>
+        <!--module>clustered-data-store/implementation</module>
         -->
         
     </modules>
index dedd318a0cf4b1e2cce4396d5b1874cc63548a74..ccff37c41d8543b6664d54a1a497070991c1c870 100644 (file)
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>sal-parent</artifactId>
-        <version>1.0-SNAPSHOT</version>
+        <version>1.1-SNAPSHOT</version>
         <relativePath>../..</relativePath>
     </parent>
     <scm>
@@ -16,7 +16,6 @@
     </scm>
 
     <artifactId>remoterpc-routingtable.implementation</artifactId>
-    <version>0.4.1-SNAPSHOT</version>
     <packaging>bundle</packaging>
 
     <build>
@@ -49,7 +48,8 @@
                             org.eclipse.osgi.framework.console,
                             org.osgi.framework,
                             javax.transaction,
-                            com.google.common.base
+                            com.google.common.base,
+                            com.google.common.collect
                         </Import-Package>
                         <Bundle-Activator>
                             org.opendaylight.controller.sal.connector.remoterpc.impl.Activator
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>sal-connector-api</artifactId>
-            <version>1.0-SNAPSHOT</version>
+            <version>${project.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.opendaylight.controller</groupId>
             <artifactId>clustering.services</artifactId>
-            <version>0.5.0-SNAPSHOT</version>
+            <version>${clustering.services.version}</version>
         </dependency>
 
 
index 22319abb17df7d1dec301105a59c50fad2cb1164..598361c3ae3cbf7e41eed5a69ea9cf0d74727702 100644 (file)
@@ -43,6 +43,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
@@ -53,7 +55,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
             "2013-07-09", "context-reference");
     private final ListenerRegistry<RpcRegistrationListener> rpcRegistrationListeners = new ListenerRegistry<>();
     private final ListenerRegistry<RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> routeChangeListeners = new ListenerRegistry<>();
-    
+
 
     private final String identifier;
     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
@@ -280,24 +282,26 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
         }
     }
 
-    private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<QName> {
+    private static class RoutedRpcSelector implements RpcImplementation, AutoCloseable, Identifiable<RpcRoutingContext> {
 
         private final RoutedRpcStrategy strategy;
         private final Set<QName> supportedRpcs;
+        private final RpcRoutingContext identifier;
         private RpcImplementation defaultDelegate;
         private final ConcurrentMap<InstanceIdentifier, RoutedRpcRegImpl> implementations = new ConcurrentHashMap<>();
-        private SchemaAwareRpcBroker router;
+        private final SchemaAwareRpcBroker router;
 
         public RoutedRpcSelector(RoutedRpcStrategy strategy, SchemaAwareRpcBroker router) {
             super();
             this.strategy = strategy;
             supportedRpcs = ImmutableSet.of(strategy.getIdentifier());
+            identifier = RpcRoutingContext.create(strategy.context, strategy.getIdentifier());
             this.router = router;
         }
 
         @Override
-        public QName getIdentifier() {
-            return strategy.getIdentifier();
+        public RpcRoutingContext getIdentifier() {
+            return identifier;
         }
 
         @Override
@@ -382,7 +386,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
             RoutedRpcRegistration {
 
         private final QName type;
-        private RoutedRpcSelector router;
+        private final RoutedRpcSelector router;
 
         public RoutedRpcRegImpl(QName rpcType, RpcImplementation implementation, RoutedRpcSelector routedRpcSelector) {
             super(implementation);
@@ -424,13 +428,13 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
                 routeListener.getInstance().onRouteChange(change);
             } catch (Exception e) {
                 LOG.error("Unhandled exception during invoking onRouteChange for {}",routeListener.getInstance(),e);
-                
+
             }
         }
-        
+
     }
 
-    
+
 
     private void notifyPathWithdrawal(QName context,QName identifier, InstanceIdentifier path) {
         RpcRoutingContext contextWrapped = RpcRoutingContext.create(context, identifier);
@@ -443,10 +447,31 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, Ro
             }
         }
     }
-    
+
     @Override
     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(
             L listener) {
-        return routeChangeListeners.registerWithType(listener);
+        ListenerRegistration<L> reg = routeChangeListeners.registerWithType(listener);
+        RouteChange<RpcRoutingContext, InstanceIdentifier> initial = createInitialRouteChange();
+        try {
+        listener.onRouteChange(initial);
+        } catch (Exception e) {
+            LOG.error("Unhandled exception during sending initial route change event {} to {}",initial,listener);
+        }
+        return reg;
+    }
+
+    private RouteChange<RpcRoutingContext, InstanceIdentifier> createInitialRouteChange() {
+        FluentIterable<RoutedRpcSelector> rpcSelectors = FluentIterable.from(implementations.values()).filter(RoutedRpcSelector.class);
+
+
+        ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> announcements = ImmutableMap.builder();
+        ImmutableMap.Builder<RpcRoutingContext, Set<InstanceIdentifier>> removals = ImmutableMap.builder();
+        for (RoutedRpcSelector routedRpcSelector : rpcSelectors) {
+            final RpcRoutingContext context = routedRpcSelector.getIdentifier();
+            final Set<InstanceIdentifier> paths = ImmutableSet.copyOf(routedRpcSelector.implementations.keySet());
+            announcements.put(context, paths);
+        }
+        return RoutingUtils.change(announcements.build(), removals.build());
     }
 }
index ae31c2382fa82b3a9901bdbc4cfdb9be07c3f815..9a8f14980dca58035e3961db7c331f55d1ffd0fd 100644 (file)
@@ -6,7 +6,7 @@
         <groupId>org.opendaylight.controller</groupId>
         <artifactId>sal-parent</artifactId>
         <relativePath>../..</relativePath>
-        <version>1.0-SNAPSHOT</version>
+        <version>1.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>sal-remoterpc-connector</artifactId>
@@ -14,8 +14,8 @@
 
   <properties>
     <zeromq.version>0.3.1</zeromq.version>
-    <jackson.version>2.3.0</jackson.version>
     <stax.version>1.0.1</stax.version>
+    <yang.jmx.plugin>0.2.4-SNAPSHOT</yang.jmx.plugin>
   </properties>
 
   <dependencies>
@@ -38,8 +38,7 @@
     <dependency>
       <groupId>org.opendaylight.controller</groupId>
       <artifactId>remoterpc-routingtable.implementation</artifactId>
-      <!-- TODO: fix the version. Why is it not MD Sal project version?-->
-      <version>0.4.1-SNAPSHOT</version>
+      <version>${project.version}</version>
     </dependency>
 
     <!-- AD Sal -->
                     <instructions>
                         <Import-Package>
                             *,
+                            com.google.common.collect,
                             !org.codehaus.enunciate.jaxrs
                         </Import-Package>
                         <Export-Package>
                     <dependency>
                         <groupId>org.opendaylight.controller</groupId>
                         <artifactId>yang-jmx-generator-plugin</artifactId>
-                        <version>0.2.3-SNAPSHOT</version>
+                        <version>${yang.jmx.plugin}</version>
                     </dependency>
                     <dependency>
                         <groupId>org.opendaylight.yangtools</groupId>
                         <artifactId>maven-sal-api-gen-plugin</artifactId>
-                       <version>${yangtools.version}</version>
+                                   <version>${yangtools.version}</version>
                         <type>jar</type>
                     </dependency>
                 </dependencies>
index d874381ab374eb2d32a10e09bf959dc810080c0d..f55566acc37ca9c4eac03ab87642c77db28a1f69 100644 (file)
@@ -13,44 +13,46 @@ import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.osgi.framework.BundleContext;
 
 /**
-*
-*/
-public final class ZeroMQServerModule extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModule
- {
+ *
+ */
+public final class ZeroMQServerModule
+    extends org.opendaylight.controller.config.yang.md.sal.remote.rpc.AbstractZeroMQServerModule {
+
+  private static final Integer ZEROMQ_ROUTER_PORT = 5554;
+  private BundleContext bundleContext;
+
+  public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+                            org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
+    super(identifier, dependencyResolver);
+  }
 
-    private static final Integer ZEROMQ_ROUTER_PORT = 5554;
-    private BundleContext bundleContext;
+  public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier,
+                            org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
+                            ZeroMQServerModule oldModule, java.lang.AutoCloseable oldInstance) {
 
-    public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver) {
-        super(identifier, dependencyResolver);
-    }
+    super(identifier, dependencyResolver, oldModule, oldInstance);
+  }
 
-    public ZeroMQServerModule(org.opendaylight.controller.config.api.ModuleIdentifier identifier, org.opendaylight.controller.config.api.DependencyResolver dependencyResolver,
-            ZeroMQServerModule oldModule, java.lang.AutoCloseable oldInstance) {
+  @Override
+  protected void customValidation() {
+    // Add custom validation for module attributes here.
+  }
 
-        super(identifier, dependencyResolver, oldModule, oldInstance);
-    }
+  @Override
+  public java.lang.AutoCloseable createInstance() {
 
-    @Override
-    protected void customValidation(){
-        // Add custom validation for module attributes here.
-    }
+    Broker broker = getDomBrokerDependency();
 
-    @Override
-    public java.lang.AutoCloseable createInstance() {
-        
-        Broker broker = getDomBrokerDependency();
+    final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT;
 
-        final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT;
+    ServerImpl serverImpl = new ServerImpl(port);
 
-        ServerImpl serverImpl = new ServerImpl(port);
-        
-        ClientImpl clientImpl = new ClientImpl();
+    ClientImpl clientImpl = new ClientImpl();
 
     RoutingTableProvider provider = new RoutingTableProvider(bundleContext);//,serverImpl);
 
-
-    facade.setRoutingTableProvider(provider );
+    RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl);
+    facade.setRoutingTableProvider(provider);
     facade.setContext(bundleContext);
     facade.setRpcProvisionRegistry((RpcProvisionRegistry) broker);
 
index 30e11c0806731c24ab151dd7cf585408916fd233..84df2e43f01cd54084357d58b67abbe0ea6e93e9 100644 (file)
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -140,9 +141,29 @@ public class ClientImpl implements RemoteRpcClient {
       Message response = handler.handle(request);
       CompositeNode payload = null;
 
-      if ( response != null )
-        payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+      if ( response != null ) {
 
+        _logger.info("Received response [{}]", response);
+
+        Object rawPayload = response.getPayload();
+        switch (response.getType()) {
+          case ERROR:
+            if ( rawPayload instanceof List )
+              errors = (List) rawPayload;
+              break;
+
+          case RESPONSE:
+            payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
+            break;
+
+          default:
+            errors.add(
+                RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
+            );
+            break;
+
+        }
+      }
       return Rpcs.getRpcResult(true, payload, errors);
 
     } catch (Exception e){
index f3ef4b6cae3be2b1dd9221577c2f8e81bc0a5f7b..fe70fb77be02a04a9db392bd60fe29e96211cf6d 100644 (file)
@@ -90,6 +90,7 @@ class ClientRequestHandler implements AutoCloseable{
     //otherwise first create the bridge and then send request
     if ( connectedServers.containsKey(remoteServerAddress) )
       return sendMessage(request, remoteServerAddress);
+
     else{
       workerPool.execute(new Worker(remoteServerAddress));
       connectedServers.put(remoteServerAddress, remoteServerAddress);
@@ -105,12 +106,15 @@ class ClientRequestHandler implements AutoCloseable{
     ZMQ.Socket socket = context.socket(ZMQ.REQ);
 
     try {
-      socket.connect( INPROC_PROTOCOL_PREFIX + address);
+      String inProcessSocketAddress = INPROC_PROTOCOL_PREFIX + address;
+      socket.connect( inProcessSocketAddress );
+      _logger.debug("Sending request [{}]", request);
       socket.send(Message.serialize(request));
-      _logger.debug("Request sent. Waiting for reply...");
+      _logger.info("Request sent. Waiting for reply...");
       byte[] reply = socket.recv(0);
-      _logger.debug("Response received");
+      _logger.info("Response received");
       response = (Message) Message.deserialize(reply);
+      _logger.debug("Response [{}]", response);
     } finally {
       socket.close();
     }
@@ -143,7 +147,7 @@ class ClientRequestHandler implements AutoCloseable{
    */
   private class Worker implements Runnable {
     private String name;
-    private String remoteServer;  //<servername:rpc-port>
+    private String remoteServer;  //<serverip:rpc-port>
 
     public Worker(String address){
       this.name = DEFAULT_NAME + "[" + address + "]";
index 639e31ddc3ec0e8380b5108efbe8f4afa92ec513..16e720024752c157b0bd68b4df5f790ffb639fca 100644 (file)
@@ -279,7 +279,7 @@ public class RemoteRpcProvider implements
       for (RpcRoutingContext context : changes.keySet()){
         routeId = new RouteIdentifierImpl();
         routeId.setType(context.getRpc());
-        routeId.setContext(context.getContext());
+        //routeId.setContext(context.getContext());
 
         for (InstanceIdentifier instanceId : changes.get(context)){
           routeId.setRoute(instanceId);
index 949e6ee9a8fb2653da07a848b414c81160aea2ab..2041f03afba07c58146c1c2d449d3e1f5d2ccc3d 100644 (file)
@@ -8,10 +8,13 @@
 
 package org.opendaylight.controller.sal.connector.remoterpc;
 
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 import org.slf4j.Logger;
@@ -21,6 +24,7 @@ import org.zeromq.ZMQ;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Collection;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -101,11 +105,12 @@ public class ServerRequestHandler implements AutoCloseable{
 
         while (!Thread.currentThread().isInterrupted()) {
 
-          Message request = parseMessage(socket);
-          _logger.debug("Received rpc request [{}]", request);
+          MessageHandler handler = new MessageHandler(socket);
+          handler.receiveMessage();
 
-          if (request != null) {
-            // Call broker to process the message then reply
+          if (handler.hasMessageForBroker()) {
+
+            Message request = handler.getMessage();
             Future<RpcResult<CompositeNode>> rpc = null;
             RpcResult<CompositeNode> result = null;
 
@@ -117,28 +122,14 @@ public class ServerRequestHandler implements AutoCloseable{
 
               result = (rpc != null) ? rpc.get() : null;
 
-            } catch (Exception e) {
-              _logger.debug("Broker threw  [{}]", e);
-            }
-
-            CompositeNode payload = (result != null) ? result.getResult() : null;
-
-            Message response = new Message.MessageBuilder()
-                .type(Message.MessageType.RESPONSE)
-                .sender(serverAddress)
-                .route(request.getRoute())
-                .payload(XmlUtils.compositeNodeToXml(payload))
-                .build();
+              handler.sendResponse(result);
 
-            _logger.debug("Sending rpc response [{}]", response);
-
-            try {
-              socket.send(Message.serialize(response));
             } catch (Exception e) {
-              _logger.debug("rpc response send failed for message [{}]", response);
-              _logger.debug("{}", e);
+              _logger.debug("Broker threw  [{}]", e);
+              handler.sendError(e.getMessage());
             }
           }
+
         }
       } catch (Exception e) {
         printException(e);
@@ -147,16 +138,6 @@ public class ServerRequestHandler implements AutoCloseable{
       }
     }
 
-    /**
-     * @param socket
-     * @return
-     */
-    private Message parseMessage(ZMQ.Socket socket) throws Exception {
-      byte[] bytes = socket.recv(); //this blocks
-      _logger.debug("Received bytes:[{}]", bytes.length);
-      return (Message) Message.deserialize(bytes);
-    }
-
     private void printException(Exception e) {
       try (StringWriter s = new StringWriter();
            PrintWriter p = new PrintWriter(s)) {
@@ -204,4 +185,107 @@ public class ServerRequestHandler implements AutoCloseable{
       super.afterExecute(r, null);
     }
   }
+
+  class MessageHandler{
+    private ZMQ.Socket socket;
+    private Message message;          //parsed message received on zmq server port
+    private boolean messageForBroker = false; //if the message is valid and not a "ping" message
+
+    public MessageHandler(ZMQ.Socket socket){
+      this.socket = socket;
+    }
+
+    void receiveMessage(){
+      byte[] bytes = socket.recv(); //this blocks
+      _logger.debug("Received bytes:[{}]", bytes.length);
+
+      Object objectRecvd = null;
+      try{
+        objectRecvd = Message.deserialize(bytes);
+      }catch (Exception e){
+        sendError(e.getMessage());
+        return;
+      }
+
+      if (!(objectRecvd instanceof Message)) {
+        sendError("Invalid message received");
+        return;
+      }
+
+      message = (Message) objectRecvd;
+
+      _logger.info("Received request [{}]", message);
+
+      if (Message.MessageType.PING == message.getType()){
+        sendPong();
+        return;
+      }
+
+      messageForBroker = true;
+    }
+
+    boolean hasMessageForBroker(){
+      return messageForBroker;
+    }
+
+    Message getMessage(){
+      return message;
+    }
+
+    void sendResponse(RpcResult<CompositeNode> result){
+      CompositeNode payload = (result != null) ? result.getResult() : null;
+
+      String recipient = null;
+      RpcRouter.RouteIdentifier routeId = null;
+
+      if (message != null) {
+        recipient = message.getSender();
+        routeId   = message.getRoute();
+      }
+
+      Message response = new Message.MessageBuilder()
+          .type(Message.MessageType.RESPONSE)
+          .sender(serverAddress)
+          .recipient(recipient)
+          .route(routeId)
+          .payload(XmlUtils.compositeNodeToXml(payload))
+          .build();
+
+      send(response);
+    }
+
+    private void sendError(String msg){
+      Message errorResponse = new Message.MessageBuilder()
+          .type(Message.MessageType.ERROR)
+          .sender(serverAddress)
+          .payload(msg)
+          .build();
+
+      send(errorResponse);
+    }
+
+    private void sendPong(){
+      Message pong = new Message.MessageBuilder()
+          .type(Message.MessageType.PONG)
+          .sender(serverAddress)
+          .build();
+
+      send(pong);
+    }
+
+    private void send(Message msg){
+      byte[] serializedMessage = null;
+      try {
+        serializedMessage = Message.serialize(msg);
+      } catch (Exception e) {
+        _logger.debug("Unexpected error during serialization of response [{}]", msg);
+        return;
+      }
+
+      if (serializedMessage != null)
+        if (socket.send(serializedMessage))
+          _logger.info("Response sent [{}]", msg);
+        else  _logger.debug("Failed to send serialized message");
+    }
+  }
 }
index 95fe99c81c7da67e52b8274ffd37eddea8753aa2..519791a1956addb07576487edacdcb2247184ab0 100644 (file)
@@ -15,8 +15,8 @@ import java.io.*;
 public class Message implements Serializable {
 
  public static enum MessageType {
-    ANNOUNCE((byte) 0),  //TODO: Remove announce, add rpc registration and deregistration
-    HEARTBEAT((byte) 1),
+    PING((byte) 0),
+    PONG((byte) 1),
     REQUEST((byte) 2),
     RESPONSE((byte) 3),
     ERROR((byte)4);
@@ -77,6 +77,7 @@ public class Message implements Serializable {
   public void setRecipient(String recipient) {
     this.recipient = recipient;
   }
+
   @Override
   public String toString() {
     return "Message{" +
index ec6a1a94b6ecfe2a4d58504f332145ccd5c53184..4ffcf3e099c7f49c3b811d86f24a5961d689850d 100644 (file)
@@ -7,15 +7,12 @@
  */
 package org.opendaylight.controller.sal.connector.remoterpc.dto;
 
-import java.io.Serializable;
-import java.net.URI;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
+import java.io.Serializable;
+
 public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
 
   private QName context;
@@ -48,4 +45,48 @@ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QNa
   public void setRoute(InstanceIdentifier route) {
     this.route = route;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    RouteIdentifierImpl that = (RouteIdentifierImpl) o;
+
+    if (context == null){
+      if (that.getContext() != null)  return false;
+    }else
+      if (!context.equals(that.context)) return false;
+
+    if (route == null){
+      if (that.getRoute() != null) return false;
+    }else
+      if (!route.equals(that.route)) return false;
+
+    if (type == null){
+      if (that.getType() != null) return false;
+    }else
+      if (!type.equals(that.type)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int prime = 31;
+    int result = 0;
+    result = prime * result + (context == null ? 0:context.hashCode());
+    result = prime * result + (type    == null ? 0:type.hashCode());
+    result = prime * result + (route   == null ? 0:route.hashCode());
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "RouteIdentifierImpl{" +
+        "context=" + context +
+        ", type=" + type +
+        ", route=" + route +
+        '}';
+  }
 }
index 454dc374a9f71b810cc5e1ce067060873ead881c..41422fd680b00a33660f880f12462eb96197e51a 100644 (file)
@@ -38,7 +38,8 @@ public class SerilizationTest {
   @Test
   public void toXml() throws FileNotFoundException {
 
-    InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+    //InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/FourSimpleChildren.xml");
+    InputStream xmlStream = SerilizationTest.class.getResourceAsStream("/AddFlow.xml");
     StringWriter writer = new StringWriter();
 
     CompositeNode data = loadCompositeNode(xmlStream);
@@ -59,8 +60,6 @@ public class SerilizationTest {
     _logger.info("Parsed xml [{}]", writer.toString());
   }
 
-  //Note to self:  Stolen from TestUtils
-  ///Users/alefan/odl/controller4/opendaylight/md-sal/sal-rest-connector/src/test/java/org/opendaylight/controller/sal/restconf/impl/test/TestUtils.java
   // Figure out how to include TestUtils through pom ...was getting errors
   private CompositeNode loadCompositeNode(InputStream xmlInputStream) throws FileNotFoundException {
     if (xmlInputStream == null) {
index 20cf4f636230952065b2770bbe189f17f1b9db4d..a68ee574f339526fa2c7a3f5b06e23a78ea13700 100644 (file)
@@ -78,6 +78,41 @@ public class MessagingUtil {
     };
   }
 
+  public static Runnable sendAMessage(final ZMQ.Context context, final String serverAddress, final Message msg)
+      throws IOException, ClassNotFoundException, InterruptedException {
+
+    return new Runnable() {
+      @Override
+      public void run() {
+        final ZMQ.Socket socket = context.socket(ZMQ.REQ);
+        try {
+
+          socket.connect(serverAddress);
+          System.out.println(Thread.currentThread().getName() + " Sending message");
+          try {
+            socket.send(Message.serialize(msg));
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+          byte[] bytes = socket.recv();
+          Message response = null;
+          try {
+            response = (Message) Message.deserialize(bytes);
+          } catch (IOException e) {
+            e.printStackTrace();
+          } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+          }
+          System.out.println(Thread.currentThread().getName() + " Got response " + response);
+        } catch (Exception x) {
+          x.printStackTrace();
+        } finally {
+          socket.close();
+        }
+      }
+    };
+  }
+
   public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
           throws IOException, ClassNotFoundException, InterruptedException {
 
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/RemoteServerTestClient.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/RemoteServerTestClient.java
new file mode 100644 (file)
index 0000000..a71ab86
--- /dev/null
@@ -0,0 +1,89 @@
+package org.opendaylight.controller.sal.connector.remoterpc.utils;
+
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.zeromq.ZMQ;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class RemoteServerTestClient {
+
+
+
+  public static void main(String args[]) throws Exception{
+    String serverAddress = "tcp://10.195.128.108:5666";
+    ZMQ.Context ctx = ZMQ.context(1);
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    RemoteServerTestClient client = new RemoteServerTestClient();
+    executor.execute(
+        MessagingUtil.sendAMessage(ctx, serverAddress, client.createPingMessage(serverAddress))
+    );
+    MessagingUtil.sendAMessage(ctx, serverAddress, client.createPingMessage(serverAddress));
+
+    Thread.sleep(5000);
+    MessagingUtil.closeZmqContext(ctx);
+    executor.shutdown();
+  }
+
+  public Message createPingMessage(String serverAddress){
+    Message ping = new Message.MessageBuilder()
+        .type(Message.MessageType.PING)
+        .sender("localhost:5444")
+        .recipient(serverAddress)
+        .build();
+
+    return ping;
+  }
+  public Message createAddFlowMessage(String serverAddress ){
+
+    RpcRouter.RouteIdentifier routeIdentifier = getAddFlowRpcIdentifier();
+
+    Message addFlow = new Message.MessageBuilder()
+        .type(Message.MessageType.REQUEST)
+        .sender("localhost:5444")
+        .recipient(serverAddress)
+        .route(routeIdentifier)
+        .payload(getAddFlowPayload(1,1))
+        .build();
+
+    return addFlow;
+  }
+
+  private RpcRouter.RouteIdentifier getAddFlowRpcIdentifier(){
+    throw new UnsupportedOperationException();
+  }
+
+  private CompositeNode getAddFlowPayload(int flowId, int tableId){
+    final String xml =
+    "<flow xmlns=\"urn:opendaylight:flow:inventory\">"
+    + "<priority>5</priority>"
+    + "<flow-name>Foo</flow-name>"
+    + "<match>"
+    + "<ethernet-match>"
+    + "<ethernet-type>"
+    + "<type>2048</type>"
+    + "</ethernet-type>"
+    + "</ethernet-match>"
+    + "<ipv4-destination>10.0.10.2/24</ipv4-destination>"
+    + "</match>"
+    + "<id>" + flowId + "</id>"
+    + "<table_id>" + tableId + "</table_id>"
+    + "<instructions>"
+    + "<instruction>"
+    + "<order>0</order>"
+    + "<apply-actions>"
+    + "<action>"
+    + "<order>0</order>"
+    + "<dec-nw-ttl/>"
+    + "</action>"
+    + "</apply-actions>"
+    + "</instruction>"
+    + "</instructions>"
+    + "</flow>";
+
+    return XmlUtils.xmlToCompositeNode(xml);
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/AddFlow.xml
new file mode 100644 (file)
index 0000000..b042b8f
--- /dev/null
@@ -0,0 +1,36 @@
+<add-flow xmlns="urn:opendaylight:flow:service">
+  <input>
+    <transaction-uri>BA-7</transaction-uri>
+    <table_id>4</table_id>
+    <priority>5</priority>
+    <node>
+      /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]
+    </node>
+    <match>
+      <ipv4-destination>10.0.10.2/24</ipv4-destination>
+      <ethernet-match>
+        <ethernet-type>
+          <type>2048</type>
+        </ethernet-type>
+      </ethernet-match>
+    </match>
+    <instructions>
+      <instruction>
+        <order>0</order>
+        <apply-actions>
+          <action>
+            <order>0</order>
+            <dec-nw-ttl/>
+          </action>
+        </apply-actions>
+      </instruction>
+    </instructions>
+    <flow-table>
+      /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]
+    </flow-table>
+    <flow-ref>
+      /(urn:opendaylight:inventory?revision=2013-08-19)nodes/(urn:opendaylight:inventory?revision=2013-08-19)node[{(urn:opendaylight:inventory?revision=2013-08-19)id=openflow:1}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)table[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]/(urn:opendaylight:flow:inventory?revision=2013-08-19)flow[{(urn:opendaylight:flow:inventory?revision=2013-08-19)id=4}]
+    </flow-ref>
+    <flow-name>Foo</flow-name>
+  </input>
+</add-flow>
\ No newline at end of file
index 4e4e867fec637a30c3a0a26eedb128becbda84c7..dacc130831ff18332adff071d400621096e8b48b 100644 (file)
@@ -180,16 +180,22 @@ public class TopologyServiceShim implements IDiscoveryListener,
                     for (String container : containerList) {
                         Map<NodeConnector, Pair<Edge, Set<Property>>> edgePropsMap = edgeMap
                                 .get(container);
-                        Edge edge = edgePropsMap.get(connector).getLeft();
-                        if (edge.getTailNodeConnector().equals(connector)) {
-                            ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
-                                    .get(container);
-                            if (update.type == UpdateType.ADDED) {
-                                topologServiceShimListener
-                                        .edgeOverUtilized(edge);
-                            } else {
-                                topologServiceShimListener
-                                        .edgeUtilBackToNormal(edge);
+                        // the edgePropsMap for a particular container may not have
+                        // the connector.
+                        // so check for null
+                        Pair<Edge, Set<Property>> edgeProp = edgePropsMap.get(connector);
+                        if(edgeProp != null) {
+                            Edge edge = edgeProp.getLeft();
+                            if (edge.getTailNodeConnector().equals(connector)) {
+                                ITopologyServiceShimListener topologServiceShimListener = topologyServiceShimListeners
+                                        .get(container);
+                                if (update.type == UpdateType.ADDED) {
+                                    topologServiceShimListener
+                                    .edgeOverUtilized(edge);
+                                } else {
+                                    topologServiceShimListener
+                                    .edgeUtilBackToNormal(edge);
+                                }
                             }
                         }
                     }