Enhancements to remote rpc server. Using zmq router-dealer bridge to make the server... 86/3686/7
authorAbhishek Kumar <abhishk2@cisco.com>
Thu, 12 Dec 2013 19:50:31 +0000 (11:50 -0800)
committerEd Warnicke <eaw@cisco.com>
Mon, 20 Jan 2014 21:43:24 +0000 (15:43 -0600)
Enhancements to remote rpc client. Using zmq router-dealer bridge to make the client async.
Added configuration for remote rpc in Configuration subsystem
On Server startup reading of routingtable to populate remoterpcs was giving exception - fixed the same
Client was not registered with Server and addRPCImplementation was failing - fixed the same
ServerImpl was not getting registered as listener to RoutingTable hence announcement was not received on remote - fixed the same
Patch 6: Some unit tests were intemittently hanging. ZMQ Test server was not gracefully shutting down.

Change-Id: I6054443b39394f82258522205ccd4be470f597f0
Signed-off-by: Basheeruddin Ahmed <syedbahm@cisco.com>
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
39 files changed:
opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml
opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java
opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImpl.java
opendaylight/md-sal/remoterpc-routingtable/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImplTest.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcRoutingContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/CapturedMessageHandler.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RpcRequestImpl.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/util/XmlUtils.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandlerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/MockRoutingTable.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImplTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandlerTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/MessagingUtil.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/logback-test.xml [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java

index 40bd9f9aeaa4605267e58750d3929bb8b45865c1..63ac8577ac868c30bf4ffda8a5f4c0ba5a30241a 100644 (file)
                         <name>runtime-mapping-singleton</name>
                     </mapping-service>
                 </module>
+                <module>
+                   <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">prefix:remote-zeromq-rpc-server</type> 
+                   <name>remoter</name>
+                   <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">5666</port>   59
+                   <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">    60
+                       <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+                       <name>dom-broker</name>
+                   </dom-broker>
+               </module>
             </modules>
             
             <services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
         <capability>urn:opendaylight:yang:extension:yang-ext?module=yang-ext&amp;revision=2013-07-09</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:common?module=opendaylight-md-sal-common&amp;revision=2013-10-28</capability>
         <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:cluster:store?module=odl-sal-dom-clustered-store-cfg&amp;revision=2013-10-28</capability>
+        <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc?module=odl-sal-dom-rpc-remote-cfg&amp;revision=2013-10-28</capability>
     </required-capabilities>
 </snapshot>
 
index 6ec4c2ce01f7ccb675873890a3ff80bfa6102963..2da031e54008cb68c5cf23cddc84b4cd50742da1 100644 (file)
@@ -7,6 +7,7 @@
  */
 package org.opendaylight.controller.sal.connector.remoterpc.api;
 
+import java.util.Map;
 import java.util.Set;
 
 public interface RoutingTable<I,R> {
@@ -65,6 +66,12 @@ public interface RoutingTable<I,R> {
    */
   public Set<R> getRoutes(I routeId);
 
+  /**
+   * Returns all network addresses stored in the table
+   * @return
+   */
+  public Set<Map.Entry> getAllRoutes();
+
   /**
    * Returns only one address from the list of network addresses
    * associated with the route. The algorithm to determine that
index 59292a174ed3ea4cee4f940e6841959631c1fb9d..40c4c6b43625d9c99eecb786a1556762cbdc0085 100644 (file)
@@ -72,24 +72,14 @@ public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateA
                 }
                 // lets start a transaction
                 clusterGlobalServices.tbegin();
-                Set<R> routes = new HashSet<R>();
-                routes.add(route);
-                routingTableCache.put(routeId, routes);
+
+                routingTableCache.put(routeId, route);
                 clusterGlobalServices.tcommit();
             } else {
                 throw new DuplicateRouteException(" There is already existing route " + existingRoute);
             }
 
-        } catch (NotSupportedException e) {
-            throw new RoutingTableException("Transaction error - while trying to create route id="
-                    + routeId + "with route" + route, e);
-        } catch (HeuristicRollbackException e) {
-            throw new RoutingTableException("Transaction error - while trying to create route id="
-                    + routeId + "with route" + route, e);
-        } catch (RollbackException e) {
-            throw new RoutingTableException("Transaction error - while trying to create route id="
-                    + routeId + "with route" + route, e);
-        } catch (HeuristicMixedException e) {
+        } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
             throw new RoutingTableException("Transaction error - while trying to create route id="
                     + routeId + "with route" + route, e);
         } catch (javax.transaction.SystemException e) {
@@ -116,16 +106,7 @@ public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateA
             routingTableCache.remove(routeId);
             clusterGlobalServices.tcommit();
 
-        } catch (NotSupportedException e) {
-            throw new RoutingTableException("Transaction error - while trying to remove route id="
-                    + routeId, e);
-        } catch (HeuristicRollbackException e) {
-            throw new RoutingTableException("Transaction error - while trying to remove route id="
-                    + routeId, e);
-        } catch (RollbackException e) {
-            throw new RoutingTableException("Transaction error - while trying to remove route id="
-                    + routeId, e);
-        } catch (HeuristicMixedException e) {
+        } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
             throw new RoutingTableException("Transaction error - while trying to remove route id="
                     + routeId, e);
         } catch (javax.transaction.SystemException e) {
@@ -139,10 +120,22 @@ public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateA
         // Note: currently works for global routes only wherein there is just single
         // route
         Preconditions.checkNotNull(routeId, "getARoute: routeId cannot be null!");
-        return (Set<R>) routingTableCache.get(routeId);
+        R route = (R)routingTableCache.get(routeId);
+        Set<R>routes = null;
+        if(route !=null){
+           routes = new HashSet<R>();
+           routes.add(route);
+        }
+
+        return routes;
     }
 
-    @Override
+  @Override
+  public Set<Map.Entry> getAllRoutes() {
+    return routingTableCache.entrySet();
+  }
+
+  @Override
     public R getARoute(I routeId) {
         throw new UnsupportedOperationException("Not implemented yet!");
     }
index 2ef251d9a1f0a642020b2b5d9299970bdf429189..50460d4e5ec897892a9a507a1b48244098887c86 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
@@ -80,8 +81,7 @@ public class RoutingTableImplTest {
 
         rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
 
-        Set<String> globalService = new HashSet<String>();
-        globalService.add("172.27.12.1:5000");
+        String globalService =   "172.27.12.1:5000";
 
         when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
         ConcurrentMap latestCache = rti.getRoutingTableCache();
@@ -92,9 +92,10 @@ public class RoutingTableImplTest {
 
 
         Assert.assertEquals(servicesGlobal.size(),1);
-
-        Assert.assertEquals(servicesGlobal.iterator().next(),"172.27.12.1:5000");
-
+        Iterator<String> iterator = servicesGlobal.iterator();
+        while(iterator.hasNext()){
+        Assert.assertEquals(iterator.next(),"172.27.12.1:5000");
+        }
 
 
     }
@@ -124,8 +125,7 @@ public class RoutingTableImplTest {
 
         rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
 
-        Set<String> globalService = new HashSet<String>();
-        globalService.add("172.27.12.1:5000");
+        String globalService =   "172.27.12.1:5000";
 
         when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
         ConcurrentMap latestCache = rti.getRoutingTableCache();
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcRoutingContext.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcRoutingContext.java
new file mode 100644 (file)
index 0000000..1680c19
--- /dev/null
@@ -0,0 +1,18 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.core.api;
+
+import org.opendaylight.yangtools.yang.common.QName;
+
+public interface RpcRoutingContext {
+
+  public QName getContext();
+
+  public QName getRpcType();
+}
index ee9bc8922b08d6d0d045b0883d3e6a824601c489..58a0d3044aa196e5c102011d9ea904a2d7d3d19e 100644 (file)
       <groupId> org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId> ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <version>1.0.12</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
                         </Import-Package>
                         <Export-Package>
                             org.opendaylight.controller.config.yang.md.sal.remote.rpc,
-                            org.opendaylight.controller.sal.connector.remoterpc,
-                            org.opendaylight.controller.sal.connector.remoterpc.*,
+                            org.opendaylight.controller.sal.connector.remoterpc.util,
+                            org.opendaylight.controller.sal.connector.remoterpc.dto,
+                            org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient,
+                            org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer,
+                            org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcProvider
                         </Export-Package>
                         <Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
                     </instructions>
index 606f282bd7f10d6ebf04e168ed5bb99e3b37b9e6..11179864293ca663776d6e04e8b44b7b448d25cc 100644 (file)
@@ -9,10 +9,7 @@
 */
 package org.opendaylight.controller.config.yang.md.sal.remote.rpc;
 
-import org.opendaylight.controller.sal.connector.remoterpc.Client;
-import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcProvider;
-import org.opendaylight.controller.sal.connector.remoterpc.RoutingTableProvider;
-import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.*;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.osgi.framework.BundleContext;
 
@@ -44,14 +41,18 @@ public final class ZeroMQServerModule extends org.opendaylight.controller.config
     public java.lang.AutoCloseable createInstance() {
         
         Broker broker = getDomBrokerDependency();
-        RoutingTableProvider provider = new RoutingTableProvider(bundleContext);
-        
+
+
         
         final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT;
 
         ServerImpl serverImpl = new ServerImpl(port);
         
-        Client clientImpl = new Client();
+        ClientImpl clientImpl = new ClientImpl();
+
+        RoutingTableProvider provider = new RoutingTableProvider(bundleContext,serverImpl);
+
+
         RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl);
         
         facade.setRoutingTableProvider(provider );
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/CapturedMessageHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/CapturedMessageHandler.java
new file mode 100644 (file)
index 0000000..2dc5eee
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+public class CapturedMessageHandler implements Runnable {
+
+  private Logger _logger = LoggerFactory.getLogger(CapturedMessageHandler.class);
+
+  private ZMQ.Socket socket;
+
+  public CapturedMessageHandler(ZMQ.Socket socket){
+    this.socket = socket;
+  }
+
+  @Override
+  public void run(){
+
+    try {
+      while (!Thread.currentThread().isInterrupted()){
+        String message = socket.recvStr();
+        _logger.debug("Captured [{}]", message);
+      }
+    } catch (Exception e) {
+      _logger.error("Exception raised [{}]", e.getMessage());
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java
deleted file mode 100644 (file)
index ef31623..0000000
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import com.google.common.base.Optional;
-
-import org.opendaylight.controller.sal.common.util.RpcErrors;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.*;
-
-import static com.google.common.base.Preconditions.*;
-
-/**
- * An implementation of {@link RpcImplementation} that makes remote RPC calls
- */
-public class Client implements RemoteRpcClient {
-
-    private final Logger _logger = LoggerFactory.getLogger(Client.class);
-
-    private final LinkedBlockingQueue<MessageWrapper> requestQueue = new LinkedBlockingQueue<MessageWrapper>(100);
-
-    private final ExecutorService pool = Executors.newSingleThreadExecutor();
-    private final long TIMEOUT = 5000; // in ms
-
-    private  RoutingTableProvider routingTableProvider;
-
-    public RoutingTableProvider getRoutingTableProvider() {
-        return routingTableProvider;
-    }
-
-    public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
-        this.routingTableProvider = routingTableProvider;
-    }
-
-    public LinkedBlockingQueue<MessageWrapper> getRequestQueue() {
-        return requestQueue;
-    }
-
-    @Override
-    public Set<QName> getSupportedRpcs() {
-        // TODO: Find the entries from routing table
-        return Collections.emptySet();
-    }
-
-    public void start() {
-        pool.execute(new Sender(this));
-
-    }
-
-    public void stop() {
-
-        _logger.debug("Client stopping...");
-        Context.getInstance().getZmqContext().term();
-        _logger.debug("ZMQ context terminated");
-
-        pool.shutdown(); // intiate shutdown
-        try {
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow();
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    _logger.error("Client thread pool did not shut down");
-            }
-        } catch (InterruptedException e) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-        _logger.debug("Client stopped");
-    }
-
-    @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-
-        RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-        routeId.setType(rpc);
-
-        String address = lookupRemoteAddress(routeId);
-
-        Message request = new Message.MessageBuilder().type(Message.MessageType.REQUEST)
-                .sender(Context.getInstance().getLocalUri()).recipient(address).route(routeId)
-                .payload(XmlUtils.compositeNodeToXml(input)).build();
-
-        List<RpcError> errors = new ArrayList<RpcError>();
-
-        try (SocketPair pair = new SocketPair()) {
-
-            MessageWrapper messageWrapper = new MessageWrapper(request, pair.getSender());
-            process(messageWrapper);
-            Message response = parseMessage(pair.getReceiver());
-
-            CompositeNode payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
-
-            return Rpcs.getRpcResult(true, payload, errors);
-
-        } catch (Exception e) {
-            collectErrors(e, errors);
-            return Rpcs.getRpcResult(false, null, errors);
-        }
-
-    }
-
-    public void process(MessageWrapper msg) throws TimeoutException, InterruptedException {
-        _logger.debug("Processing message [{}]", msg);
-
-        boolean success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS);
-        if (!success)
-            throw new TimeoutException("Queue is full");
-    }
-
-    /**
-     * Block on socket for reply
-     * 
-     * @param receiver
-     * @return
-     */
-    private Message parseMessage(ZMQ.Socket receiver) throws IOException, ClassNotFoundException {
-        return (Message) Message.deserialize(receiver.recv());
-    }
-
-    /**
-     * Find address for the given route identifier in routing table
-     * 
-     * @param routeId
-     *            route identifier
-     * @return remote network address
-     */
-    private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId) {
-        checkNotNull(routeId, "route must not be null");
-
-        Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
-        checkNotNull(routingTable.isPresent(), "Routing table is null");
-
-        Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
-        checkNotNull(addresses, "Address not found for route [%s]", routeId);
-        checkState(addresses.size() == 1, "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); // its
-                                                                                                                         // a
-                                                                                                                         // global
-                                                                                                                         // service.
-
-        String address = addresses.iterator().next();
-        checkNotNull(address, "Address not found for route [%s]", routeId);
-
-        return address;
-    }
-
-    private void collectErrors(Exception e, List<RpcError> errors) {
-        if (e == null)
-            return;
-        if (errors == null)
-            errors = new ArrayList<RpcError>();
-
-        errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
-        for (Throwable t : e.getSuppressed()) {
-            errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
-        }
-    }
-
-    @Override
-    public void close() throws Exception {
-        stop();
-    }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java
new file mode 100644 (file)
index 0000000..291fe0b
--- /dev/null
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * An implementation of {@link RpcImplementation} that makes
+ * remote RPC calls
+ */
+public class ClientImpl implements RemoteRpcClient {
+
+  private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
+
+  private ZMQ.Context context = ZMQ.context(1);
+  private ClientRequestHandler handler;
+  private RoutingTableProvider routingTableProvider;
+
+  public ClientImpl(){
+    handler = new ClientRequestHandler(context);
+    start();
+  }
+
+  public ClientImpl(ClientRequestHandler handler){
+    this.handler = handler;
+    start();
+  }
+
+  public RoutingTableProvider getRoutingTableProvider() {
+    return routingTableProvider;
+  }
+
+  public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
+    this.routingTableProvider = routingTableProvider;
+  }
+
+ @Override
+  public Set<QName> getSupportedRpcs(){
+    //TODO: Find the entries from routing table
+    return Collections.emptySet();
+  }
+
+  @Override
+  public void start() {/*NOOPS*/}
+
+  @Override
+  public void stop() {
+    closeZmqContext();
+    handler.close();
+    _logger.info("Stopped");
+  }
+
+  @Override
+  public void close(){
+    stop();
+  }
+
+  /**
+   * Finds remote server that can execute this rpc and sends a message to it
+   * requesting execution.
+   * The call blocks until a response from remote server is received. Its upto
+   * the client of this API to implement a timeout functionality.
+   *
+   * @param rpc   remote service to be executed
+   * @param input payload for the remote service
+   * @return
+   */
+  @Override
+  public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+    routeId.setType(rpc);
+
+    String address = lookupRemoteAddress(routeId);
+
+    Message request = new Message.MessageBuilder()
+        .type(Message.MessageType.REQUEST)
+        .sender(Context.getInstance().getLocalUri())
+        .recipient(address)
+        .route(routeId)
+        .payload(XmlUtils.compositeNodeToXml(input))
+        .build();
+
+    List<RpcError> errors = new ArrayList<RpcError>();
+
+    try{
+      Message response = handler.handle(request);
+      CompositeNode payload = null;
+
+      if ( response != null )
+        payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+
+      return Rpcs.getRpcResult(true, payload, errors);
+
+    } catch (Exception e){
+      collectErrors(e, errors);
+      return Rpcs.getRpcResult(false, null, errors);
+    }
+
+  }
+
+  /**
+   * Find address for the given route identifier in routing table
+   * @param  routeId route identifier
+   * @return         remote network address
+   */
+  private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){
+    checkNotNull(routeId, "route must not be null");
+
+    Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
+    checkNotNull(routingTable.isPresent(), "Routing table is null");
+
+    Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
+    checkNotNull(addresses, "Address not found for route [%s]", routeId);
+    checkState(addresses.size() == 1,
+        "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service.
+
+    String address = addresses.iterator().next();
+    checkNotNull(address, "Address not found for route [%s]", routeId);
+
+    return address;
+  }
+
+  private void collectErrors(Exception e, List<RpcError> errors){
+    if (e == null) return;
+    if (errors == null) errors = new ArrayList<RpcError>();
+
+    errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+    for (Throwable t : e.getSuppressed()) {
+      errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
+    }
+  }
+
+  /**
+   * Closes ZMQ Context. It tries to gracefully terminate the context. If
+   * termination takes more than a second, its forcefully shutdown.
+   */
+  private void closeZmqContext() {
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    FutureTask zmqTermination = new FutureTask(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          if (context != null)
+            context.term();
+          _logger.debug("ZMQ Context terminated");
+        } catch (Exception e) {
+          _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
+        }
+      }
+    }, null);
+
+    exec.execute(zmqTermination);
+
+    try {
+      zmqTermination.get(1L, TimeUnit.SECONDS);
+    } catch (Exception e) {/*ignore and continue with shutdown*/}
+
+    exec.shutdownNow();
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java
new file mode 100644 (file)
index 0000000..f3ef4b6
--- /dev/null
@@ -0,0 +1,190 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+class ClientRequestHandler implements AutoCloseable{
+
+  private Logger _logger = LoggerFactory.getLogger(ClientRequestHandler.class);
+  private final String DEFAULT_NAME = "remoterpc-client-worker";
+  private final String INPROC_PROTOCOL_PREFIX = "inproc://";
+  private final String TCP_PROTOCOL_PREFIX = "tcp://";
+
+  private ZMQ.Context context;
+
+  /*
+   * Worker thread pool. Each thread runs a ROUTER-DEALER pair
+   */
+  private ExecutorService workerPool;
+
+  /*
+   * Set of remote servers this client is currently connected to
+   */
+  private Map<String, String> connectedServers;
+
+  protected ClientRequestHandler(ZMQ.Context context) {
+    this.context = context;
+    connectedServers = new ConcurrentHashMap<String, String>();
+    start();
+  }
+
+  /**
+   * Starts a pool of worker as needed. A worker thread that has not been used for 5 min
+   * is terminated and removed from the pool. If thread dies due to an exception, its
+   * restarted.
+   */
+  private void start(){
+
+    workerPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+        5L, TimeUnit.MINUTES,
+        new SynchronousQueue<Runnable>()){
+
+      @Override
+      protected void afterExecute(Runnable r, Throwable t) {
+        if (isTerminating() || isTerminated() || isShutdown())
+          return;
+
+        Worker worker = (Worker) r;
+        Preconditions.checkState( worker != null );
+        String remoteServerAddress = worker.getRemoteServerAddress();
+        connectedServers.remove(remoteServerAddress);
+
+        if ( t != null ){
+          _logger.debug("Exception caught while terminating worker [{},{}]. " +
+              "Restarting worker...", t.getClass(), t.getMessage());
+
+          connectedServers.put(remoteServerAddress, remoteServerAddress);
+          this.execute(r);
+        }
+        super.afterExecute(r, null);
+      }
+    };
+  }
+
+  public Message handle(Message request) throws IOException, ClassNotFoundException, InterruptedException {
+
+    String remoteServerAddress = request.getRecipient();
+    //if we already have router-dealer bridge setup for this address the send request
+    //otherwise first create the bridge and then send request
+    if ( connectedServers.containsKey(remoteServerAddress) )
+      return sendMessage(request, remoteServerAddress);
+    else{
+      workerPool.execute(new Worker(remoteServerAddress));
+      connectedServers.put(remoteServerAddress, remoteServerAddress);
+      //give little time for sockets to get initialized
+      //TODO: Add socket ping-pong message to ensure socket init rather than thread.sleep.
+      Thread.sleep(1000);
+      return sendMessage(request, remoteServerAddress);
+    }
+  }
+
+  private Message sendMessage(Message request, String address) throws IOException, ClassNotFoundException {
+    Message response = null;
+    ZMQ.Socket socket = context.socket(ZMQ.REQ);
+
+    try {
+      socket.connect( INPROC_PROTOCOL_PREFIX + address);
+      socket.send(Message.serialize(request));
+      _logger.debug("Request sent. Waiting for reply...");
+      byte[] reply = socket.recv(0);
+      _logger.debug("Response received");
+      response = (Message) Message.deserialize(reply);
+    } finally {
+      socket.close();
+    }
+    return response;
+  }
+
+  /**
+   * This gets called automatically if used with try-with-resources
+   */
+  @Override
+  public void close(){
+    workerPool.shutdown();
+    _logger.info("Request Handler closed");
+  }
+
+  /**
+   * Total number of workers in the pool. Number of workers represent
+   * number of remote servers {@link org.opendaylight.controller.sal.connector.remoterpc.ClientImpl} is connected to.
+   *
+   * @return worker count
+   */
+  public int getWorkerCount(){
+
+    if (workerPool == null) return 0;
+
+    return ((ThreadPoolExecutor)workerPool).getActiveCount();
+  }
+  /**
+   * Handles RPC request
+   */
+  private class Worker implements Runnable {
+    private String name;
+    private String remoteServer;  //<servername:rpc-port>
+
+    public Worker(String address){
+      this.name = DEFAULT_NAME + "[" + address + "]";
+      this.remoteServer = address;
+    }
+
+    public String getRemoteServerAddress(){
+      return this.remoteServer;
+    }
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName(name);
+      _logger.debug("Starting ... ");
+
+      ZMQ.Socket router = context.socket(ZMQ.ROUTER);
+      ZMQ.Socket dealer = context.socket(ZMQ.DEALER);
+
+      try {
+        int success = router.bind(INPROC_PROTOCOL_PREFIX + remoteServer);
+        Preconditions.checkState(-1 != success, "Could not bind to " + remoteServer);
+
+        dealer.connect(TCP_PROTOCOL_PREFIX + remoteServer);
+
+        _logger.info("Worker started for [{}]", remoteServer);
+
+        //TODO: Add capture handler
+        //This code will block until the zmq context is terminated.
+        ZMQ.proxy(router, dealer, null);
+
+      } catch (Exception e) {
+        _logger.debug("Ignoring exception [{}, {}]", e.getClass(), e.getMessage());
+      } finally {
+        try {
+          router.close();
+          dealer.close();
+        } catch (Exception x) {
+          _logger.debug("Exception while closing socket [{}]", x);
+        }
+        _logger.debug("Closing...");
+      }
+    }
+  }
+}
index f0bf12cbc08c149011cab9c935153c52342d043a..9e66c20613e89db5cc5825e376b339ef52f9bba7 100644 (file)
@@ -21,6 +21,7 @@ import java.util.Enumeration;
 public class Context {
   private ZMQ.Context zmqContext = ZMQ.context(1);
   private String uri;
+  private final String DEFAULT_RPC_PORT = "5554";
 
   private static Context _instance = new Context();
 
@@ -36,7 +37,7 @@ public class Context {
 
   public String getLocalUri(){
     uri = (uri != null) ? uri
-            : new StringBuilder("tcp://").append(getIpAddress()).append(":")
+            : new StringBuilder().append(getIpAddress()).append(":")
               .append(getRpcPort()).toString();
 
     return uri;
@@ -45,7 +46,7 @@ public class Context {
   public String getRpcPort(){
     String rpcPort = (System.getProperty("rpc.port") != null)
         ? System.getProperty("rpc.port")
-        : "5554";
+        : DEFAULT_RPC_PORT;
 
     return rpcPort;
   }
index 6bd123b7e46e3984f2ac7bbfb5fbf45587e83a02..1f78a6771a0b6deb1f4d208db11ecb2275a04a73 100644 (file)
@@ -1,3 +1,11 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.sal.connector.remoterpc;
 
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
index 3c2e3b0872193b9b64b3e52d8fc2129ba06a43d5..bf205fc38d54a57ebc2ac351a3436f1d0215dbd9 100644 (file)
@@ -1,25 +1,29 @@
-package org.opendaylight.controller.sal.connector.remoterpc;
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
+package org.opendaylight.controller.sal.connector.remoterpc;
 
-import org.opendaylight.controller.sal.connector.remoterpc.Client;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
 
+import java.util.Collection;
+import java.util.Set;
+
 public class RemoteRpcProvider implements 
     RemoteRpcServer,
     RemoteRpcClient,
     Provider {
 
     private final ServerImpl server;
-    private final Client client;
+    private final ClientImpl client;
     private RoutingTableProvider provider;
 
     @Override
@@ -40,7 +44,7 @@ public class RemoteRpcProvider implements
     }
     
     
-    public RemoteRpcProvider(ServerImpl server, Client client) {
+    public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
         this.server = server;
         this.client = client;
     }
@@ -48,22 +52,19 @@ public class RemoteRpcProvider implements
     public void setBrokerSession(ProviderSession session) {
         server.setBrokerSession(session);
     }
-    public void setServerPool(ExecutorService serverPool) {
-        server.setServerPool(serverPool);
-    }
+//    public void setServerPool(ExecutorService serverPool) {
+//        server.setServerPool(serverPool);
+//    }
     public void start() {
-        client.setRoutingTableProvider(provider);
-        server.setRoutingTableProvider(provider);
+        //when listener was being invoked and addRPCImplementation was being
+        //called the client was null.
+        server.setClient(client);
         server.start();
         client.start();
+
+
     }
-    public void onRouteUpdated(String key, Set values) {
-        server.onRouteUpdated(key, values);
-    }
-    public void onRouteDeleted(String key) {
-        server.onRouteDeleted(key);
-    }
-    
+
     
     @Override
     public Collection<ProviderFunctionality> getProviderFunctionality() {
@@ -84,9 +85,6 @@ public class RemoteRpcProvider implements
         client.close();
     }
 
-    
-    
-    
     @Override
     public void stop() {
         server.stop();
index 932600fcb13c64cbc0e95f68d297238ab89fab76..e845e43c2ae13cdbfaf550fdb005c6c3930bb76e 100644 (file)
@@ -1,5 +1,12 @@
-package org.opendaylight.controller.sal.connector.remoterpc;
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
 
+package org.opendaylight.controller.sal.connector.remoterpc;
 
 public interface RemoteRpcServer extends AutoCloseable {
 
index cfdf98638d450830f65b04772a9ea14111430a1c..f62c26e0fdaad757112f4e80e860dcd97a464552 100644 (file)
@@ -1,27 +1,50 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.sal.connector.remoterpc;
 
+import com.google.common.base.Optional;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl;
 import org.osgi.framework.BundleContext;
 import org.osgi.util.tracker.ServiceTracker;
 
-import com.google.common.base.Optional;
-
 public class RoutingTableProvider implements AutoCloseable {
 
     @SuppressWarnings("rawtypes")
     final ServiceTracker<RoutingTable,RoutingTable> tracker;
+
+    private RoutingTableImpl routingTableImpl = null;
+
+    final private RouteChangeListener routeChangeListener;
     
     
-    public RoutingTableProvider(BundleContext ctx) {
+    public RoutingTableProvider(BundleContext ctx,RouteChangeListener rcl) {
         @SuppressWarnings("rawtypes")
         ServiceTracker<RoutingTable, RoutingTable> rawTracker = new ServiceTracker<>(ctx, RoutingTable.class, null);
         tracker = rawTracker;
         tracker.open();
+
+        routeChangeListener = rcl;
     }
     
     public Optional<RoutingTable<String, String>> getRoutingTable() {
         @SuppressWarnings("unchecked")
         RoutingTable<String,String> tracked = tracker.getService();
+
+        if(tracked instanceof RoutingTableImpl){
+            if(routingTableImpl != tracked){
+             routingTableImpl= (RoutingTableImpl)tracked;
+             routingTableImpl.setRouteChangeListener(routeChangeListener);
+            }
+        }
+
         return Optional.fromNullable(tracked);
     }
 
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java
deleted file mode 100644 (file)
index 7e8590a..0000000
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
-import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * A class encapsulating {@link ZMQ.Socket} of type {@link ZMQ.REQ}.
- * It adds following capabilities:
- * <li> Retry logic - Tries 3 times before giving up
- * <li> Request times out after {@link TIMEOUT} property
- * <li> The limitation of {@link ZMQ.REQ}/{@link ZMQ.REP} pair is that no 2 requests can be sent before
- * the response for the 1st request is received. To overcome that, this socket queues all messages until
- * the previous request has been responded.
- */
-public class RpcSocket {
-
-  // Constants
-  public static final int TIMEOUT = 2000;
-  public static final int QUEUE_SIZE = 10;
-  public static final int NUM_RETRIES = 3;
-  private static final Logger log = LoggerFactory.getLogger(RpcSocket.class);
-
-  private ZMQ.Socket socket;
-  private ZMQ.Poller poller;
-  private String address;
-  private SocketState state;
-  private long sendTime;
-  private int retriesLeft;
-  private LinkedBlockingQueue<MessageWrapper> inQueue;
-
-
-  public RpcSocket(String address, ZMQ.Poller poller) {
-    this.socket = null;
-    this.state = new IdleSocketState();
-    this.sendTime = -1;
-    this.retriesLeft = NUM_RETRIES;
-    this.inQueue = new LinkedBlockingQueue<MessageWrapper>(QUEUE_SIZE);
-    this.address = address;
-    this.poller = poller;
-    createSocket();
-  }
-
-  public ZMQ.Socket getSocket() {
-    return socket;
-  }
-
-  public String getAddress() {
-    return address;
-  }
-
-  public int getRetriesLeft() {
-    return retriesLeft;
-  }
-
-  public void setRetriesLeft(int retriesLeft) {
-    this.retriesLeft = retriesLeft;
-  }
-
-  public SocketState getState() {
-    return state;
-  }
-
-  public void setState(SocketState state) {
-    this.state = state;
-  }
-
-  public int getQueueSize() {
-    return inQueue.size();
-  }
-
-  public MessageWrapper removeCurrentRequest() {
-    return inQueue.poll();
-  }
-
-  public boolean hasTimedOut() {
-    return (System.currentTimeMillis() - sendTime > RpcSocket.TIMEOUT);
-  }
-
-  public void send(MessageWrapper request) throws TimeoutException {
-    try {
-      boolean success = inQueue.offer(request, TIMEOUT, TimeUnit.MILLISECONDS);    
-      if (!success) {
-        throw new TimeoutException("send :: Queue is full");
-      }
-      process();
-    }
-    catch (InterruptedException e) {
-      log.error("send : Thread interrupted while attempting to add request to inQueue", e);
-    }
-  }
-  
-  public MessageWrapper receive() {
-    Message response = parseMessage();
-    MessageWrapper messageWrapper = inQueue.poll(); //remove the message from queue
-    MessageWrapper responseMessageWrapper = new MessageWrapper(response, messageWrapper.getReceiveSocket());
-
-    state = new IdleSocketState();
-    retriesLeft = NUM_RETRIES;
-    return responseMessageWrapper;
-  }
-  
-  public void process() {
-    if (getQueueSize() > 0) //process if there's message in the queue
-      state.process(this);
-  }
-
-  // Called by IdleSocketState & BusySocketState
-  public void sendMessage() {
-    //Get the message from queue without removing it. For retries
-    MessageWrapper messageWrapper = inQueue.peek();
-    if (messageWrapper != null) {
-      Message message = messageWrapper.getMessage();
-      try {
-        socket.send(Message.serialize(message));
-      }
-      catch (IOException e) {
-        log.debug("Message send failed [{}]", message);
-        log.debug("Exception [{}]", e);
-      }
-      sendTime = System.currentTimeMillis();
-    }
-  }
-  
-  public Message parseMessage() {
-    Message parsedMessage = null;
-    byte[] bytes = socket.recv();
-    log.debug("Received bytes:[{}]", bytes.length);
-    try {
-      parsedMessage = (Message)Message.deserialize(bytes);
-    }
-    catch (IOException|ClassNotFoundException e) {
-      log.debug("parseMessage : Deserializing received bytes failed", e);
-    }
-
-    return parsedMessage;
-  }
-
-  public void recycleSocket() {
-    close();
-  }
-
-  public void close() {
-    socket.setLinger(10);
-    socket.close();
-  }
-
-  private void createSocket() {
-    socket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
-    socket.connect(address);
-    poller.register(socket, ZMQ.Poller.POLLIN);
-    state = new IdleSocketState();
-  }
-
-
-  /**
-   * Represents the state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
-   */
-  public static interface SocketState {
-
-    /* The processing actions to be performed in this state
-     */
-    public void process(RpcSocket socket);
-  }
-
-  /**
-   * Represents the idle state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
-   */
-  public static class IdleSocketState implements SocketState {
-
-    @Override
-    public void process(RpcSocket socket) {
-      socket.sendMessage();
-      socket.setState(new BusySocketState());
-      socket.setRetriesLeft(socket.getRetriesLeft()-1);
-    }
-  }
-
-  /**
-   * Represents the busy state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
-   */
-  public static class BusySocketState implements SocketState {
-
-    private static Logger log = LoggerFactory.getLogger(BusySocketState.class);
-
-    @Override
-    public void process(RpcSocket socket) {
-      if (socket.hasTimedOut()) {
-        if (socket.getRetriesLeft() > 0) {
-          log.debug("process : Request timed out, retrying now...");
-          socket.sendMessage();
-          socket.setRetriesLeft(socket.getRetriesLeft() - 1);
-        }
-        else {
-          // No more retries for current request, so stop processing the current request
-          MessageWrapper message = socket.removeCurrentRequest();
-          if (message != null) {
-            log.error("Unable to process rpc request [{}]", message);
-            socket.setState(new IdleSocketState());
-            socket.setRetriesLeft(NUM_RETRIES);
-          }
-        }
-      }
-      // Else no timeout, so allow processing to continue
-    }
-  }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java
deleted file mode 100644 (file)
index f53d5ad..0000000
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
-import java.io.IOException;
-import java.util.concurrent.TimeoutException;
-
-import static com.google.common.base.Preconditions.*;
-
-/**
- * Main server thread for sending requests.
- */
-public class Sender implements Runnable{
-
-  private final static Logger _logger = LoggerFactory.getLogger(Sender.class);
-  private final Client client;
-
-
-  
-  
-  public Sender(Client client) {
-    super();
-    this.client = client;
-  }
-
-@Override
-  public void run() {
-    _logger.info("Starting...");
-
-    try (SocketManager socketManager = new SocketManager()){
-      while (!Thread.currentThread().isInterrupted()) {
-
-        //read incoming messages from blocking queue
-        MessageWrapper request = pollForRequest();
-
-        if (request != null) {
-          processRequest(socketManager, request);
-        }
-
-        flushSockets(socketManager);
-        pollForResponse(socketManager);
-        processResponse(socketManager);
-
-      }
-    } catch(Exception t){
-      _logger.error("Exception: [{}]", t);
-      _logger.error("Stopping...");
-    }
-  }
-
-  private void processResponse(SocketManager socketManager) {
-    for (int i = 0; i < socketManager.getPoller().getSize(); i++) {
-      // If any sockets get a response, process it
-      if (socketManager.getPoller().pollin(i)) {
-        Optional<RpcSocket> socket = socketManager.getManagedSocketFor(
-            socketManager.getPoller().getItem(i).getSocket());
-
-        checkState(socket.isPresent(), "Managed socket not found");
-
-        MessageWrapper response = socket.get().receive();
-        _logger.debug("Received rpc response [{}]", response.getMessage());
-
-        //TODO: handle exception and introduce timeout on receiver side
-        try {
-          response.getReceiveSocket().send(Message.serialize(response.getMessage()));
-        } catch (IOException e) {
-          e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-        }
-      }
-    }
-  }
-
-  private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException {
-
-    if ((request.getMessage() == null) ||
-        (request.getMessage().getRecipient() == null)) {
-      //invalid message. log and drop
-      _logger.error("Invalid request [{}]", request);
-      return;
-    }
-
-    RpcSocket socket =
-        socketManager.getManagedSocket(request.getMessage().getRecipient());
-
-    socket.send(request);
-  }
-
-  private void flushSockets(SocketManager socketManager){
-    for (RpcSocket socket : socketManager.getManagedSockets()){
-      socket.process();
-    }
-  }
-
-  private MessageWrapper pollForRequest(){
-    return client.getRequestQueue().poll();
-  }
-
-  private void pollForResponse(SocketManager socketManager){
-    try{
-      socketManager.getPoller().poll(10); //poll every 10ms
-    }catch (Throwable t) { /*Ignore and continue*/ }
-  }
-}
-
-
-/*
-SCALA
-
-package org.opendaylight.controller.sal.connector.remoterpc
-
-  import org.slf4j.{LoggerFactory, Logger}
-  import scala.collection.JavaConverters._
-  import scala.Some
-  import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message}
-*/
-/**
- * Main server thread for sending requests. This does not maintain any state. If the
- * thread dies, it will be restarted
- */
-/*class Sender extends Runnable {
-  private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass())
-
-  override def run = {
-    _logger.info("Sender starting...")
-    val socketManager = new SocketManager()
-
-    try {
-      while (!Thread.currentThread().isInterrupted) {
-        //read incoming messages from blocking queue
-        val request: MessageWrapper = Client.requestQueue.poll()
-
-        if (request != null) {
-          if ((request.message != null) &&
-            (request.message.getRecipient != null)) {
-
-            val socket = socketManager.getManagedSocket(request.message.getRecipient)
-            socket.send(request)
-          } else {
-            //invalid message. log and drop
-            _logger.error("Invalid request [{}]", request)
-          }
-        }
-
-        socketManager.getManagedSockets().asScala.map(s => s.process)
-
-        // Poll all sockets for responses every 1 sec
-        poll(socketManager)
-
-        // If any sockets get a response, process it
-        for (i <- 0 until socketManager.poller.getSize) {
-          if (socketManager.poller.pollin(i)) {
-            val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket)
-
-            socket match {
-              case None => //{
-                _logger.error("Could not find a managed socket for zmq socket")
-                throw new IllegalStateException("Could not find a managed socket for zmq socket")
-                //}
-              case Some(s) => {
-                val response = s.receive()
-                _logger.debug("Received rpc response [{}]", response.message)
-                response.receiveSocket.send(Message.serialize(response.message))
-              }
-            }
-          }
-        }
-
-      }
-    } catch{
-      case e:Exception => {
-        _logger.debug("Sender stopping due to exception")
-        e.printStackTrace()
-      }
-    } finally {
-      socketManager.stop
-    }
-  }
-
-  def poll(socketManager:SocketManager) = {
-    try{
-      socketManager.poller.poll(10)
-    }catch{
-      case t:Throwable => //ignore and continue
-    }
-  }
-}
-
-
-//    def newThread(r: Runnable): Thread = {
-//      val t = new RequestHandler()
-//      t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler)
-//      t
-//    }
-
-
-
-/**
- * Restarts the request processing server in the event of unforeseen exceptions
- */
-//private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler {
-//  def uncaughtException(t: Thread, e: Throwable) = {
-//    _logger.error("Exception caught during request processing [{}]", e)
-//    _logger.info("Restarting request processor server...")
-//    RequestProcessor.start()
-//  }
index 83b93858cff392c7ab02e2b9cc0499c45b5798dc..b5a67ff0df97f3d110f1842074617468ce836b61 100644 (file)
 package org.opendaylight.controller.sal.connector.remoterpc;
 
 import com.google.common.base.Optional;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Sets;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
 import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
-import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling
- * async and non-blocking. Note zmq socket is not thread safe 2. Read properties
- * from config file using existing(?) ODL properties framework
+ * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
+ * so that it gets route change notifications from routing table.
  */
-public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, Set> {
+public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
 
-    private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
+  private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
 
-    private ExecutorService serverPool;
+  private ExecutorService serverPool;
+  protected ServerRequestHandler handler;
 
-    // private RoutingTable<RpcRouter.RouteIdentifier, String> routingTable;
-    private RoutingTableProvider routingTable;
-    private Set<QName> remoteServices;
-    private ProviderSession brokerSession;
-    private ZMQ.Context context;
-    private ZMQ.Socket replySocket;
+  private Set<QName> remoteServices;
+  private ProviderSession brokerSession;
+  private ZMQ.Context context;
 
-    private final RpcListener listener = new RpcListener();
+  private final RpcListener listener = new RpcListener();
 
-    private final String localUri = Context.getInstance().getLocalUri();
+  private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
+  private final int HANDLER_WORKER_COUNT = 2;
+  private final int HWM = 200;//high water mark on sockets
+  private volatile State status = State.STOPPED;
 
-    private final int rpcPort;
+  private String serverAddress;
+  private int port;
 
-    private RpcImplementation client;
+  private ClientImpl client;
 
-    public RpcImplementation getClient() {
-        return client;
-    }
+  private  RoutingTableProvider routingTableProvider;
 
-    public void setClient(RpcImplementation client) {
-        this.client = client;
-    }
+  public static enum State {
+    STARTING, STARTED, STOPPED;
+  }
 
-    // Prevent instantiation
-    public ServerImpl(int rpcPort) {
-        this.rpcPort = rpcPort;
-    }
+  public ServerImpl(int port) {
+    this.port = port;
+    this.serverAddress = new StringBuilder(findIpAddress()).
+                              append(":").
+                              append(port).
+                              toString();
+  }
 
-    public void setBrokerSession(ProviderSession session) {
-        this.brokerSession = session;
-    }
+  public RoutingTableProvider getRoutingTableProvider() {
+    return routingTableProvider;
+  }
 
-    public ExecutorService getServerPool() {
-        return serverPool;
-    }
+  public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
+    this.routingTableProvider = routingTableProvider;
+  }
 
-    public void setServerPool(ExecutorService serverPool) {
-        this.serverPool = serverPool;
-    }
+  public ClientImpl getClient(){
+    return this.client;
+  }
 
-    public void start() {
-        context = ZMQ.context(1);
-        serverPool = Executors.newSingleThreadExecutor();
-        remoteServices = new HashSet<QName>();
+  public void setClient(ClientImpl client) {
+    this.client = client;
+  }
 
-        // Start listening rpc requests
-        serverPool.execute(receive());
+  public State getStatus() {
+    return this.status;
+  }
 
-        brokerSession.addRpcRegistrationListener(listener);
-        // routingTable.registerRouteChangeListener(routeChangeListener);
+  public Optional<ServerRequestHandler> getHandler() {
+    return Optional.fromNullable(this.handler);
+  }
 
-        Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-        for (QName rpc : currentlySupported) {
-            listener.onRpcImplementationAdded(rpc);
-        }
+  public void setBrokerSession(ProviderSession session) {
+    this.brokerSession = session;
+  }
 
-        _logger.debug("RPC Server started [{}]", localUri);
-    }
+  public Optional<ProviderSession> getBrokerSession() {
+    return Optional.fromNullable(this.brokerSession);
+  }
 
-    public void stop() {
-        // TODO: un-subscribe
+  public Optional<ZMQ.Context> getZmqContext() {
+    return Optional.fromNullable(this.context);
+  }
 
-        // if (context != null)
-        // context.term();
-        //
-        // _logger.debug("ZMQ Context is terminated.");
+  public String getServerAddress() {
+    return serverAddress;
+  }
 
-        if (serverPool != null)
-            serverPool.shutdown();
+  public String getHandlerAddress() {
+    return HANDLER_INPROC_ADDRESS;
+  }
 
-        _logger.debug("Thread pool is closed.");
-    }
+  /**
+   *
+   */
+  public void start() {
+    Preconditions.checkState(State.STOPPED == this.getStatus(),
+        "Remote RPC Server is already running");
 
-    private Runnable receive() {
-        return new Runnable() {
-            public void run() {
-
-                // Bind to RPC reply socket
-                replySocket = context.socket(ZMQ.REP);
-                replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort());
-
-                // Poller enables listening on multiple sockets using a single
-                // thread
-                ZMQ.Poller poller = new ZMQ.Poller(1);
-                poller.register(replySocket, ZMQ.Poller.POLLIN);
-                try {
-                    // TODO: Add code to restart the thread after exception
-                    while (!Thread.currentThread().isInterrupted()) {
-
-                        poller.poll();
-
-                        if (poller.pollin(0)) {
-                            handleRpcCall();
-                        }
-                    }
-                } catch (Exception e) {
-                    // log and continue
-                    _logger.error("Unhandled exception [{}]", e);
-                } finally {
-                    poller.unregister(replySocket);
-                    replySocket.close();
-                }
-
-            }
-        };
-    }
+    status = State.STARTING;
+    context = ZMQ.context(1);
+    remoteServices = new HashSet<QName>();//
+    serverPool = Executors.newSingleThreadExecutor();//main server thread
+    serverPool.execute(receive()); // Start listening rpc requests
+    brokerSession.addRpcRegistrationListener(listener);
 
-    /**
-     * @throws InterruptedException
-     * @throws ExecutionException
-     */
-    private void handleRpcCall() {
+    announceLocalRpcs();
 
-        Message request = parseMessage(replySocket);
+    registerRemoteRpcs();
 
-        _logger.debug("Received rpc request [{}]", request);
+    status = State.STARTED;
+    _logger.info("Remote RPC Server started [{}]", getServerAddress());
+  }
 
-        // Call broker to process the message then reply
-        Future<RpcResult<CompositeNode>> rpc = null;
-        RpcResult<CompositeNode> result = null;
-        try {
-            rpc = brokerSession.rpc((QName) request.getRoute().getType(),
-                    XmlUtils.xmlToCompositeNode((String) request.getPayload()));
+  public void stop(){
+    close();
+  }
 
-            result = (rpc != null) ? rpc.get() : null;
+  /**
+   *
+   */
+  @Override
+  public void close() {
 
-        } catch (Exception e) {
-            _logger.debug("Broker threw  [{}]", e);
-        }
+    if (State.STOPPED == this.getStatus()) return; //do nothing
+
+    unregisterLocalRpcs();
 
-        CompositeNode payload = (result != null) ? result.getResult() : null;
+    if (serverPool != null)
+      serverPool.shutdown();
 
-        Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri)
-                .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build();
+    closeZmqContext();
 
-        _logger.debug("Sending rpc response [{}]", response);
+    status = State.STOPPED;
+    _logger.info("Remote RPC Server stopped");
+  }
+
+  /**
+   * Closes ZMQ Context. It tries to gracefully terminate the context. If
+   * termination takes more than a second, its forcefully shutdown.
+   */
+  private void closeZmqContext() {
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    FutureTask zmqTermination = new FutureTask(new Runnable() {
 
+      @Override
+      public void run() {
         try {
-            replySocket.send(Message.serialize(response));
+          if (context != null)
+            context.term();
+          _logger.debug("ZMQ Context terminated gracefully!");
         } catch (Exception e) {
-            _logger.debug("rpc response send failed for message [{}]", response);
-            _logger.debug("{}", e);
+          _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
         }
+      }
+    }, null);
+
+    exec.execute(zmqTermination);
+
+    try {
+      zmqTermination.get(5L, TimeUnit.SECONDS);
+    } catch (Exception e) {/*ignore and continue with shutdown*/}
+
+    exec.shutdownNow();
+  }
+
+  /**
+   * Main listener thread that spawns {@link ServerRequestHandler} as workers.
+   *
+   * @return
+   */
+  private Runnable receive() {
+    return new Runnable() {
+
+      @Override
+      public void run() {
+        Thread.currentThread().setName("remote-rpc-server");
+        _logger.debug("Remote RPC Server main thread starting...");
+
+        //socket clients connect to (frontend)
+        ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
+
+        //socket RequestHandlers connect to (backend)
+        ZMQ.Socket workers = context.socket(ZMQ.DEALER);
+
+        try (SocketPair capturePair = new SocketPair();
+             ServerRequestHandler requestHandler = new ServerRequestHandler(context,
+                 brokerSession,
+                 HANDLER_WORKER_COUNT,
+                 HANDLER_INPROC_ADDRESS,
+                 getServerAddress());) {
+
+          handler = requestHandler;
+          clients.setHWM(HWM);
+          clients.bind("tcp://*:" + port);
+          workers.setHWM(HWM);
+          workers.bind(HANDLER_INPROC_ADDRESS);
+          //start worker threads
+          _logger.debug("Remote RPC Server worker threads starting...");
+          requestHandler.start();
+          //start capture thread
+          // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
+          //  Connect work threads to client threads via a queue
+          ZMQ.proxy(clients, workers, null);//capturePair.getSender());
 
+        } catch (Exception e) {
+          _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
+        } finally {
+          if (clients != null) clients.close();
+          if (workers != null) workers.close();
+          _logger.info("Remote RPC Server stopped");
+        }
+      }
+    };
+  }
+
+  /**
+   * Register the remote RPCs from the routing table into broker
+   */
+  private void registerRemoteRpcs(){
+    Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
+
+    Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
+
+    Set<Map.Entry> remoteRoutes =
+            routingTableProvider.getRoutingTable().get().getAllRoutes();
+
+    //filter out all entries that contains local address
+    //we dont want to register local RPCs as remote
+    Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
+      public boolean apply(Map.Entry remoteRoute){
+        return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
+      }
+    };
+
+    //filter the entries created by current node
+    Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
+
+    for (Map.Entry route : filteredRemoteRoutes){
+      onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
+    }
+  }
+
+  /**
+   * Un-Register the local RPCs from the routing table
+   */
+  private void unregisterLocalRpcs(){
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    for (QName rpc : currentlySupported) {
+      listener.onRpcImplementationRemoved(rpc);
+    }
+  }
+
+  /**
+   * Publish all the locally registered RPCs in the routing table
+   */
+  private void announceLocalRpcs(){
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    for (QName rpc : currentlySupported) {
+      listener.onRpcImplementationAdded(rpc);
+    }
+  }
+
+  /**
+   * @param key
+   * @param value
+   */
+  @Override
+  public void onRouteUpdated(String key, String value) {
+    RouteIdentifierImpl rId = new RouteIdentifierImpl();
+    try {
+      _logger.debug("Updating key/value {}-{}", key, value);
+      brokerSession.addRpcImplementation(
+          (QName) rId.fromString(key).getType(), client);
+
+      //TODO: Check with Tony for routed rpc
+      //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
+    } catch (Exception e) {
+      _logger.info("Route update failed {}", e);
     }
+  }
+
+  /**
+   * @param key
+   */
+  @Override
+  public void onRouteDeleted(String key) {
+    //TODO: Broker session needs to be updated to support this
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Finds IPv4 address of the local VM
+   * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
+   * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
+   * Should we use IP or hostname?
+   *
+   * @return
+   */
+  private String findIpAddress() {
+    String hostAddress = null;
+    Enumeration e = null;
+    try {
+      e = NetworkInterface.getNetworkInterfaces();
+    } catch (SocketException e1) {
+      e1.printStackTrace();
+    }
+    while (e.hasMoreElements()) {
 
-    /**
-     * @param socket
-     * @return
-     */
-    private Message parseMessage(ZMQ.Socket socket) {
+      NetworkInterface n = (NetworkInterface) e.nextElement();
 
-        Message msg = null;
-        try {
-            byte[] bytes = socket.recv();
-            _logger.debug("Received bytes:[{}]", bytes.length);
-            msg = (Message) Message.deserialize(bytes);
-        } catch (Throwable t) {
-            t.printStackTrace();
-        }
-        return msg;
+      Enumeration ee = n.getInetAddresses();
+      while (ee.hasMoreElements()) {
+        InetAddress i = (InetAddress) ee.nextElement();
+        if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
+          hostAddress = i.getHostAddress();
+      }
     }
+    return hostAddress;
+
+  }
+
+  /**
+   * Listener for rpc registrations
+   */
+  private class RpcListener implements RpcRegistrationListener {
 
     @Override
-    public void onRouteUpdated(String key, Set values) {
-        RouteIdentifierImpl rId = new RouteIdentifierImpl();
-        try {
-            _logger.debug("Updating key/value {}-{}", key, values);
-            brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client);
+    public void onRpcImplementationAdded(QName name) {
 
-        } catch (Exception e) {
-            _logger.info("Route update failed {}", e);
-        }
+      //if the service name exists in the set, this notice
+      //has bounced back from the broker. It should be ignored
+      if (remoteServices.contains(name))
+        return;
+
+      _logger.debug("Adding registration for [{}]", name);
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setType(name);
+
+      RoutingTable<String, String> routingTable = getRoutingTable();
+
+      try {
+        routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
+        _logger.debug("Route added [{}-{}]", name, getServerAddress());
+
+      } catch (RoutingTableException | SystemException e) {
+        //TODO: This can be thrown when route already exists in the table. Broker
+        //needs to handle this.
+        _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
+
+      }
     }
 
     @Override
-    public void onRouteDeleted(String key) {
-        // TODO: Broker session needs to be updated to support this
-        throw new UnsupportedOperationException();
+    public void onRpcImplementationRemoved(QName name) {
+
+      _logger.debug("Removing registration for [{}]", name);
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setType(name);
+
+      RoutingTable<String, String> routingTable = getRoutingTable();
+
+      try {
+        routingTable.removeGlobalRoute(routeId.toString());
+      } catch (RoutingTableException | SystemException e) {
+        _logger.error("Route delete failed {}", e);
+      }
     }
-    
-    /**
-     * Listener for rpc registrations
-     */
-    private class RpcListener implements RpcRegistrationListener {
-
-        
-
-        @Override
-        public void onRpcImplementationAdded(QName name) {
-
-            // if the service name exists in the set, this notice
-            // has bounced back from the broker. It should be ignored
-            if (remoteServices.contains(name))
-                return;
-
-            _logger.debug("Adding registration for [{}]", name);
-            RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-            routeId.setType(name);
-
-            try {
-                routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri);
-                _logger.debug("Route added [{}-{}]", name, localUri);
-            } catch (RoutingTableException | SystemException e) {
-                // TODO: This can be thrown when route already exists in the
-                // table. Broker
-                // needs to handle this.
-                _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
-
-            }
-        }
 
-        @Override
-        public void onRpcImplementationRemoved(QName name) {
+    private RoutingTable<String, String> getRoutingTable(){
+      Optional<RoutingTable<String, String>> routingTable =
+          routingTableProvider.getRoutingTable();
 
-            _logger.debug("Removing registration for [{}]", name);
-            RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-            routeId.setType(name);
+      checkNotNull(routingTable.isPresent(), "Routing table is null");
 
-            try {
-                routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString());
-            } catch (RoutingTableException | SystemException e) {
-                _logger.error("Route delete failed {}", e);
-            }
-        }
+      return routingTable.get();
     }
+  }
+
+  /*
+   * Listener for Route changes in broker. Broker notifies this listener in the event
+   * of any change (add/delete). Listener then updates the routing table.
+   */
+  private class BrokerRouteChangeListener
+      implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
 
     @Override
-    public void close() throws Exception {
-        stop();
-    }
+    public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
 
-    public void setRoutingTableProvider(RoutingTableProvider provider) {
-        this.routingTable = provider;
     }
+  }
 
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java
new file mode 100644 (file)
index 0000000..949e6ee
--- /dev/null
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
+ */
+public class ServerRequestHandler implements AutoCloseable{
+
+  private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
+  private final String DEFAULT_NAME = "remote-rpc-worker";
+  private String dealerAddress;
+  private String serverAddress;
+  private int workerCount;
+  private ZMQ.Context context;
+  private Broker.ProviderSession broker;
+
+  private RequestHandlerThreadPool workerPool;
+  private final AtomicInteger threadId = new AtomicInteger();
+
+  public ServerRequestHandler(ZMQ.Context context,
+                              Broker.ProviderSession session,
+                              int workerCount,
+                              String dealerAddress,
+                              String serverAddress) {
+    this.context       = context;
+    this.dealerAddress = dealerAddress;
+    this.serverAddress = serverAddress;
+    this.broker        = session;
+    this.workerCount   = workerCount;
+  }
+
+  public ThreadPoolExecutor getWorkerPool(){
+    return workerPool;
+  }
+
+  public void start(){
+    workerPool = new RequestHandlerThreadPool(
+        workerCount, workerCount,
+        0L, TimeUnit.MILLISECONDS,
+        new LinkedBlockingQueue<Runnable>());
+    //unbound is ok. Task will never be submitted
+
+    for (int i=0;i<workerCount;i++){
+      workerPool.execute(new Worker(threadId.incrementAndGet()));
+    }
+  }
+
+  /**
+   * This gets called automatically if used with try-with-resources
+   * @throws Exception
+   */
+  @Override
+  public void close() throws Exception {
+    if (workerPool != null)
+      workerPool.shutdown();
+    _logger.info("Request Handler closed");
+  }
+
+  /**
+   * Worker to handles RPC request
+   */
+  private class Worker implements Runnable {
+    private String name;
+
+    public Worker(int id){
+      this.name = DEFAULT_NAME + "-" + id;
+    }
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName(name);
+      _logger.debug("Starting... ");
+      ZMQ.Socket socket = null;
+
+      try {
+        socket = context.socket(ZMQ.REP);
+        socket.connect(dealerAddress);
+
+        while (!Thread.currentThread().isInterrupted()) {
+
+          Message request = parseMessage(socket);
+          _logger.debug("Received rpc request [{}]", request);
+
+          if (request != null) {
+            // Call broker to process the message then reply
+            Future<RpcResult<CompositeNode>> rpc = null;
+            RpcResult<CompositeNode> result = null;
+
+            //TODO Call this in a new thread with timeout
+            try {
+              rpc = broker.rpc(
+                  (QName) request.getRoute().getType(),
+                  XmlUtils.xmlToCompositeNode((String) request.getPayload()));
+
+              result = (rpc != null) ? rpc.get() : null;
+
+            } catch (Exception e) {
+              _logger.debug("Broker threw  [{}]", e);
+            }
+
+            CompositeNode payload = (result != null) ? result.getResult() : null;
+
+            Message response = new Message.MessageBuilder()
+                .type(Message.MessageType.RESPONSE)
+                .sender(serverAddress)
+                .route(request.getRoute())
+                .payload(XmlUtils.compositeNodeToXml(payload))
+                .build();
+
+            _logger.debug("Sending rpc response [{}]", response);
+
+            try {
+              socket.send(Message.serialize(response));
+            } catch (Exception e) {
+              _logger.debug("rpc response send failed for message [{}]", response);
+              _logger.debug("{}", e);
+            }
+          }
+        }
+      } catch (Exception e) {
+        printException(e);
+      } finally {
+        closeSocket(socket);
+      }
+    }
+
+    /**
+     * @param socket
+     * @return
+     */
+    private Message parseMessage(ZMQ.Socket socket) throws Exception {
+      byte[] bytes = socket.recv(); //this blocks
+      _logger.debug("Received bytes:[{}]", bytes.length);
+      return (Message) Message.deserialize(bytes);
+    }
+
+    private void printException(Exception e) {
+      try (StringWriter s = new StringWriter();
+           PrintWriter p = new PrintWriter(s)) {
+        e.printStackTrace(p);
+        _logger.debug(s.toString());
+      } catch (IOException e1) {/*Ignore and continue*/ }
+    }
+
+    private void closeSocket(ZMQ.Socket socket) {
+      try {
+        if (socket != null) socket.close();
+      } catch (Exception x) {
+        _logger.debug("Exception while closing socket [{}]", x);
+      } finally {
+        if (socket != null) socket.close();
+      }
+      _logger.debug("Closing...");
+    }
+  }
+
+
+  /**
+   *
+   */
+  public class RequestHandlerThreadPool extends ThreadPoolExecutor{
+
+    public RequestHandlerThreadPool(int corePoolSize,
+                                    int maximumPoolSize,
+                                    long keepAliveTime,
+                                    TimeUnit unit,
+                                    BlockingQueue<Runnable> workQueue) {
+      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+      if (isTerminating() || isTerminated() || isShutdown())
+        return;
+
+      if ( t != null ){
+        _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage());
+      }
+
+      this.execute(new Worker(threadId.incrementAndGet()));
+      super.afterExecute(r, null);
+    }
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java
deleted file mode 100644 (file)
index 588a299..0000000
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import com.google.common.base.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Manages creation of {@link RpcSocket} and their registration with {@link ZMQ.Poller}
- */
-public class SocketManager implements AutoCloseable{
-  private static final Logger log = LoggerFactory.getLogger(SocketManager.class);
-
-  /*
-   * RpcSockets mapped by network address its connected to
-   */
-  private ConcurrentHashMap<String, RpcSocket> managedSockets = new ConcurrentHashMap<String, RpcSocket>();
-
-  private ZMQ.Poller _poller = new ZMQ.Poller(2); //randomly selected size. Poller grows automatically
-
-  /**
-   * Returns a {@link RpcSocket} for the given address
-   * @param address network address with port eg: 10.199.199.20:5554
-   * @return
-   */
-  public RpcSocket getManagedSocket(String address) throws IllegalArgumentException {
-    //Precondition
-    if (!address.matches("(tcp://)(.*)(:)(\\d*)")) {
-      throw new IllegalArgumentException("Address must of format 'tcp://<ip address>:<port>' but is " + address);
-    }
-
-    if (!managedSockets.containsKey(address)) {
-      log.debug("{} Creating new socket for {}", Thread.currentThread().getName());
-      RpcSocket socket = new RpcSocket(address, _poller);
-      managedSockets.put(address, socket);
-    }
-
-    return managedSockets.get(address);
-  }
-
-  /**
-   * Returns a {@link RpcSocket} for the given {@link ZMQ.Socket}
-   * @param socket
-   * @return
-   */
-  public Optional<RpcSocket> getManagedSocketFor(ZMQ.Socket socket) {
-    for (RpcSocket rpcSocket : managedSockets.values()) {
-      if (rpcSocket.getSocket().equals(socket)) {
-        return Optional.of(rpcSocket);
-      }
-    }
-    return Optional.absent();
-  }
-
-  /**
-   * Return a collection of all managed sockets
-   * @return
-   */
-  public Collection<RpcSocket> getManagedSockets() {
-    return managedSockets.values();
-  }
-
-  /**
-   * Returns the {@link ZMQ.Poller}
-   * @return
-   */
-  public ZMQ.Poller getPoller() {
-    return _poller;
-  }
-
-  /**
-   * This should be called when stopping the server to close all the sockets
-   * @return
-   */
-  @Override
-  public void close() throws Exception {
-    log.debug("Stopping...");
-    for (RpcSocket socket : managedSockets.values()) {
-      socket.close();
-    }
-    managedSockets.clear();
-    log.debug("Stopped");
-  }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java
deleted file mode 100644 (file)
index 073601a..0000000
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc.dto;
-
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.*;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class CompositeNodeImpl implements CompositeNode, Serializable {
-
-  private QName key;
-  private List<Node<?>> children;
-
-  @Override
-  public List<Node<?>> getChildren() {
-    return children;
-  }
-
-  @Override
-  public List<CompositeNode> getCompositesByName(QName children) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public List<CompositeNode> getCompositesByName(String children) {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public List<SimpleNode<?>> getSimpleNodesByName(QName children) {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public List<SimpleNode<?>> getSimpleNodesByName(String children) {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public CompositeNode getFirstCompositeByName(QName container) {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public SimpleNode<?> getFirstSimpleByName(QName leaf) {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public MutableCompositeNode asMutable() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public QName getKey() {
-    return key;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public List<Node<?>> setValue(List<Node<?>> value) {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public int size() {
-    return 0;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return false;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public boolean containsKey(Object key) {
-    return false;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public boolean containsValue(Object value) {
-    return false;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public List<Node<?>> get(Object key) {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public List<Node<?>> put(QName key, List<Node<?>> value) {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public List<Node<?>> remove(Object key) {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public void putAll(Map<? extends QName, ? extends List<Node<?>>> m) {
-    //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public void clear() {
-    //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public Set<QName> keySet() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public Collection<List<Node<?>>> values() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public Set<Entry<QName, List<Node<?>>>> entrySet() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public QName getNodeType() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public CompositeNode getParent() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public List<Node<?>> getValue() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-
-  @Override
-  public ModifyAction getModificationAction() {
-    return null;  //To change body of implemented methods use File | Settings | File Templates.
-  }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RpcRequestImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RpcRequestImpl.java
deleted file mode 100644 (file)
index f26f78b..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connector.remoterpc.dto;
-
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-
-import java.io.Serializable;
-
-public class RpcRequestImpl implements RpcRouter.RpcRequest<QName, QName, InstanceIdentifier, Object>,Serializable {
-
-  private RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier;
-  private Object payload;
-
-  @Override
-  public RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> getRoutingInformation() {
-    return routeIdentifier;
-  }
-
-  public void setRouteIdentifier(RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier) {
-    this.routeIdentifier = routeIdentifier;
-  }
-
-  @Override
-  public Object getPayload() {
-    return payload;
-  }
-
-  public void setPayload(Object payload) {
-    this.payload = payload;
-  }
-
-}
index 7dab2e3b758e8af526e02bbae01004bfd75cfa06..3bc39630e18acab85c628538b7bb883bdf8d2372 100644 (file)
@@ -1,3 +1,11 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.sal.connector.remoterpc.util;
 
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala
deleted file mode 100644 (file)
index 63b6808..0000000
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc
-
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName}
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import java.util
-import java.util.{UUID, Collections}
-import org.zeromq.ZMQ
-import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs}
-import org.slf4j.LoggerFactory
-import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message}
-import Message.MessageType
-import java.util.concurrent._
-import java.lang.InterruptedException
-
-
-/**
- * An implementation of {@link RpcImplementation} that makes
- * remote RPC calls
- */
-class Client extends RemoteRpcClient {
-
-  private val _logger = LoggerFactory.getLogger(this.getClass);
-
-  val requestQueue = new LinkedBlockingQueue[MessageWrapper](100)
-  val pool: ExecutorService = Executors.newSingleThreadExecutor()
-  private val TIMEOUT = 5000 //in ms
-  var routingTableProvider: RoutingTableProvider = null
-  
-  
-  def getInstance = this
-
-  
-  def setRoutingTableProvider(provider : RoutingTableProvider) = {
-    routingTableProvider = provider;
-  }
-  
-  def getSupportedRpcs: util.Set[QName] = {
-    Collections.emptySet()
-  }
-
-  def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = {
-
-    val routeId = new RouteIdentifierImpl()
-    routeId.setType(rpc)
-
-    //lookup address for the rpc request
-    val routingTable = routingTableProvider.getRoutingTable()
-    require( routingTable != null, "Routing table not found. Exiting" )
-
-    val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString)
-    require(addresses != null, "Address not found for rpc " + rpc);
-    require(addresses.size() == 1) //its a global service.
-
-    val address = addresses.iterator().next()
-    require(address != null, "Address is null")
-
-    //create in-process "pair" socket and pass it to sender thread
-    //Sender replies on this when result is available
-    val inProcAddress = "inproc://" + UUID.randomUUID()
-    val receiver = Context.zmqContext.socket(ZMQ.PAIR)
-    receiver.bind(inProcAddress);
-
-    val sender = Context.zmqContext.socket(ZMQ.PAIR)
-    sender.connect(inProcAddress)
-
-    val requestMessage = new Message.MessageBuilder()
-      .`type`(MessageType.REQUEST)
-      //.sender("tcp://localhost:8081")
-      .recipient(address)
-      .route(routeId)
-      .payload(input)
-      .build()
-
-    _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress)
-
-    val messageWrapper = new MessageWrapper(requestMessage, sender)
-    val errors = new util.ArrayList[RpcError]
-
-    try {
-      process(messageWrapper)
-      val response = parseMessage(receiver)
-
-      return Rpcs.getRpcResult(
-        true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet())
-
-    } catch {
-      case e: Exception => {
-        errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause))
-        return Rpcs.getRpcResult(false, null, errors)
-      }
-    } finally {
-      receiver.close();
-      sender.close();
-    }
-
-  }
-
-  /**
-   * Block on socket for reply
-   * @param receiver
-   * @return
-   */
-  private def parseMessage(receiver:ZMQ.Socket): Message = {
-    val bytes = receiver.recv()
-    return  Message.deserialize(bytes).asInstanceOf[Message]
-  }
-
-  def start() = {
-    pool.execute(new Sender)
-  }
-
-  def process(msg: MessageWrapper) = {
-    _logger.debug("Processing message [{}]", msg)
-    val success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS)
-
-    if (!success) throw new TimeoutException("Queue is full");
-
-  }
-
-  def stop() = {
-    pool.shutdown() //intiate shutdown
-    _logger.debug("Client stopping...")
-    //    Context.zmqContext.term();
-    //    _logger.debug("ZMQ context terminated")
-
-    try {
-
-      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-        pool.shutdownNow();
-        if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-          _logger.error("Client thread pool did not shut down");
-      }
-    } catch {
-      case ie:InterruptedException =>
-        // (Re-)Cancel if current thread also interrupted
-        pool.shutdownNow();
-        // Preserve interrupt status
-        Thread.currentThread().interrupt();
-    }
-    _logger.debug("Client stopped")
-  }
-  
-  def close() = {
-    stop();
-  }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java
new file mode 100644 (file)
index 0000000..5dd2142
--- /dev/null
@@ -0,0 +1,103 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class ClientImplTest {
+  RoutingTableProvider routingTableProvider;
+  ClientImpl client;
+  ClientRequestHandler mockHandler;
+
+  @Before
+  public void setUp() throws Exception {
+
+    //mock routing table
+    routingTableProvider = mock(RoutingTableProvider.class);
+    RoutingTable<String, String> mockRoutingTable = new MockRoutingTable<String, String>();
+    Optional<RoutingTable<String, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
+    when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable);
+
+    //mock ClientRequestHandler
+    mockHandler = mock(ClientRequestHandler.class);
+
+    client = new ClientImpl(mockHandler);
+    client.setRoutingTableProvider(routingTableProvider);
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+
+  }
+
+  @Test
+  public void getRoutingTableProvider_Call_ShouldReturnMockProvider() throws Exception {
+    Assert.assertEquals(routingTableProvider, client.getRoutingTableProvider());
+
+  }
+
+  @Test
+  public void testStart() throws Exception {
+
+  }
+
+  @Test
+  public void testStop() throws Exception {
+
+  }
+
+  @Test
+  public void testClose() throws Exception {
+
+  }
+
+  @Test
+  public void invokeRpc_NormalCall_ShouldReturnSuccess() throws Exception {
+
+    when(mockHandler.handle(any(Message.class))).
+            thenReturn(MessagingUtil.createEmptyMessage());
+
+    RpcResult<CompositeNode> result = client.invokeRpc(null, null);
+
+    Assert.assertTrue(result.isSuccessful());
+    Assert.assertTrue(result.getErrors().isEmpty());
+    Assert.assertNull(result.getResult());
+  }
+
+  @Test
+  public void invokeRpc_HandlerThrowsException_ShouldReturnError() throws Exception {
+
+    when(mockHandler.handle(any(Message.class))).
+            thenThrow(new IOException());
+
+    RpcResult<CompositeNode> result = client.invokeRpc(null, null);
+
+    Assert.assertFalse(result.isSuccessful());
+    Assert.assertFalse(result.getErrors().isEmpty());
+    Assert.assertNull(result.getResult());
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandlerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandlerTest.java
new file mode 100644 (file)
index 0000000..e134594
--- /dev/null
@@ -0,0 +1,113 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class ClientRequestHandlerTest {
+
+  private final Logger _logger = LoggerFactory.getLogger(ClientRequestHandlerTest.class);
+
+  ZMQ.Context context;
+  ExecutorService serverThread;
+  final String SERVER_ADDRESS = "localhost:5554";
+
+  ClientRequestHandler handler;
+
+  @Before
+  public void setUp() throws Exception {
+    context = ZMQ.context(1);
+    serverThread = Executors.newCachedThreadPool();
+    handler = new ClientRequestHandler(context);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    serverThread.shutdown();
+    MessagingUtil.closeZmqContext(context);
+    handler.close();
+  }
+
+ @Test
+  public void handle_SingleRemote_ShouldReturnResponse() throws Exception {
+    serverThread.execute(MessagingUtil.startReplyServer(context, SERVER_ADDRESS, 1));
+    Message request = new Message();
+    request.setRecipient(SERVER_ADDRESS);
+    Message response = handleMessageWithTimeout(request);
+    Assert.assertNotNull(response);
+    //should be connected to only 1 remote server
+    Assert.assertEquals(1, handler.getWorkerCount());
+    Assert.assertEquals(response.getRecipient(), SERVER_ADDRESS);
+  }
+
+ // @Test
+  public void handle_MultiRemote_ShouldReturnResponses() throws Exception {
+    ExecutorService threadPool = Executors.newCachedThreadPool();
+    final int port = 5555;
+    String serverAddress = null;
+    for (int i = 0; i < 5; i++) {
+      serverAddress = "localhost:" + (port + i);
+      serverThread.execute(MessagingUtil.startReplyServer(context, serverAddress, 1));
+      threadPool.execute(createEmptyMessageTaskAndHandle(handler, serverAddress));
+    }
+    Thread.currentThread().sleep(5000);//wait for all messages to get processed
+    //should be connected to 5 remote server
+    Assert.assertEquals(5, handler.getWorkerCount());
+  }
+
+  private Runnable createEmptyMessageTaskAndHandle(final ClientRequestHandler handler, final String serverAddress) {
+
+    return new Runnable() {
+      @Override
+      public void run() {
+        Message request = new Message();
+        request.setRecipient(serverAddress);
+        try {
+          Message response = handleMessageWithTimeout(request);
+          Assert.assertNotNull(response);
+          Assert.assertEquals(response.getRecipient(), serverAddress);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private Message handleMessageWithTimeout(final Message request) {
+    Message response = null;
+
+    FutureTask task = new FutureTask(new Callable<Message>() {
+
+      @Override
+      public Message call() {
+        try {
+          return handler.handle(request);
+        } catch (Exception e) {
+          _logger.debug("Client handler failed to handle request. Exception is [{}]", e);
+        }
+        return null;
+      }
+    });
+
+    serverThread.execute(task);
+
+    try {
+      response = (Message) task.get(5L, TimeUnit.SECONDS); //wait for max 5 sec for server to respond
+    } catch (Exception e) {/*ignore and continue*/}
+
+    return response;
+  }
+
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java
deleted file mode 100644 (file)
index 2e77537..0000000
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import junit.framework.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-
-import java.util.concurrent.TimeoutException;
-
-public class ClientTest {
-
-    Client client;
-    
-  @Before
-  public void setup(){
-      client = new Client();
-      client.getRequestQueue().clear();
-  }
-
-  @Test
-  public void testStop() throws Exception {
-
-  }
-
-  @Test
-  public void testPool() throws Exception {
-
-  }
-
-  @Test
-  public void process_AddAMessage_ShouldAddToQueue() throws Exception {
-    client.process(getEmptyMessageWrapper());
-    Assert.assertEquals(1, client.getRequestQueue().size());
-  }
-
-  /**
-   * Queue size is 100. Adding 101 message should time out in 2 sec
-   * if server does not process it
-   * @throws Exception
-   */
-  @Test(expected = TimeoutException.class)
-  public void process_Add101Message_ShouldThrow() throws Exception {
-    for (int i=0;i<101;i++){
-      client.process(getEmptyMessageWrapper());
-    }
-  }
-
-  @Test
-  public void testStart() throws Exception {
-  }
-
-  private MessageWrapper getEmptyMessageWrapper(){
-    return new MessageWrapper(new Message(), null);
-  }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/MockRoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/MockRoutingTable.java
new file mode 100644 (file)
index 0000000..7b0ac2c
--- /dev/null
@@ -0,0 +1,60 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Mock implementation of routing table
+ */
+public class MockRoutingTable<K, V> implements RoutingTable {
+
+
+  @Override
+  public void addRoute(Object o, Object o2) throws RoutingTableException, SystemException {
+
+  }
+
+  @Override
+  public void addGlobalRoute(Object o, Object o2) throws RoutingTableException, SystemException {
+
+  }
+
+  @Override
+  public void removeRoute(Object o, Object o2) {
+
+  }
+
+  @Override
+  public void removeGlobalRoute(Object o) throws RoutingTableException, SystemException {
+
+  }
+
+  @Override
+  public Set getRoutes(Object o) {
+    Set<String> routes = new HashSet<String>();
+    routes.add("localhost:5554");
+    return routes;
+  }
+
+  @Override
+  public Set<Map.Entry> getAllRoutes() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Object getARoute(Object o) {
+    return null;
+  }
+
+  @Override
+  public void registerRouteChangeListener(RouteChangeListener routeChangeListener) {
+
+  }
+}
index f6b9004eae3139a17773688eba17a528fb5d1ae0..9b7cf89dfbfe5944f00788a4342ca508c286d10a 100644 (file)
@@ -1,3 +1,10 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
 package org.opendaylight.controller.sal.connector.remoterpc;
 
 import java.net.URI;
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java
deleted file mode 100644 (file)
index e23a3ca..0000000
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import junit.framework.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.zeromq.ZMQ;
-
-import java.util.concurrent.TimeoutException;
-
-import static org.mockito.Mockito.doNothing;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(RpcSocket.class)
-public class RpcSocketTest {
-  RpcSocket rpcSocket = new RpcSocket("tcp://localhost:5554", new ZMQ.Poller(1));
-  RpcSocket spy = PowerMockito.spy(rpcSocket);
-
-  @Test
-  public void testCreateSocket() throws Exception {
-    Assert.assertEquals("tcp://localhost:5554", spy.getAddress());
-    Assert.assertEquals(ZMQ.REQ, spy.getSocket().getType());
-  }
-
-  @Test(expected = TimeoutException.class)
-  public void send_WhenQueueGetsFull_ShouldThrow() throws Exception {
-
-    doNothing().when(spy).process();
-
-    //10 is queue size
-    for (int i=0;i<10;i++){
-      spy.send(getEmptyMessageWrapper());
-    }
-
-    //sending 11th message should throw
-    spy.send(getEmptyMessageWrapper());
-  }
-
-  @Test
-  public void testHasTimedOut() throws Exception {
-    spy.send(getEmptyMessageWrapper());
-    Assert.assertFalse(spy.hasTimedOut());
-    Thread.sleep(1000);
-    Assert.assertFalse(spy.hasTimedOut());
-    Thread.sleep(1000);
-    Assert.assertTrue(spy.hasTimedOut());
-  }
-
-  @Test
-  public void testProcess() throws Exception {
-    PowerMockito.doNothing().when(spy, "sendMessage");
-    spy.send(getEmptyMessageWrapper());
-
-    //Next message should get queued
-    spy.send(getEmptyMessageWrapper());
-
-    //queue size should be 2
-    Assert.assertEquals(2, spy.getQueueSize());
-
-
-    spy.process();
-    //sleep for 2 secs (timeout)
-    //message send would be retried
-    Thread.sleep(2000);
-    spy.process();
-    Thread.sleep(2000);
-    spy.process();
-    Thread.sleep(2000);
-    spy.process(); //retry fails, next message will get picked up
-    Assert.assertEquals(1, spy.getQueueSize());
-  }
-
-  @Test
-  public void testProcessStateTransitions() throws Exception {
-    PowerMockito.doNothing().when(spy, "sendMessage");
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
-    spy.send(getEmptyMessageWrapper());
-    Assert.assertEquals(1, spy.getQueueSize());
-    Thread.sleep(200);
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
-    Thread.sleep(1800);
-
-    //1st timeout, 2nd try
-    spy.process();
-    Thread.sleep(200);
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
-    Thread.sleep(1800);
-
-    //2nd timeout, 3rd try
-    spy.process();
-    Thread.sleep(200);
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
-    Thread.sleep(1800);
-
-    //3rd timeout, no more tries => remove
-    spy.process();
-    Thread.sleep(200);
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
-    Assert.assertEquals(0, spy.getQueueSize());
-  }
-
-  @Test
-  public void testParseMessage() throws Exception {
-    // Write an integration test for parseMessage
-  }
-
-  @Test
-  public void testRecycleSocket() throws Exception {
-    // This will need to be updated in the future...for now, recycleSocket() calls close()
-    Assert.assertTrue(spy.getSocket().base().check_tag());
-    spy.close();
-    Assert.assertEquals(10, spy.getSocket().getLinger());
-    Assert.assertFalse(spy.getSocket().base().check_tag());
-  }
-
-  @Test
-  public void testClose() throws Exception {
-    Assert.assertTrue(spy.getSocket().base().check_tag());
-    spy.close();
-    Assert.assertEquals(10, spy.getSocket().getLinger());
-    Assert.assertFalse(spy.getSocket().base().check_tag());
-  }
-
-  @Test
-  public void testReceive() throws Exception {
-    PowerMockito.doReturn(null).when(spy, "parseMessage");
-    PowerMockito.doNothing().when(spy, "process");
-    spy.send(getEmptyMessageWrapper());
-
-    //There should be 1 message waiting in the queue
-    Assert.assertEquals(1, spy.getQueueSize());
-
-    spy.receive();
-    //This should complete message processing
-    //The message should be removed from the queue
-    Assert.assertEquals(0, spy.getQueueSize());
-    Assert.assertEquals(RpcSocket.NUM_RETRIES, spy.getRetriesLeft());
-
-  }
-
-  @Test
-  public void testReceiveStateTransitions() throws Exception {
-    PowerMockito.doReturn(null).when(spy, "parseMessage");
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
-    spy.send(getEmptyMessageWrapper());
-
-    //There should be 1 message waiting in the queue
-    Assert.assertEquals(1, spy.getQueueSize());
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
-
-    spy.receive();
-    //This should complete message processing
-    //The message should be removed from the queue
-    Assert.assertEquals(0, spy.getQueueSize());
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
-  }
-
-  private MessageWrapper getEmptyMessageWrapper(){
-    return new MessageWrapper(new Message(), null);
-  }
-
-  @Test
-  public void testProcessReceiveSequence() throws Exception {
-    PowerMockito.doNothing().when(spy, "sendMessage");
-    PowerMockito.doReturn(null).when(spy, "parseMessage");
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
-    spy.send(getEmptyMessageWrapper());
-    spy.send(getEmptyMessageWrapper());
-    Assert.assertEquals(2, spy.getQueueSize());
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
-
-
-    Thread.sleep(2000);
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
-    spy.receive();
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
-    Assert.assertEquals(1, spy.getQueueSize());
-
-    spy.process();
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
-    spy.receive();
-    Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
-    Assert.assertEquals(0, spy.getQueueSize());
-  }
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImplTest.java
new file mode 100644 (file)
index 0000000..1b282f6
--- /dev/null
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+
+import com.google.common.base.Optional;
+import junit.framework.Assert;
+import org.junit.*;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.zeromq.ZMQ;
+import zmq.Ctx;
+import zmq.SocketBase;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ServerImplTest {
+
+  private static ZMQ.Context context;
+  private ServerImpl server;
+  private Broker.ProviderSession brokerSession;
+  private RoutingTableProvider routingTableProvider;
+  private RpcRegistrationListener listener;
+
+  ExecutorService pool;
+
+  //Server configuration
+  private final int HANDLER_COUNT = 2;
+  private final int HWM = 200;
+  private final int port = 5554;
+  //server address
+  private final String SERVER_ADDRESS = "tcp://localhost:5554";
+
+  //@BeforeClass
+  public static void init() {
+    context = ZMQ.context(1);
+  }
+
+  //@AfterClass
+  public static void destroy() {
+    MessagingUtil.closeZmqContext(context);
+  }
+
+  @Before
+  public void setup() throws InterruptedException {
+    context = ZMQ.context(1);
+    brokerSession = mock(Broker.ProviderSession.class);
+    routingTableProvider = mock(RoutingTableProvider.class);
+    listener = mock(RpcRegistrationListener.class);
+
+    server = new ServerImpl(port);
+    server.setBrokerSession(brokerSession);
+    server.setRoutingTableProvider(routingTableProvider);
+
+    RoutingTable<String, String> mockRoutingTable = new MockRoutingTable<String, String>();
+    Optional<RoutingTable<String, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
+    when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable);
+
+    when(brokerSession.addRpcRegistrationListener(listener)).thenReturn(null);
+    when(brokerSession.getSupportedRpcs()).thenReturn(Collections.EMPTY_SET);
+    when(brokerSession.rpc(null, mock(CompositeNode.class))).thenReturn(null);
+    server.start();
+    Thread.sleep(5000);//wait for server to start
+  }
+
+  @After
+  public void tearDown() throws InterruptedException {
+
+    if (pool != null)
+      pool.shutdown();
+
+    if (server != null)
+      server.stop();
+
+    MessagingUtil.closeZmqContext(context);
+
+    Thread.sleep(5000);//wait for server to stop
+    Assert.assertEquals(ServerImpl.State.STOPPED, server.getStatus());
+  }
+
+  @Test
+  public void getRoutingTableProvider_Call_ShouldReturnRoutingTable() throws Exception {
+    Assert.assertNotNull(server.getRoutingTableProvider());
+  }
+
+  @Test
+  public void getBrokerSession_Call_ShouldReturnBrokerSession() throws Exception {
+    Optional<Broker.ProviderSession> mayBeBroker = server.getBrokerSession();
+
+    if (mayBeBroker.isPresent())
+      Assert.assertEquals(brokerSession, mayBeBroker.get());
+    else
+      Assert.fail("Broker does not exist in Remote RPC Server");
+
+  }
+
+  @Test
+  public void start_Call_ShouldSetServerStatusToStarted() throws Exception {
+    Assert.assertEquals(ServerImpl.State.STARTED, server.getStatus());
+
+  }
+
+  @Test
+  public void start_Call_ShouldCreateNZmqSockets() throws Exception {
+    final int EXPECTED_COUNT = 2 + HANDLER_COUNT; //1 ROUTER + 1 DEALER + HANDLER_COUNT
+
+    Optional<ZMQ.Context> mayBeContext = server.getZmqContext();
+    if (mayBeContext.isPresent())
+      Assert.assertEquals(EXPECTED_COUNT, findSocketCount(mayBeContext.get()));
+    else
+      Assert.fail("ZMQ Context does not exist in Remote RPC Server");
+  }
+
+  @Test
+  public void start_Call_ShouldCreate1ServerThread() {
+    final String SERVER_THREAD_NAME = "remote-rpc-server";
+    final int EXPECTED_COUNT = 1;
+    List<Thread> serverThreads = findThreadsWithName(SERVER_THREAD_NAME);
+    Assert.assertEquals(EXPECTED_COUNT, serverThreads.size());
+  }
+
+  @Test
+  public void start_Call_ShouldCreateNHandlerThreads() {
+    //final String WORKER_THREAD_NAME = "remote-rpc-worker";
+    final int EXPECTED_COUNT = HANDLER_COUNT;
+
+    Optional<ServerRequestHandler> serverRequestHandlerOptional = server.getHandler();
+    if (serverRequestHandlerOptional.isPresent()){
+      ServerRequestHandler handler = serverRequestHandlerOptional.get();
+      ThreadPoolExecutor workerPool = handler.getWorkerPool();
+      Assert.assertEquals(EXPECTED_COUNT, workerPool.getPoolSize());
+    } else {
+      Assert.fail("Server is in illegal state. ServerHandler does not exist");
+    }
+
+  }
+
+  @Test
+  public void testStop() throws Exception {
+
+  }
+
+  @Test
+  public void testOnRouteUpdated() throws Exception {
+
+  }
+
+  @Test
+  public void testOnRouteDeleted() throws Exception {
+
+  }
+
+  private int findSocketCount(ZMQ.Context context)
+      throws NoSuchFieldException, IllegalAccessException {
+    Field ctxField = context.getClass().getDeclaredField("ctx");
+    ctxField.setAccessible(true);
+    Ctx ctx = Ctx.class.cast(ctxField.get(context));
+
+    Field socketListField = ctx.getClass().getDeclaredField("sockets");
+    socketListField.setAccessible(true);
+    List<SocketBase> sockets = List.class.cast(socketListField.get(ctx));
+
+    return sockets.size();
+  }
+
+  private List<Thread> findThreadsWithName(String name) {
+    Thread[] threads = new Thread[Thread.activeCount()];
+    Thread.enumerate(threads);
+
+    List<Thread> foundThreads = new ArrayList();
+    for (Thread t : threads) {
+      if (t.getName().startsWith(name))
+        foundThreads.add(t);
+    }
+
+    return foundThreads;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandlerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandlerTest.java
new file mode 100644 (file)
index 0000000..6e39867
--- /dev/null
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.zeromq.ZMQ;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.mockito.Mockito.mock;
+
+public class ServerRequestHandlerTest {
+
+  ServerRequestHandler handler;
+  ZMQ.Context context;
+  ExecutorService executorService = Executors.newCachedThreadPool();
+  private int workerCount = 2;
+  private String mockDealerAddress = "inproc://rpc-request-handler";
+  private String mockServerIp = "localhost";
+  private int mockServerPort = 5554;
+
+  @Before
+  public void setUp() throws Exception {
+    context = ZMQ.context(1);
+    String mockServerAddress = mockServerIp + ":" + mockServerPort;
+    Broker.ProviderSession mockSession = mock(Broker.ProviderSession.class);
+    handler = new ServerRequestHandler(context, mockSession, workerCount, mockDealerAddress, mockServerAddress);
+    handler.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    executorService.shutdown();
+    MessagingUtil.closeZmqContext(context);
+    handler.close();
+  }
+
+  @Test
+  public void testStart() throws Exception {
+    //should start workers == workerCount
+    Assert.assertEquals(workerCount, handler.getWorkerPool().getPoolSize());
+
+    //killing a thread should recreate another one
+
+    //start router-dealer bridge
+    executorService.execute(MessagingUtil.createRouterDealerBridge(context, mockDealerAddress, mockServerPort));
+    Thread.sleep(1000); //give sometime for socket initialization
+
+    //this will kill the thread
+    final String WORKER_THREAD_NAME = "remote-rpc-worker";
+    interruptAThreadWithName(WORKER_THREAD_NAME);
+
+    //send 4 message to router
+    for (int i = 0; i < 4; i++)
+      executorService.execute(MessagingUtil.sendAnEmptyMessage(context, "tcp://" + mockServerIp + ":" + mockServerPort));
+
+    //worker pool size should not change.
+    Assert.assertEquals(workerCount, handler.getWorkerPool().getPoolSize());
+
+    Thread.sleep(10000); //wait for processing to complete
+  }
+
+  @Test
+  public void testClose() throws Exception {
+
+  }
+
+  /**
+   * Interrupts the first thread found whose name starts with the provided name
+   *
+   * @param name
+   */
+  private void interruptAThreadWithName(String name) {
+    List<Thread> workerThreads = findThreadsWithName(name);
+    if (workerThreads.size() > 0) workerThreads.get(0).interrupt();
+  }
+
+  /**
+   * Find all threads that start with the given name
+   *
+   * @param name
+   * @return
+   */
+  private List<Thread> findThreadsWithName(String name) {
+    Thread[] threads = new Thread[Thread.activeCount()];
+    Thread.enumerate(threads);
+
+    List<Thread> foundThreads = new ArrayList();
+    for (Thread t : threads) {
+      if (t.getName().startsWith(name))
+        foundThreads.add(t);
+    }
+
+    return foundThreads;
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java
deleted file mode 100644 (file)
index 130b30d..0000000
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import com.google.common.base.Optional;
-import junit.framework.Assert;
-import org.junit.After;
-import org.junit.Before;
-import org.zeromq.ZMQ;
-import org.opendaylight.controller.sal.connector.remoterpc.SocketManager;
-import org.opendaylight.controller.sal.connector.remoterpc.RpcSocket;
-import org.opendaylight.controller.sal.connector.remoterpc.Context;
-import org.junit.Test;
-
-public class SocketManagerTest {
-
-  SocketManager manager;
-
-  @Before
-  public void setup(){
-    manager = new SocketManager();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    manager.close();
-  }
-
-  @Test
-  public void getManagedSockets_When2NewAdded_ShouldContain2() throws Exception {
-
-    //Prepare data
-    manager.getManagedSocket("tcp://localhost:5554");
-    manager.getManagedSocket("tcp://localhost:5555");
-
-    Assert.assertTrue( 2 == manager.getManagedSockets().size());
-  }
-
-  @Test
-  public void getManagedSockets_When2NewAddedAnd1Existing_ShouldContain2() throws Exception {
-
-    //Prepare data
-    manager.getManagedSocket("tcp://localhost:5554");
-    manager.getManagedSocket("tcp://localhost:5555");
-    manager.getManagedSocket("tcp://localhost:5554"); //ask for the first one
-
-    Assert.assertTrue( 2 == manager.getManagedSockets().size());
-  }
-
-  @Test
-  public void getManagedSocket_WhenPassedAValidAddress_ShouldReturnARpcSocket() throws Exception {
-    String testAddress = "tcp://localhost:5554";
-    RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
-    Assert.assertEquals(testAddress, rpcSocket.getAddress());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void getManagedSocket_WhenPassedInvalidHostAddress_ShouldThrow() throws Exception {
-    String testAddress = "tcp://nonexistenthost:5554";
-    RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void getManagedSocket_WhenPassedInvalidAddress_ShouldThrow() throws Exception {
-    String testAddress = "xxx";
-    RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
-  }
-
-  @Test
-  public void getManagedSocket_WhenPassedAValidZmqSocket_ShouldReturnARpcSocket() throws Exception {
-    //Prepare data
-    String firstAddress = "tcp://localhost:5554";
-    RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress);
-    ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket();
-
-    String secondAddress = "tcp://localhost:5555";
-    RpcSocket secondRpcSocket = manager.getManagedSocket(secondAddress);
-    ZMQ.Socket secondZmqSocket = secondRpcSocket.getSocket();
-
-    Assert.assertEquals(firstRpcSocket, manager.getManagedSocketFor(firstZmqSocket).get());
-    Assert.assertEquals(secondRpcSocket, manager.getManagedSocketFor(secondZmqSocket).get());
-  }
-
-  @Test
-  public void getManagedSocket_WhenPassedNonManagedZmqSocket_ShouldReturnNone() throws Exception {
-    ZMQ.Socket nonManagedSocket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
-    nonManagedSocket.connect("tcp://localhost:5000");
-
-    //Prepare data
-    String firstAddress = "tcp://localhost:5554";
-    RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress);
-    ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket();
-
-    Assert.assertSame(Optional.<RpcSocket>absent(), manager.getManagedSocketFor(nonManagedSocket) );
-    Assert.assertSame(Optional.<RpcSocket>absent(), manager.getManagedSocketFor(null) );
-  }
-
-  @Test
-  public void stop_WhenCalled_ShouldEmptyManagedSockets() throws Exception {
-    manager.getManagedSocket("tcp://localhost:5554");
-    manager.getManagedSocket("tcp://localhost:5555");
-    Assert.assertTrue( 2 == manager.getManagedSockets().size());
-
-    manager.close();
-    Assert.assertTrue( 0 == manager.getManagedSockets().size());
-  }
-
-  @Test
-  public void poller_WhenCalled_ShouldReturnAnInstanceOfPoller() throws Exception {
-    Assert.assertTrue (manager.getPoller() instanceof ZMQ.Poller);
-  }
-
-}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/MessagingUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/MessagingUtil.java
new file mode 100644 (file)
index 0000000..20cf4f6
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc.utils;
+
+import junit.framework.Assert;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+public class MessagingUtil {
+
+  private static final Logger _logger = LoggerFactory.getLogger(MessagingUtil.class);
+
+  public static Runnable startReplyServer(final ZMQ.Context context,
+                                          final String serverAddress,
+                                          final int numRequests /*number of requests after which server shuts down*/) {
+    return new Runnable() {
+
+      @Override
+      public void run() {
+        final ZMQ.Socket socket = context.socket(ZMQ.REP);
+        try {
+          int returnCode = socket.bind("tcp://" + serverAddress);
+          Assert.assertNotSame(-1, returnCode);
+          _logger.info(" Starting reply server[{}] for test...", serverAddress);
+
+          //for (int i=0;i<numRequests;i++) {
+          while (!Thread.currentThread().isInterrupted()) {
+            byte[] bytes = socket.recv();
+            _logger.debug(" Got request ");
+            socket.send(bytes);
+            _logger.debug(" Sent response ");
+          }
+        } catch (Exception x) {
+          StringWriter w = new StringWriter();
+          PrintWriter p = new PrintWriter(w);
+          x.printStackTrace(p);
+          _logger.debug(w.toString());
+        } finally {
+          socket.close();
+          _logger.info("Shutting down reply server");
+        }
+      }
+    };
+  }
+
+  public static Runnable createRouterDealerBridge(final ZMQ.Context context, final String dealerAddress, final int routerPort) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        ZMQ.Socket router = null;
+        ZMQ.Socket dealer = null;
+        try {
+          router = context.socket(ZMQ.ROUTER);
+          dealer = context.socket(ZMQ.DEALER);
+          router.bind("tcp://*:" + routerPort);
+          dealer.bind(dealerAddress);
+          ZMQ.proxy(router, dealer, null);
+        } catch (Exception e) {/*Ignore*/} finally {
+          if (router != null) router.close();
+          if (dealer != null) dealer.close();
+        }
+      }
+    };
+  }
+
+  public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
+          throws IOException, ClassNotFoundException, InterruptedException {
+
+    return new Runnable() {
+      @Override
+      public void run() {
+        final ZMQ.Socket socket = context.socket(ZMQ.REQ);
+        try {
+
+          socket.connect(serverAddress);
+          System.out.println(Thread.currentThread().getName() + " Sending message");
+          try {
+            socket.send(Message.serialize(new Message()));
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+          byte[] bytes = socket.recv();
+          Message response = null;
+          try {
+            response = (Message) Message.deserialize(bytes);
+          } catch (IOException e) {
+            e.printStackTrace();
+          } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+          }
+          System.out.println(Thread.currentThread().getName() + " Got response " + response);
+        } catch (Exception x) {
+          x.printStackTrace();
+        } finally {
+          socket.close();
+        }
+      }
+    };
+  }
+
+  public static Message createEmptyMessage() {
+    return new Message();
+  }
+
+  /**
+   * Closes ZMQ Context. It tries to gracefully terminate the context. If
+   * termination takes more than a second, its forcefully shutdown.
+   */
+  public static void closeZmqContext(final ZMQ.Context context) {
+    if (context == null) return;
+
+    ExecutorService exec = Executors.newSingleThreadExecutor();
+    FutureTask zmqTermination = new FutureTask(new Runnable() {
+
+      @Override
+      public void run() {
+        try {
+          if (context != null)
+            context.term();
+            _logger.debug("ZMQ Context terminated gracefully!");
+        } catch (Exception e) {/*Ignore and continue shutdown*/}
+      }
+    }, null);
+
+    exec.execute(zmqTermination);
+
+    try {
+      zmqTermination.get(1L, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      _logger.debug("ZMQ Context terminated forcefully!");
+    }
+
+    exec.shutdownNow();
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/logback-test.xml b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/resources/logback-test.xml
new file mode 100644 (file)
index 0000000..7eb75b9
--- /dev/null
@@ -0,0 +1,13 @@
+<configuration scan="true">
+
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+      </pattern>
+    </encoder>
+  </appender>
+
+  <root level="DEBUG">
+    <appender-ref ref="STDOUT" />
+  </root>
+</configuration>
index 6c294dddcc0fcaab68a16ee984d38202cede3cec..2bc151a0f344712fec00848827362a52ba3bee29 100644 (file)
@@ -2,7 +2,6 @@ package org.opendaylight.controller.sample.zeromq.provider;
 
 import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl;
 import org.opendaylight.controller.sal.core.api.AbstractProvider;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
index dd910ea34bdb760721d1cc3151ff961ea07e75d8..5ee982009e53e5b1492af9811ddf9336a821acfc 100644 (file)
@@ -13,14 +13,14 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.opendaylight.controller.sal.connector.remoterpc.Client;
+
 import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient;
-import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer;
-import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl;
+
 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
 import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider;
 import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer;
+import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider;
+
 import org.opendaylight.controller.test.sal.binding.it.TestHelper;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
@@ -33,7 +33,7 @@ import org.ops4j.pax.exam.util.Filter;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleException;
 import org.osgi.framework.ServiceReference;
-import org.osgi.framework.ServiceRegistration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
@@ -46,8 +46,7 @@ import java.util.Hashtable;
 
 import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles;
 import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles;
-import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles;
-import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles;
+
 import static org.ops4j.pax.exam.CoreOptions.*;
 
 @RunWith(PaxExam.class)
@@ -91,8 +90,8 @@ public class RouterTest {
 
     _logger.debug("Provider sends announcement [{}]", "heartbeat");
     provider.announce(QNAME);
-    ServiceReference routerRef = ctx.getServiceReference(Client.class);
-    Client router = (Client) ctx.getService(routerRef);
+    ServiceReference routerRef = ctx.getServiceReference(RemoteRpcClient.class);
+    RemoteRpcClient router = (RemoteRpcClient) ctx.getService(routerRef);
     _logger.debug("Found router[{}]", router);
     _logger.debug("Invoking RPC [{}]", QNAME);
     for (int i = 0; i < 3; i++) {
@@ -353,6 +352,7 @@ public class RouterTest {
       }
     }
   }
+
   private String stateToString(int state) {
     switch (state) {
       case Bundle.ACTIVE:
@@ -371,86 +371,84 @@ public class RouterTest {
   @Configuration
   public Option[] config() {
     return options(systemProperty("osgi.console").value("2401"),
-        systemProperty("rpc.port").value("5555"),
-        mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
-        mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
-        mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
-        mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
-
-        //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
-        mavenBundle(ODL, "sal-common").versionAsInProject(), //
-        mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
-        mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
-        mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
-        mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
-        mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
-        mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
-        mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
-
-
-        
-        baseModelBundles(),
-        bindingAwareSalBundles(),
-        TestHelper.bindingIndependentSalBundles(),
-        TestHelper.configMinumumBundles(),
-        TestHelper.mdSalCoreBundles(),
-        
-      //Added the consumer
-        mavenBundle(SAMPLE, "sal-remoterpc-connector-test-consumer").versionAsInProject(), //
-      //**** These two bundles below are NOT successfully resolved -- some of their dependencies must be missing
-      //**** This causes the "Message" error to occur, the class cannot be found
-        mavenBundle(SAMPLE, "sal-remoterpc-connector-test-provider").versionAsInProject(), //
-        mavenBundle(ODL, "sal-remoterpc-connector").versionAsInProject(), //
-
-        mavenBundle(ODL, "zeromq-routingtable.implementation").versionAsInProject(),
-        mavenBundle(YANG, "concepts").versionAsInProject(),
-        mavenBundle(YANG, "yang-binding").versionAsInProject(), //
-        mavenBundle(YANG, "yang-common").versionAsInProject(), //
-        mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
-        mavenBundle(YANG, "yang-data-impl").versionAsInProject(), //
-        mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
-        mavenBundle(YANG, "yang-parser-api").versionAsInProject(), //
-        mavenBundle(YANG, "yang-parser-impl").versionAsInProject(), //
-        mavenBundle(YANG, "yang-model-util").versionAsInProject(), //
-        mavenBundle(YANG + ".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
-        mavenBundle(YANG + ".thirdparty", "antlr4-runtime-osgi-nohead").versionAsInProject(), //
-        mavenBundle("com.google.guava", "guava").versionAsInProject(), //
-        mavenBundle("org.zeromq", "jeromq").versionAsInProject(),
-        mavenBundle("com.fasterxml.jackson.core", "jackson-annotations").versionAsInProject(),
-        mavenBundle("com.fasterxml.jackson.core", "jackson-core").versionAsInProject(),
-        mavenBundle("com.fasterxml.jackson.core", "jackson-databind").versionAsInProject(),
-        //routingtable dependencies
-        systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),
-        // List framework bundles
-        mavenBundle("equinoxSDK381", "org.eclipse.equinox.console").versionAsInProject(),
-        mavenBundle("equinoxSDK381", "org.eclipse.equinox.util").versionAsInProject(),
-        mavenBundle("equinoxSDK381", "org.eclipse.osgi.services").versionAsInProject(),
-        mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds").versionAsInProject(),
-        mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command").versionAsInProject(),
-        mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime").versionAsInProject(),
-        mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell").versionAsInProject(),
-        // List logger bundles
-
-        mavenBundle("org.opendaylight.controller", "clustering.services")
-            .versionAsInProject(),
-        mavenBundle("org.opendaylight.controller", "clustering.stub")
-            .versionAsInProject(),
-
-
-        // List all the bundles on which the test case depends
-        mavenBundle("org.opendaylight.controller", "sal")
-            .versionAsInProject(),
-        mavenBundle("org.opendaylight.controller", "sal.implementation")
-            .versionAsInProject(),
-        mavenBundle("org.jboss.spec.javax.transaction",
-            "jboss-transaction-api_1.1_spec").versionAsInProject(),
-        mavenBundle("org.apache.commons", "commons-lang3")
-            .versionAsInProject(),
-        mavenBundle("org.apache.felix",
-            "org.apache.felix.dependencymanager")
-            .versionAsInProject(),
-
-        junitBundles()
+            systemProperty("rpc.port").value("5555"),
+            mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
+            mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
+            mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
+            mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
+
+            //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
+            mavenBundle(ODL, "sal-common").versionAsInProject(), //
+            mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
+            mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
+            mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
+            mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
+            mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
+            mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
+            mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
+
+
+            baseModelBundles(),
+            bindingAwareSalBundles(),
+            TestHelper.bindingIndependentSalBundles(),
+            TestHelper.configMinumumBundles(),
+            TestHelper.mdSalCoreBundles(),
+
+            //Added the consumer
+            mavenBundle(SAMPLE, "sal-remoterpc-connector-test-consumer").versionAsInProject(), //
+            //**** These two bundles below are NOT successfully resolved -- some of their dependencies must be missing
+            //**** This causes the "Message" error to occur, the class cannot be found
+            mavenBundle(SAMPLE, "sal-remoterpc-connector-test-provider").versionAsInProject(), //
+            mavenBundle(ODL, "sal-remoterpc-connector").versionAsInProject(), //
+
+            mavenBundle(ODL, "zeromq-routingtable.implementation").versionAsInProject(),
+            mavenBundle(YANG, "concepts").versionAsInProject(),
+            mavenBundle(YANG, "yang-binding").versionAsInProject(), //
+            mavenBundle(YANG, "yang-common").versionAsInProject(), //
+            mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
+            mavenBundle(YANG, "yang-data-impl").versionAsInProject(), //
+            mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
+            mavenBundle(YANG, "yang-parser-api").versionAsInProject(), //
+            mavenBundle(YANG, "yang-parser-impl").versionAsInProject(), //
+            mavenBundle(YANG, "yang-model-util").versionAsInProject(), //
+            mavenBundle(YANG + ".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
+            mavenBundle(YANG + ".thirdparty", "antlr4-runtime-osgi-nohead").versionAsInProject(), //
+            mavenBundle("com.google.guava", "guava").versionAsInProject(), //
+            mavenBundle("org.zeromq", "jeromq").versionAsInProject(),
+            mavenBundle("org.codehaus.jackson", "jackson-mapper-asl").versionAsInProject(),
+            mavenBundle("org.codehaus.jackson", "jackson-core-asl").versionAsInProject(),
+            //routingtable dependencies
+            systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),
+            // List framework bundles
+            mavenBundle("equinoxSDK381", "org.eclipse.equinox.console").versionAsInProject(),
+            mavenBundle("equinoxSDK381", "org.eclipse.equinox.util").versionAsInProject(),
+            mavenBundle("equinoxSDK381", "org.eclipse.osgi.services").versionAsInProject(),
+            mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds").versionAsInProject(),
+            mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command").versionAsInProject(),
+            mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime").versionAsInProject(),
+            mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell").versionAsInProject(),
+            // List logger bundles
+
+            mavenBundle("org.opendaylight.controller", "clustering.services")
+                    .versionAsInProject(),
+            mavenBundle("org.opendaylight.controller", "clustering.stub")
+                    .versionAsInProject(),
+
+
+            // List all the bundles on which the test case depends
+            mavenBundle("org.opendaylight.controller", "sal")
+                    .versionAsInProject(),
+            mavenBundle("org.opendaylight.controller", "sal.implementation")
+                    .versionAsInProject(),
+            mavenBundle("org.jboss.spec.javax.transaction",
+                    "jboss-transaction-api_1.1_spec").versionAsInProject(),
+            mavenBundle("org.apache.commons", "commons-lang3")
+                    .versionAsInProject(),
+            mavenBundle("org.apache.felix",
+                    "org.apache.felix.dependencymanager")
+                    .versionAsInProject(),
+
+            junitBundles()
     );
   }
 
index 6c9ec4e788827a6c375d4da4964578622a9ea47b..b4fa698a92235c5d8fd9bea103c4497bd702cb8f 100644 (file)
@@ -4,7 +4,6 @@ import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl;
 import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl;
 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
 import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer;
@@ -70,7 +69,7 @@ public class Router {
     _logger.info("Invoking RPC");
 
     ExampleConsumer consumer = getConsumer();
-    RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, new CompositeNodeImpl());
+    RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, consumer.getValidCompositeNodeWithOneSimpleChild());
     _logger.info("Result [{}]", result.isSuccessful());
 
     return stringify(result);
@@ -111,7 +110,7 @@ public class Router {
       _logger.debug("Could not get routing table impl reference");
       return "Could not get routingtable referen ";
     }
-    RoutingTable routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference);
+    RoutingTableImpl routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference);
     if (routingTable == null) {
       _logger.info("Could not get routing table service");
       return "Could not get routing table service";
@@ -128,16 +127,14 @@ public class Router {
       _logger.error("error in adding routing identifier" + e.getMessage());
     }
 
-    Set<String> routes = routingTable.getRoutes(rii.toString());
+    String result = routingTable.dumpRoutingTableCache();
 
-    StringBuilder stringBuilder = new StringBuilder();
-    for (String route : routes) {
-      stringBuilder.append(route);
-    }
 
-    _logger.info("Result [{}] routes added for route" + rii + stringBuilder.toString());
 
-    return stringBuilder.toString();
+
+    _logger.info("Result [{}] routes added for route" + rii + result);
+
+    return result;
   }
 
   @GET
@@ -242,5 +239,27 @@ public class Router {
     public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier getRoute() {
       return InstanceIdentifier.of(instance);
     }
+
+      @Override
+      public boolean equals(Object o) {
+          if (this == o) return true;
+          if (o == null || getClass() != o.getClass()) return false;
+
+          RoutingIdentifierImpl that = (RoutingIdentifierImpl) o;
+
+          if (!QNAME.equals(that.QNAME)) return false;
+          if (!instance.equals(that.instance)) return false;
+          if (!namespace.equals(that.namespace)) return false;
+
+          return true;
+      }
+
+      @Override
+      public int hashCode() {
+          int result = namespace.hashCode();
+          result = 31 * result + QNAME.hashCode();
+          result = 31 * result + instance.hashCode();
+          return result;
+      }
   }
 }