Enhancements to remote rpc client. Using zmq router-dealer bridge to make the client async.
Added configuration for remote rpc in Configuration subsystem
On Server startup reading of routingtable to populate remoterpcs was giving exception - fixed the same
Client was not registered with Server and addRPCImplementation was failing - fixed the same
ServerImpl was not getting registered as listener to RoutingTable hence announcement was not received on remote - fixed the same
Patch 6: Some unit tests were intemittently hanging. ZMQ Test server was not gracefully shutting down.
Change-Id: I6054443b39394f82258522205ccd4be470f597f0
Signed-off-by: Basheeruddin Ahmed <syedbahm@cisco.com>
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
<name>runtime-mapping-singleton</name>
</mapping-service>
</module>
+ <module>
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">prefix:remote-zeromq-rpc-server</type>
+ <name>remoter</name>
+ <port xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc">5666</port> 59
+ <dom-broker xmlns="urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc"> 60
+ <type xmlns:prefix="urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom">prefix:dom-broker-osgi-registry</type>
+ <name>dom-broker</name>
+ </dom-broker>
+ </module>
</modules>
<services xmlns="urn:opendaylight:params:xml:ns:yang:controller:config">
<capability>urn:opendaylight:yang:extension:yang-ext?module=yang-ext&revision=2013-07-09</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:common?module=opendaylight-md-sal-common&revision=2013-10-28</capability>
<capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:cluster:store?module=odl-sal-dom-clustered-store-cfg&revision=2013-10-28</capability>
+ <capability>urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc?module=odl-sal-dom-rpc-remote-cfg&revision=2013-10-28</capability>
</required-capabilities>
</snapshot>
*/
package org.opendaylight.controller.sal.connector.remoterpc.api;
+import java.util.Map;
import java.util.Set;
public interface RoutingTable<I,R> {
*/
public Set<R> getRoutes(I routeId);
+ /**
+ * Returns all network addresses stored in the table
+ * @return
+ */
+ public Set<Map.Entry> getAllRoutes();
+
/**
* Returns only one address from the list of network addresses
* associated with the route. The algorithm to determine that
}
// lets start a transaction
clusterGlobalServices.tbegin();
- Set<R> routes = new HashSet<R>();
- routes.add(route);
- routingTableCache.put(routeId, routes);
+
+ routingTableCache.put(routeId, route);
clusterGlobalServices.tcommit();
} else {
throw new DuplicateRouteException(" There is already existing route " + existingRoute);
}
- } catch (NotSupportedException e) {
- throw new RoutingTableException("Transaction error - while trying to create route id="
- + routeId + "with route" + route, e);
- } catch (HeuristicRollbackException e) {
- throw new RoutingTableException("Transaction error - while trying to create route id="
- + routeId + "with route" + route, e);
- } catch (RollbackException e) {
- throw new RoutingTableException("Transaction error - while trying to create route id="
- + routeId + "with route" + route, e);
- } catch (HeuristicMixedException e) {
+ } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
throw new RoutingTableException("Transaction error - while trying to create route id="
+ routeId + "with route" + route, e);
} catch (javax.transaction.SystemException e) {
routingTableCache.remove(routeId);
clusterGlobalServices.tcommit();
- } catch (NotSupportedException e) {
- throw new RoutingTableException("Transaction error - while trying to remove route id="
- + routeId, e);
- } catch (HeuristicRollbackException e) {
- throw new RoutingTableException("Transaction error - while trying to remove route id="
- + routeId, e);
- } catch (RollbackException e) {
- throw new RoutingTableException("Transaction error - while trying to remove route id="
- + routeId, e);
- } catch (HeuristicMixedException e) {
+ } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
throw new RoutingTableException("Transaction error - while trying to remove route id="
+ routeId, e);
} catch (javax.transaction.SystemException e) {
// Note: currently works for global routes only wherein there is just single
// route
Preconditions.checkNotNull(routeId, "getARoute: routeId cannot be null!");
- return (Set<R>) routingTableCache.get(routeId);
+ R route = (R)routingTableCache.get(routeId);
+ Set<R>routes = null;
+ if(route !=null){
+ routes = new HashSet<R>();
+ routes.add(route);
+ }
+
+ return routes;
}
- @Override
+ @Override
+ public Set<Map.Entry> getAllRoutes() {
+ return routingTableCache.entrySet();
+ }
+
+ @Override
public R getARoute(I routeId) {
throw new UnsupportedOperationException("Not implemented yet!");
}
import java.net.URI;
import java.util.EnumSet;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
- Set<String> globalService = new HashSet<String>();
- globalService.add("172.27.12.1:5000");
+ String globalService = "172.27.12.1:5000";
when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
ConcurrentMap latestCache = rti.getRoutingTableCache();
Assert.assertEquals(servicesGlobal.size(),1);
-
- Assert.assertEquals(servicesGlobal.iterator().next(),"172.27.12.1:5000");
-
+ Iterator<String> iterator = servicesGlobal.iterator();
+ while(iterator.hasNext()){
+ Assert.assertEquals(iterator.next(),"172.27.12.1:5000");
+ }
}
rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
- Set<String> globalService = new HashSet<String>();
- globalService.add("172.27.12.1:5000");
+ String globalService = "172.27.12.1:5000";
when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
ConcurrentMap latestCache = rti.getRoutingTableCache();
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.core.api;
+
+import org.opendaylight.yangtools.yang.common.QName;
+
+public interface RpcRoutingContext {
+
+ public QName getContext();
+
+ public QName getRpcType();
+}
<groupId> org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
-
+ <dependency>
+ <groupId> ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.12</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</Import-Package>
<Export-Package>
org.opendaylight.controller.config.yang.md.sal.remote.rpc,
- org.opendaylight.controller.sal.connector.remoterpc,
- org.opendaylight.controller.sal.connector.remoterpc.*,
+ org.opendaylight.controller.sal.connector.remoterpc.util,
+ org.opendaylight.controller.sal.connector.remoterpc.dto,
+ org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient,
+ org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer,
+ org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcProvider
</Export-Package>
<Bundle-Name>${project.groupId}.${project.artifactId}</Bundle-Name>
</instructions>
*/
package org.opendaylight.controller.config.yang.md.sal.remote.rpc;
-import org.opendaylight.controller.sal.connector.remoterpc.Client;
-import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcProvider;
-import org.opendaylight.controller.sal.connector.remoterpc.RoutingTableProvider;
-import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.*;
import org.opendaylight.controller.sal.core.api.Broker;
import org.osgi.framework.BundleContext;
public java.lang.AutoCloseable createInstance() {
Broker broker = getDomBrokerDependency();
- RoutingTableProvider provider = new RoutingTableProvider(bundleContext);
-
+
+
final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT;
ServerImpl serverImpl = new ServerImpl(port);
- Client clientImpl = new Client();
+ ClientImpl clientImpl = new ClientImpl();
+
+ RoutingTableProvider provider = new RoutingTableProvider(bundleContext,serverImpl);
+
+
RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl);
facade.setRoutingTableProvider(provider );
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+public class CapturedMessageHandler implements Runnable {
+
+ private Logger _logger = LoggerFactory.getLogger(CapturedMessageHandler.class);
+
+ private ZMQ.Socket socket;
+
+ public CapturedMessageHandler(ZMQ.Socket socket){
+ this.socket = socket;
+ }
+
+ @Override
+ public void run(){
+
+ try {
+ while (!Thread.currentThread().isInterrupted()){
+ String message = socket.recvStr();
+ _logger.debug("Captured [{}]", message);
+ }
+ } catch (Exception e) {
+ _logger.error("Exception raised [{}]", e.getMessage());
+ }
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import com.google.common.base.Optional;
-
-import org.opendaylight.controller.sal.common.util.RpcErrors;
-import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.*;
-
-import static com.google.common.base.Preconditions.*;
-
-/**
- * An implementation of {@link RpcImplementation} that makes remote RPC calls
- */
-public class Client implements RemoteRpcClient {
-
- private final Logger _logger = LoggerFactory.getLogger(Client.class);
-
- private final LinkedBlockingQueue<MessageWrapper> requestQueue = new LinkedBlockingQueue<MessageWrapper>(100);
-
- private final ExecutorService pool = Executors.newSingleThreadExecutor();
- private final long TIMEOUT = 5000; // in ms
-
- private RoutingTableProvider routingTableProvider;
-
- public RoutingTableProvider getRoutingTableProvider() {
- return routingTableProvider;
- }
-
- public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
- this.routingTableProvider = routingTableProvider;
- }
-
- public LinkedBlockingQueue<MessageWrapper> getRequestQueue() {
- return requestQueue;
- }
-
- @Override
- public Set<QName> getSupportedRpcs() {
- // TODO: Find the entries from routing table
- return Collections.emptySet();
- }
-
- public void start() {
- pool.execute(new Sender(this));
-
- }
-
- public void stop() {
-
- _logger.debug("Client stopping...");
- Context.getInstance().getZmqContext().term();
- _logger.debug("ZMQ context terminated");
-
- pool.shutdown(); // intiate shutdown
- try {
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow();
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- _logger.error("Client thread pool did not shut down");
- }
- } catch (InterruptedException e) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- _logger.debug("Client stopped");
- }
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(rpc);
-
- String address = lookupRemoteAddress(routeId);
-
- Message request = new Message.MessageBuilder().type(Message.MessageType.REQUEST)
- .sender(Context.getInstance().getLocalUri()).recipient(address).route(routeId)
- .payload(XmlUtils.compositeNodeToXml(input)).build();
-
- List<RpcError> errors = new ArrayList<RpcError>();
-
- try (SocketPair pair = new SocketPair()) {
-
- MessageWrapper messageWrapper = new MessageWrapper(request, pair.getSender());
- process(messageWrapper);
- Message response = parseMessage(pair.getReceiver());
-
- CompositeNode payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
-
- return Rpcs.getRpcResult(true, payload, errors);
-
- } catch (Exception e) {
- collectErrors(e, errors);
- return Rpcs.getRpcResult(false, null, errors);
- }
-
- }
-
- public void process(MessageWrapper msg) throws TimeoutException, InterruptedException {
- _logger.debug("Processing message [{}]", msg);
-
- boolean success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS);
- if (!success)
- throw new TimeoutException("Queue is full");
- }
-
- /**
- * Block on socket for reply
- *
- * @param receiver
- * @return
- */
- private Message parseMessage(ZMQ.Socket receiver) throws IOException, ClassNotFoundException {
- return (Message) Message.deserialize(receiver.recv());
- }
-
- /**
- * Find address for the given route identifier in routing table
- *
- * @param routeId
- * route identifier
- * @return remote network address
- */
- private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId) {
- checkNotNull(routeId, "route must not be null");
-
- Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
- checkNotNull(routingTable.isPresent(), "Routing table is null");
-
- Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
- checkNotNull(addresses, "Address not found for route [%s]", routeId);
- checkState(addresses.size() == 1, "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); // its
- // a
- // global
- // service.
-
- String address = addresses.iterator().next();
- checkNotNull(address, "Address not found for route [%s]", routeId);
-
- return address;
- }
-
- private void collectErrors(Exception e, List<RpcError> errors) {
- if (e == null)
- return;
- if (errors == null)
- errors = new ArrayList<RpcError>();
-
- errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
- for (Throwable t : e.getSuppressed()) {
- errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
- }
- }
-
- @Override
- public void close() throws Exception {
- stop();
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import org.opendaylight.controller.sal.common.util.RpcErrors;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * An implementation of {@link RpcImplementation} that makes
+ * remote RPC calls
+ */
+public class ClientImpl implements RemoteRpcClient {
+
+ private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
+
+ private ZMQ.Context context = ZMQ.context(1);
+ private ClientRequestHandler handler;
+ private RoutingTableProvider routingTableProvider;
+
+ public ClientImpl(){
+ handler = new ClientRequestHandler(context);
+ start();
+ }
+
+ public ClientImpl(ClientRequestHandler handler){
+ this.handler = handler;
+ start();
+ }
+
+ public RoutingTableProvider getRoutingTableProvider() {
+ return routingTableProvider;
+ }
+
+ public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
+ this.routingTableProvider = routingTableProvider;
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs(){
+ //TODO: Find the entries from routing table
+ return Collections.emptySet();
+ }
+
+ @Override
+ public void start() {/*NOOPS*/}
+
+ @Override
+ public void stop() {
+ closeZmqContext();
+ handler.close();
+ _logger.info("Stopped");
+ }
+
+ @Override
+ public void close(){
+ stop();
+ }
+
+ /**
+ * Finds remote server that can execute this rpc and sends a message to it
+ * requesting execution.
+ * The call blocks until a response from remote server is received. Its upto
+ * the client of this API to implement a timeout functionality.
+ *
+ * @param rpc remote service to be executed
+ * @param input payload for the remote service
+ * @return
+ */
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(rpc);
+
+ String address = lookupRemoteAddress(routeId);
+
+ Message request = new Message.MessageBuilder()
+ .type(Message.MessageType.REQUEST)
+ .sender(Context.getInstance().getLocalUri())
+ .recipient(address)
+ .route(routeId)
+ .payload(XmlUtils.compositeNodeToXml(input))
+ .build();
+
+ List<RpcError> errors = new ArrayList<RpcError>();
+
+ try{
+ Message response = handler.handle(request);
+ CompositeNode payload = null;
+
+ if ( response != null )
+ payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+
+ return Rpcs.getRpcResult(true, payload, errors);
+
+ } catch (Exception e){
+ collectErrors(e, errors);
+ return Rpcs.getRpcResult(false, null, errors);
+ }
+
+ }
+
+ /**
+ * Find address for the given route identifier in routing table
+ * @param routeId route identifier
+ * @return remote network address
+ */
+ private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){
+ checkNotNull(routeId, "route must not be null");
+
+ Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
+ checkNotNull(routingTable.isPresent(), "Routing table is null");
+
+ Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
+ checkNotNull(addresses, "Address not found for route [%s]", routeId);
+ checkState(addresses.size() == 1,
+ "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service.
+
+ String address = addresses.iterator().next();
+ checkNotNull(address, "Address not found for route [%s]", routeId);
+
+ return address;
+ }
+
+ private void collectErrors(Exception e, List<RpcError> errors){
+ if (e == null) return;
+ if (errors == null) errors = new ArrayList<RpcError>();
+
+ errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause()));
+ for (Throwable t : e.getSuppressed()) {
+ errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t));
+ }
+ }
+
+ /**
+ * Closes ZMQ Context. It tries to gracefully terminate the context. If
+ * termination takes more than a second, its forcefully shutdown.
+ */
+ private void closeZmqContext() {
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ FutureTask zmqTermination = new FutureTask(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ if (context != null)
+ context.term();
+ _logger.debug("ZMQ Context terminated");
+ } catch (Exception e) {
+ _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
+ }
+ }
+ }, null);
+
+ exec.execute(zmqTermination);
+
+ try {
+ zmqTermination.get(1L, TimeUnit.SECONDS);
+ } catch (Exception e) {/*ignore and continue with shutdown*/}
+
+ exec.shutdownNow();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+class ClientRequestHandler implements AutoCloseable{
+
+ private Logger _logger = LoggerFactory.getLogger(ClientRequestHandler.class);
+ private final String DEFAULT_NAME = "remoterpc-client-worker";
+ private final String INPROC_PROTOCOL_PREFIX = "inproc://";
+ private final String TCP_PROTOCOL_PREFIX = "tcp://";
+
+ private ZMQ.Context context;
+
+ /*
+ * Worker thread pool. Each thread runs a ROUTER-DEALER pair
+ */
+ private ExecutorService workerPool;
+
+ /*
+ * Set of remote servers this client is currently connected to
+ */
+ private Map<String, String> connectedServers;
+
+ protected ClientRequestHandler(ZMQ.Context context) {
+ this.context = context;
+ connectedServers = new ConcurrentHashMap<String, String>();
+ start();
+ }
+
+ /**
+ * Starts a pool of worker as needed. A worker thread that has not been used for 5 min
+ * is terminated and removed from the pool. If thread dies due to an exception, its
+ * restarted.
+ */
+ private void start(){
+
+ workerPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 5L, TimeUnit.MINUTES,
+ new SynchronousQueue<Runnable>()){
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ if (isTerminating() || isTerminated() || isShutdown())
+ return;
+
+ Worker worker = (Worker) r;
+ Preconditions.checkState( worker != null );
+ String remoteServerAddress = worker.getRemoteServerAddress();
+ connectedServers.remove(remoteServerAddress);
+
+ if ( t != null ){
+ _logger.debug("Exception caught while terminating worker [{},{}]. " +
+ "Restarting worker...", t.getClass(), t.getMessage());
+
+ connectedServers.put(remoteServerAddress, remoteServerAddress);
+ this.execute(r);
+ }
+ super.afterExecute(r, null);
+ }
+ };
+ }
+
+ public Message handle(Message request) throws IOException, ClassNotFoundException, InterruptedException {
+
+ String remoteServerAddress = request.getRecipient();
+ //if we already have router-dealer bridge setup for this address the send request
+ //otherwise first create the bridge and then send request
+ if ( connectedServers.containsKey(remoteServerAddress) )
+ return sendMessage(request, remoteServerAddress);
+ else{
+ workerPool.execute(new Worker(remoteServerAddress));
+ connectedServers.put(remoteServerAddress, remoteServerAddress);
+ //give little time for sockets to get initialized
+ //TODO: Add socket ping-pong message to ensure socket init rather than thread.sleep.
+ Thread.sleep(1000);
+ return sendMessage(request, remoteServerAddress);
+ }
+ }
+
+ private Message sendMessage(Message request, String address) throws IOException, ClassNotFoundException {
+ Message response = null;
+ ZMQ.Socket socket = context.socket(ZMQ.REQ);
+
+ try {
+ socket.connect( INPROC_PROTOCOL_PREFIX + address);
+ socket.send(Message.serialize(request));
+ _logger.debug("Request sent. Waiting for reply...");
+ byte[] reply = socket.recv(0);
+ _logger.debug("Response received");
+ response = (Message) Message.deserialize(reply);
+ } finally {
+ socket.close();
+ }
+ return response;
+ }
+
+ /**
+ * This gets called automatically if used with try-with-resources
+ */
+ @Override
+ public void close(){
+ workerPool.shutdown();
+ _logger.info("Request Handler closed");
+ }
+
+ /**
+ * Total number of workers in the pool. Number of workers represent
+ * number of remote servers {@link org.opendaylight.controller.sal.connector.remoterpc.ClientImpl} is connected to.
+ *
+ * @return worker count
+ */
+ public int getWorkerCount(){
+
+ if (workerPool == null) return 0;
+
+ return ((ThreadPoolExecutor)workerPool).getActiveCount();
+ }
+ /**
+ * Handles RPC request
+ */
+ private class Worker implements Runnable {
+ private String name;
+ private String remoteServer; //<servername:rpc-port>
+
+ public Worker(String address){
+ this.name = DEFAULT_NAME + "[" + address + "]";
+ this.remoteServer = address;
+ }
+
+ public String getRemoteServerAddress(){
+ return this.remoteServer;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName(name);
+ _logger.debug("Starting ... ");
+
+ ZMQ.Socket router = context.socket(ZMQ.ROUTER);
+ ZMQ.Socket dealer = context.socket(ZMQ.DEALER);
+
+ try {
+ int success = router.bind(INPROC_PROTOCOL_PREFIX + remoteServer);
+ Preconditions.checkState(-1 != success, "Could not bind to " + remoteServer);
+
+ dealer.connect(TCP_PROTOCOL_PREFIX + remoteServer);
+
+ _logger.info("Worker started for [{}]", remoteServer);
+
+ //TODO: Add capture handler
+ //This code will block until the zmq context is terminated.
+ ZMQ.proxy(router, dealer, null);
+
+ } catch (Exception e) {
+ _logger.debug("Ignoring exception [{}, {}]", e.getClass(), e.getMessage());
+ } finally {
+ try {
+ router.close();
+ dealer.close();
+ } catch (Exception x) {
+ _logger.debug("Exception while closing socket [{}]", x);
+ }
+ _logger.debug("Closing...");
+ }
+ }
+ }
+}
public class Context {
private ZMQ.Context zmqContext = ZMQ.context(1);
private String uri;
+ private final String DEFAULT_RPC_PORT = "5554";
private static Context _instance = new Context();
public String getLocalUri(){
uri = (uri != null) ? uri
- : new StringBuilder("tcp://").append(getIpAddress()).append(":")
+ : new StringBuilder().append(getIpAddress()).append(":")
.append(getRpcPort()).toString();
return uri;
public String getRpcPort(){
String rpcPort = (System.getProperty("rpc.port") != null)
? System.getProperty("rpc.port")
- : "5554";
+ : DEFAULT_RPC_PORT;
return rpcPort;
}
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.sal.connector.remoterpc;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
-package org.opendaylight.controller.sal.connector.remoterpc;
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
+package org.opendaylight.controller.sal.connector.remoterpc;
-import org.opendaylight.controller.sal.connector.remoterpc.Client;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import java.util.Collection;
+import java.util.Set;
+
public class RemoteRpcProvider implements
RemoteRpcServer,
RemoteRpcClient,
Provider {
private final ServerImpl server;
- private final Client client;
+ private final ClientImpl client;
private RoutingTableProvider provider;
@Override
}
- public RemoteRpcProvider(ServerImpl server, Client client) {
+ public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
this.server = server;
this.client = client;
}
public void setBrokerSession(ProviderSession session) {
server.setBrokerSession(session);
}
- public void setServerPool(ExecutorService serverPool) {
- server.setServerPool(serverPool);
- }
+// public void setServerPool(ExecutorService serverPool) {
+// server.setServerPool(serverPool);
+// }
public void start() {
- client.setRoutingTableProvider(provider);
- server.setRoutingTableProvider(provider);
+ //when listener was being invoked and addRPCImplementation was being
+ //called the client was null.
+ server.setClient(client);
server.start();
client.start();
+
+
}
- public void onRouteUpdated(String key, Set values) {
- server.onRouteUpdated(key, values);
- }
- public void onRouteDeleted(String key) {
- server.onRouteDeleted(key);
- }
-
+
@Override
public Collection<ProviderFunctionality> getProviderFunctionality() {
client.close();
}
-
-
-
@Override
public void stop() {
server.stop();
-package org.opendaylight.controller.sal.connector.remoterpc;
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.connector.remoterpc;
public interface RemoteRpcServer extends AutoCloseable {
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.sal.connector.remoterpc;
+import com.google.common.base.Optional;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl;
import org.osgi.framework.BundleContext;
import org.osgi.util.tracker.ServiceTracker;
-import com.google.common.base.Optional;
-
public class RoutingTableProvider implements AutoCloseable {
@SuppressWarnings("rawtypes")
final ServiceTracker<RoutingTable,RoutingTable> tracker;
+
+ private RoutingTableImpl routingTableImpl = null;
+
+ final private RouteChangeListener routeChangeListener;
- public RoutingTableProvider(BundleContext ctx) {
+ public RoutingTableProvider(BundleContext ctx,RouteChangeListener rcl) {
@SuppressWarnings("rawtypes")
ServiceTracker<RoutingTable, RoutingTable> rawTracker = new ServiceTracker<>(ctx, RoutingTable.class, null);
tracker = rawTracker;
tracker.open();
+
+ routeChangeListener = rcl;
}
public Optional<RoutingTable<String, String>> getRoutingTable() {
@SuppressWarnings("unchecked")
RoutingTable<String,String> tracked = tracker.getService();
+
+ if(tracked instanceof RoutingTableImpl){
+ if(routingTableImpl != tracked){
+ routingTableImpl= (RoutingTableImpl)tracked;
+ routingTableImpl.setRouteChangeListener(routeChangeListener);
+ }
+ }
+
return Optional.fromNullable(tracked);
}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
-import java.io.IOException;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * A class encapsulating {@link ZMQ.Socket} of type {@link ZMQ.REQ}.
- * It adds following capabilities:
- * <li> Retry logic - Tries 3 times before giving up
- * <li> Request times out after {@link TIMEOUT} property
- * <li> The limitation of {@link ZMQ.REQ}/{@link ZMQ.REP} pair is that no 2 requests can be sent before
- * the response for the 1st request is received. To overcome that, this socket queues all messages until
- * the previous request has been responded.
- */
-public class RpcSocket {
-
- // Constants
- public static final int TIMEOUT = 2000;
- public static final int QUEUE_SIZE = 10;
- public static final int NUM_RETRIES = 3;
- private static final Logger log = LoggerFactory.getLogger(RpcSocket.class);
-
- private ZMQ.Socket socket;
- private ZMQ.Poller poller;
- private String address;
- private SocketState state;
- private long sendTime;
- private int retriesLeft;
- private LinkedBlockingQueue<MessageWrapper> inQueue;
-
-
- public RpcSocket(String address, ZMQ.Poller poller) {
- this.socket = null;
- this.state = new IdleSocketState();
- this.sendTime = -1;
- this.retriesLeft = NUM_RETRIES;
- this.inQueue = new LinkedBlockingQueue<MessageWrapper>(QUEUE_SIZE);
- this.address = address;
- this.poller = poller;
- createSocket();
- }
-
- public ZMQ.Socket getSocket() {
- return socket;
- }
-
- public String getAddress() {
- return address;
- }
-
- public int getRetriesLeft() {
- return retriesLeft;
- }
-
- public void setRetriesLeft(int retriesLeft) {
- this.retriesLeft = retriesLeft;
- }
-
- public SocketState getState() {
- return state;
- }
-
- public void setState(SocketState state) {
- this.state = state;
- }
-
- public int getQueueSize() {
- return inQueue.size();
- }
-
- public MessageWrapper removeCurrentRequest() {
- return inQueue.poll();
- }
-
- public boolean hasTimedOut() {
- return (System.currentTimeMillis() - sendTime > RpcSocket.TIMEOUT);
- }
-
- public void send(MessageWrapper request) throws TimeoutException {
- try {
- boolean success = inQueue.offer(request, TIMEOUT, TimeUnit.MILLISECONDS);
- if (!success) {
- throw new TimeoutException("send :: Queue is full");
- }
- process();
- }
- catch (InterruptedException e) {
- log.error("send : Thread interrupted while attempting to add request to inQueue", e);
- }
- }
-
- public MessageWrapper receive() {
- Message response = parseMessage();
- MessageWrapper messageWrapper = inQueue.poll(); //remove the message from queue
- MessageWrapper responseMessageWrapper = new MessageWrapper(response, messageWrapper.getReceiveSocket());
-
- state = new IdleSocketState();
- retriesLeft = NUM_RETRIES;
- return responseMessageWrapper;
- }
-
- public void process() {
- if (getQueueSize() > 0) //process if there's message in the queue
- state.process(this);
- }
-
- // Called by IdleSocketState & BusySocketState
- public void sendMessage() {
- //Get the message from queue without removing it. For retries
- MessageWrapper messageWrapper = inQueue.peek();
- if (messageWrapper != null) {
- Message message = messageWrapper.getMessage();
- try {
- socket.send(Message.serialize(message));
- }
- catch (IOException e) {
- log.debug("Message send failed [{}]", message);
- log.debug("Exception [{}]", e);
- }
- sendTime = System.currentTimeMillis();
- }
- }
-
- public Message parseMessage() {
- Message parsedMessage = null;
- byte[] bytes = socket.recv();
- log.debug("Received bytes:[{}]", bytes.length);
- try {
- parsedMessage = (Message)Message.deserialize(bytes);
- }
- catch (IOException|ClassNotFoundException e) {
- log.debug("parseMessage : Deserializing received bytes failed", e);
- }
-
- return parsedMessage;
- }
-
- public void recycleSocket() {
- close();
- }
-
- public void close() {
- socket.setLinger(10);
- socket.close();
- }
-
- private void createSocket() {
- socket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
- socket.connect(address);
- poller.register(socket, ZMQ.Poller.POLLIN);
- state = new IdleSocketState();
- }
-
-
- /**
- * Represents the state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
- */
- public static interface SocketState {
-
- /* The processing actions to be performed in this state
- */
- public void process(RpcSocket socket);
- }
-
- /**
- * Represents the idle state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
- */
- public static class IdleSocketState implements SocketState {
-
- @Override
- public void process(RpcSocket socket) {
- socket.sendMessage();
- socket.setState(new BusySocketState());
- socket.setRetriesLeft(socket.getRetriesLeft()-1);
- }
- }
-
- /**
- * Represents the busy state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket}
- */
- public static class BusySocketState implements SocketState {
-
- private static Logger log = LoggerFactory.getLogger(BusySocketState.class);
-
- @Override
- public void process(RpcSocket socket) {
- if (socket.hasTimedOut()) {
- if (socket.getRetriesLeft() > 0) {
- log.debug("process : Request timed out, retrying now...");
- socket.sendMessage();
- socket.setRetriesLeft(socket.getRetriesLeft() - 1);
- }
- else {
- // No more retries for current request, so stop processing the current request
- MessageWrapper message = socket.removeCurrentRequest();
- if (message != null) {
- log.error("Unable to process rpc request [{}]", message);
- socket.setState(new IdleSocketState());
- socket.setRetriesLeft(NUM_RETRIES);
- }
- }
- }
- // Else no timeout, so allow processing to continue
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
-import java.io.IOException;
-import java.util.concurrent.TimeoutException;
-
-import static com.google.common.base.Preconditions.*;
-
-/**
- * Main server thread for sending requests.
- */
-public class Sender implements Runnable{
-
- private final static Logger _logger = LoggerFactory.getLogger(Sender.class);
- private final Client client;
-
-
-
-
- public Sender(Client client) {
- super();
- this.client = client;
- }
-
-@Override
- public void run() {
- _logger.info("Starting...");
-
- try (SocketManager socketManager = new SocketManager()){
- while (!Thread.currentThread().isInterrupted()) {
-
- //read incoming messages from blocking queue
- MessageWrapper request = pollForRequest();
-
- if (request != null) {
- processRequest(socketManager, request);
- }
-
- flushSockets(socketManager);
- pollForResponse(socketManager);
- processResponse(socketManager);
-
- }
- } catch(Exception t){
- _logger.error("Exception: [{}]", t);
- _logger.error("Stopping...");
- }
- }
-
- private void processResponse(SocketManager socketManager) {
- for (int i = 0; i < socketManager.getPoller().getSize(); i++) {
- // If any sockets get a response, process it
- if (socketManager.getPoller().pollin(i)) {
- Optional<RpcSocket> socket = socketManager.getManagedSocketFor(
- socketManager.getPoller().getItem(i).getSocket());
-
- checkState(socket.isPresent(), "Managed socket not found");
-
- MessageWrapper response = socket.get().receive();
- _logger.debug("Received rpc response [{}]", response.getMessage());
-
- //TODO: handle exception and introduce timeout on receiver side
- try {
- response.getReceiveSocket().send(Message.serialize(response.getMessage()));
- } catch (IOException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
-
- private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException {
-
- if ((request.getMessage() == null) ||
- (request.getMessage().getRecipient() == null)) {
- //invalid message. log and drop
- _logger.error("Invalid request [{}]", request);
- return;
- }
-
- RpcSocket socket =
- socketManager.getManagedSocket(request.getMessage().getRecipient());
-
- socket.send(request);
- }
-
- private void flushSockets(SocketManager socketManager){
- for (RpcSocket socket : socketManager.getManagedSockets()){
- socket.process();
- }
- }
-
- private MessageWrapper pollForRequest(){
- return client.getRequestQueue().poll();
- }
-
- private void pollForResponse(SocketManager socketManager){
- try{
- socketManager.getPoller().poll(10); //poll every 10ms
- }catch (Throwable t) { /*Ignore and continue*/ }
- }
-}
-
-
-/*
-SCALA
-
-package org.opendaylight.controller.sal.connector.remoterpc
-
- import org.slf4j.{LoggerFactory, Logger}
- import scala.collection.JavaConverters._
- import scala.Some
- import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message}
-*/
-/**
- * Main server thread for sending requests. This does not maintain any state. If the
- * thread dies, it will be restarted
- */
-/*class Sender extends Runnable {
- private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass())
-
- override def run = {
- _logger.info("Sender starting...")
- val socketManager = new SocketManager()
-
- try {
- while (!Thread.currentThread().isInterrupted) {
- //read incoming messages from blocking queue
- val request: MessageWrapper = Client.requestQueue.poll()
-
- if (request != null) {
- if ((request.message != null) &&
- (request.message.getRecipient != null)) {
-
- val socket = socketManager.getManagedSocket(request.message.getRecipient)
- socket.send(request)
- } else {
- //invalid message. log and drop
- _logger.error("Invalid request [{}]", request)
- }
- }
-
- socketManager.getManagedSockets().asScala.map(s => s.process)
-
- // Poll all sockets for responses every 1 sec
- poll(socketManager)
-
- // If any sockets get a response, process it
- for (i <- 0 until socketManager.poller.getSize) {
- if (socketManager.poller.pollin(i)) {
- val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket)
-
- socket match {
- case None => //{
- _logger.error("Could not find a managed socket for zmq socket")
- throw new IllegalStateException("Could not find a managed socket for zmq socket")
- //}
- case Some(s) => {
- val response = s.receive()
- _logger.debug("Received rpc response [{}]", response.message)
- response.receiveSocket.send(Message.serialize(response.message))
- }
- }
- }
- }
-
- }
- } catch{
- case e:Exception => {
- _logger.debug("Sender stopping due to exception")
- e.printStackTrace()
- }
- } finally {
- socketManager.stop
- }
- }
-
- def poll(socketManager:SocketManager) = {
- try{
- socketManager.poller.poll(10)
- }catch{
- case t:Throwable => //ignore and continue
- }
- }
-}
-
-
-// def newThread(r: Runnable): Thread = {
-// val t = new RequestHandler()
-// t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler)
-// t
-// }
-
-
-
-/**
- * Restarts the request processing server in the event of unforeseen exceptions
- */
-//private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler {
-// def uncaughtException(t: Thread, e: Throwable) = {
-// _logger.error("Exception caught during request processing [{}]", e)
-// _logger.info("Restarting request processor server...")
-// RequestProcessor.start()
-// }
package org.opendaylight.controller.sal.connector.remoterpc;
import com.google.common.base.Optional;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Sets;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType;
import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
-import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
/**
- * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling
- * async and non-blocking. Note zmq socket is not thread safe 2. Read properties
- * from config file using existing(?) ODL properties framework
+ * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
+ * so that it gets route change notifications from routing table.
*/
-public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, Set> {
+public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
- private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
+ private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
- private ExecutorService serverPool;
+ private ExecutorService serverPool;
+ protected ServerRequestHandler handler;
- // private RoutingTable<RpcRouter.RouteIdentifier, String> routingTable;
- private RoutingTableProvider routingTable;
- private Set<QName> remoteServices;
- private ProviderSession brokerSession;
- private ZMQ.Context context;
- private ZMQ.Socket replySocket;
+ private Set<QName> remoteServices;
+ private ProviderSession brokerSession;
+ private ZMQ.Context context;
- private final RpcListener listener = new RpcListener();
+ private final RpcListener listener = new RpcListener();
- private final String localUri = Context.getInstance().getLocalUri();
+ private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
+ private final int HANDLER_WORKER_COUNT = 2;
+ private final int HWM = 200;//high water mark on sockets
+ private volatile State status = State.STOPPED;
- private final int rpcPort;
+ private String serverAddress;
+ private int port;
- private RpcImplementation client;
+ private ClientImpl client;
- public RpcImplementation getClient() {
- return client;
- }
+ private RoutingTableProvider routingTableProvider;
- public void setClient(RpcImplementation client) {
- this.client = client;
- }
+ public static enum State {
+ STARTING, STARTED, STOPPED;
+ }
- // Prevent instantiation
- public ServerImpl(int rpcPort) {
- this.rpcPort = rpcPort;
- }
+ public ServerImpl(int port) {
+ this.port = port;
+ this.serverAddress = new StringBuilder(findIpAddress()).
+ append(":").
+ append(port).
+ toString();
+ }
- public void setBrokerSession(ProviderSession session) {
- this.brokerSession = session;
- }
+ public RoutingTableProvider getRoutingTableProvider() {
+ return routingTableProvider;
+ }
- public ExecutorService getServerPool() {
- return serverPool;
- }
+ public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
+ this.routingTableProvider = routingTableProvider;
+ }
- public void setServerPool(ExecutorService serverPool) {
- this.serverPool = serverPool;
- }
+ public ClientImpl getClient(){
+ return this.client;
+ }
- public void start() {
- context = ZMQ.context(1);
- serverPool = Executors.newSingleThreadExecutor();
- remoteServices = new HashSet<QName>();
+ public void setClient(ClientImpl client) {
+ this.client = client;
+ }
- // Start listening rpc requests
- serverPool.execute(receive());
+ public State getStatus() {
+ return this.status;
+ }
- brokerSession.addRpcRegistrationListener(listener);
- // routingTable.registerRouteChangeListener(routeChangeListener);
+ public Optional<ServerRequestHandler> getHandler() {
+ return Optional.fromNullable(this.handler);
+ }
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- listener.onRpcImplementationAdded(rpc);
- }
+ public void setBrokerSession(ProviderSession session) {
+ this.brokerSession = session;
+ }
- _logger.debug("RPC Server started [{}]", localUri);
- }
+ public Optional<ProviderSession> getBrokerSession() {
+ return Optional.fromNullable(this.brokerSession);
+ }
- public void stop() {
- // TODO: un-subscribe
+ public Optional<ZMQ.Context> getZmqContext() {
+ return Optional.fromNullable(this.context);
+ }
- // if (context != null)
- // context.term();
- //
- // _logger.debug("ZMQ Context is terminated.");
+ public String getServerAddress() {
+ return serverAddress;
+ }
- if (serverPool != null)
- serverPool.shutdown();
+ public String getHandlerAddress() {
+ return HANDLER_INPROC_ADDRESS;
+ }
- _logger.debug("Thread pool is closed.");
- }
+ /**
+ *
+ */
+ public void start() {
+ Preconditions.checkState(State.STOPPED == this.getStatus(),
+ "Remote RPC Server is already running");
- private Runnable receive() {
- return new Runnable() {
- public void run() {
-
- // Bind to RPC reply socket
- replySocket = context.socket(ZMQ.REP);
- replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort());
-
- // Poller enables listening on multiple sockets using a single
- // thread
- ZMQ.Poller poller = new ZMQ.Poller(1);
- poller.register(replySocket, ZMQ.Poller.POLLIN);
- try {
- // TODO: Add code to restart the thread after exception
- while (!Thread.currentThread().isInterrupted()) {
-
- poller.poll();
-
- if (poller.pollin(0)) {
- handleRpcCall();
- }
- }
- } catch (Exception e) {
- // log and continue
- _logger.error("Unhandled exception [{}]", e);
- } finally {
- poller.unregister(replySocket);
- replySocket.close();
- }
-
- }
- };
- }
+ status = State.STARTING;
+ context = ZMQ.context(1);
+ remoteServices = new HashSet<QName>();//
+ serverPool = Executors.newSingleThreadExecutor();//main server thread
+ serverPool.execute(receive()); // Start listening rpc requests
+ brokerSession.addRpcRegistrationListener(listener);
- /**
- * @throws InterruptedException
- * @throws ExecutionException
- */
- private void handleRpcCall() {
+ announceLocalRpcs();
- Message request = parseMessage(replySocket);
+ registerRemoteRpcs();
- _logger.debug("Received rpc request [{}]", request);
+ status = State.STARTED;
+ _logger.info("Remote RPC Server started [{}]", getServerAddress());
+ }
- // Call broker to process the message then reply
- Future<RpcResult<CompositeNode>> rpc = null;
- RpcResult<CompositeNode> result = null;
- try {
- rpc = brokerSession.rpc((QName) request.getRoute().getType(),
- XmlUtils.xmlToCompositeNode((String) request.getPayload()));
+ public void stop(){
+ close();
+ }
- result = (rpc != null) ? rpc.get() : null;
+ /**
+ *
+ */
+ @Override
+ public void close() {
- } catch (Exception e) {
- _logger.debug("Broker threw [{}]", e);
- }
+ if (State.STOPPED == this.getStatus()) return; //do nothing
+
+ unregisterLocalRpcs();
- CompositeNode payload = (result != null) ? result.getResult() : null;
+ if (serverPool != null)
+ serverPool.shutdown();
- Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri)
- .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build();
+ closeZmqContext();
- _logger.debug("Sending rpc response [{}]", response);
+ status = State.STOPPED;
+ _logger.info("Remote RPC Server stopped");
+ }
+
+ /**
+ * Closes ZMQ Context. It tries to gracefully terminate the context. If
+ * termination takes more than a second, its forcefully shutdown.
+ */
+ private void closeZmqContext() {
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ FutureTask zmqTermination = new FutureTask(new Runnable() {
+ @Override
+ public void run() {
try {
- replySocket.send(Message.serialize(response));
+ if (context != null)
+ context.term();
+ _logger.debug("ZMQ Context terminated gracefully!");
} catch (Exception e) {
- _logger.debug("rpc response send failed for message [{}]", response);
- _logger.debug("{}", e);
+ _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
}
+ }
+ }, null);
+
+ exec.execute(zmqTermination);
+
+ try {
+ zmqTermination.get(5L, TimeUnit.SECONDS);
+ } catch (Exception e) {/*ignore and continue with shutdown*/}
+
+ exec.shutdownNow();
+ }
+
+ /**
+ * Main listener thread that spawns {@link ServerRequestHandler} as workers.
+ *
+ * @return
+ */
+ private Runnable receive() {
+ return new Runnable() {
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("remote-rpc-server");
+ _logger.debug("Remote RPC Server main thread starting...");
+
+ //socket clients connect to (frontend)
+ ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
+
+ //socket RequestHandlers connect to (backend)
+ ZMQ.Socket workers = context.socket(ZMQ.DEALER);
+
+ try (SocketPair capturePair = new SocketPair();
+ ServerRequestHandler requestHandler = new ServerRequestHandler(context,
+ brokerSession,
+ HANDLER_WORKER_COUNT,
+ HANDLER_INPROC_ADDRESS,
+ getServerAddress());) {
+
+ handler = requestHandler;
+ clients.setHWM(HWM);
+ clients.bind("tcp://*:" + port);
+ workers.setHWM(HWM);
+ workers.bind(HANDLER_INPROC_ADDRESS);
+ //start worker threads
+ _logger.debug("Remote RPC Server worker threads starting...");
+ requestHandler.start();
+ //start capture thread
+ // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
+ // Connect work threads to client threads via a queue
+ ZMQ.proxy(clients, workers, null);//capturePair.getSender());
+ } catch (Exception e) {
+ _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
+ } finally {
+ if (clients != null) clients.close();
+ if (workers != null) workers.close();
+ _logger.info("Remote RPC Server stopped");
+ }
+ }
+ };
+ }
+
+ /**
+ * Register the remote RPCs from the routing table into broker
+ */
+ private void registerRemoteRpcs(){
+ Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
+
+ Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
+
+ Set<Map.Entry> remoteRoutes =
+ routingTableProvider.getRoutingTable().get().getAllRoutes();
+
+ //filter out all entries that contains local address
+ //we dont want to register local RPCs as remote
+ Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
+ public boolean apply(Map.Entry remoteRoute){
+ return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
+ }
+ };
+
+ //filter the entries created by current node
+ Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
+
+ for (Map.Entry route : filteredRemoteRoutes){
+ onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
+ }
+ }
+
+ /**
+ * Un-Register the local RPCs from the routing table
+ */
+ private void unregisterLocalRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationRemoved(rpc);
+ }
+ }
+
+ /**
+ * Publish all the locally registered RPCs in the routing table
+ */
+ private void announceLocalRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationAdded(rpc);
+ }
+ }
+
+ /**
+ * @param key
+ * @param value
+ */
+ @Override
+ public void onRouteUpdated(String key, String value) {
+ RouteIdentifierImpl rId = new RouteIdentifierImpl();
+ try {
+ _logger.debug("Updating key/value {}-{}", key, value);
+ brokerSession.addRpcImplementation(
+ (QName) rId.fromString(key).getType(), client);
+
+ //TODO: Check with Tony for routed rpc
+ //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
+ } catch (Exception e) {
+ _logger.info("Route update failed {}", e);
}
+ }
+
+ /**
+ * @param key
+ */
+ @Override
+ public void onRouteDeleted(String key) {
+ //TODO: Broker session needs to be updated to support this
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Finds IPv4 address of the local VM
+ * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
+ * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
+ * Should we use IP or hostname?
+ *
+ * @return
+ */
+ private String findIpAddress() {
+ String hostAddress = null;
+ Enumeration e = null;
+ try {
+ e = NetworkInterface.getNetworkInterfaces();
+ } catch (SocketException e1) {
+ e1.printStackTrace();
+ }
+ while (e.hasMoreElements()) {
- /**
- * @param socket
- * @return
- */
- private Message parseMessage(ZMQ.Socket socket) {
+ NetworkInterface n = (NetworkInterface) e.nextElement();
- Message msg = null;
- try {
- byte[] bytes = socket.recv();
- _logger.debug("Received bytes:[{}]", bytes.length);
- msg = (Message) Message.deserialize(bytes);
- } catch (Throwable t) {
- t.printStackTrace();
- }
- return msg;
+ Enumeration ee = n.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress i = (InetAddress) ee.nextElement();
+ if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
+ hostAddress = i.getHostAddress();
+ }
}
+ return hostAddress;
+
+ }
+
+ /**
+ * Listener for rpc registrations
+ */
+ private class RpcListener implements RpcRegistrationListener {
@Override
- public void onRouteUpdated(String key, Set values) {
- RouteIdentifierImpl rId = new RouteIdentifierImpl();
- try {
- _logger.debug("Updating key/value {}-{}", key, values);
- brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client);
+ public void onRpcImplementationAdded(QName name) {
- } catch (Exception e) {
- _logger.info("Route update failed {}", e);
- }
+ //if the service name exists in the set, this notice
+ //has bounced back from the broker. It should be ignored
+ if (remoteServices.contains(name))
+ return;
+
+ _logger.debug("Adding registration for [{}]", name);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(name);
+
+ RoutingTable<String, String> routingTable = getRoutingTable();
+
+ try {
+ routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
+ _logger.debug("Route added [{}-{}]", name, getServerAddress());
+
+ } catch (RoutingTableException | SystemException e) {
+ //TODO: This can be thrown when route already exists in the table. Broker
+ //needs to handle this.
+ _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
+
+ }
}
@Override
- public void onRouteDeleted(String key) {
- // TODO: Broker session needs to be updated to support this
- throw new UnsupportedOperationException();
+ public void onRpcImplementationRemoved(QName name) {
+
+ _logger.debug("Removing registration for [{}]", name);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(name);
+
+ RoutingTable<String, String> routingTable = getRoutingTable();
+
+ try {
+ routingTable.removeGlobalRoute(routeId.toString());
+ } catch (RoutingTableException | SystemException e) {
+ _logger.error("Route delete failed {}", e);
+ }
}
-
- /**
- * Listener for rpc registrations
- */
- private class RpcListener implements RpcRegistrationListener {
-
-
-
- @Override
- public void onRpcImplementationAdded(QName name) {
-
- // if the service name exists in the set, this notice
- // has bounced back from the broker. It should be ignored
- if (remoteServices.contains(name))
- return;
-
- _logger.debug("Adding registration for [{}]", name);
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
-
- try {
- routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri);
- _logger.debug("Route added [{}-{}]", name, localUri);
- } catch (RoutingTableException | SystemException e) {
- // TODO: This can be thrown when route already exists in the
- // table. Broker
- // needs to handle this.
- _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
-
- }
- }
- @Override
- public void onRpcImplementationRemoved(QName name) {
+ private RoutingTable<String, String> getRoutingTable(){
+ Optional<RoutingTable<String, String>> routingTable =
+ routingTableProvider.getRoutingTable();
- _logger.debug("Removing registration for [{}]", name);
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
+ checkNotNull(routingTable.isPresent(), "Routing table is null");
- try {
- routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString());
- } catch (RoutingTableException | SystemException e) {
- _logger.error("Route delete failed {}", e);
- }
- }
+ return routingTable.get();
}
+ }
+
+ /*
+ * Listener for Route changes in broker. Broker notifies this listener in the event
+ * of any change (add/delete). Listener then updates the routing table.
+ */
+ private class BrokerRouteChangeListener
+ implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
@Override
- public void close() throws Exception {
- stop();
- }
+ public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
- public void setRoutingTableProvider(RoutingTableProvider provider) {
- this.routingTable = provider;
}
+ }
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
+ */
+public class ServerRequestHandler implements AutoCloseable{
+
+ private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class);
+ private final String DEFAULT_NAME = "remote-rpc-worker";
+ private String dealerAddress;
+ private String serverAddress;
+ private int workerCount;
+ private ZMQ.Context context;
+ private Broker.ProviderSession broker;
+
+ private RequestHandlerThreadPool workerPool;
+ private final AtomicInteger threadId = new AtomicInteger();
+
+ public ServerRequestHandler(ZMQ.Context context,
+ Broker.ProviderSession session,
+ int workerCount,
+ String dealerAddress,
+ String serverAddress) {
+ this.context = context;
+ this.dealerAddress = dealerAddress;
+ this.serverAddress = serverAddress;
+ this.broker = session;
+ this.workerCount = workerCount;
+ }
+
+ public ThreadPoolExecutor getWorkerPool(){
+ return workerPool;
+ }
+
+ public void start(){
+ workerPool = new RequestHandlerThreadPool(
+ workerCount, workerCount,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ //unbound is ok. Task will never be submitted
+
+ for (int i=0;i<workerCount;i++){
+ workerPool.execute(new Worker(threadId.incrementAndGet()));
+ }
+ }
+
+ /**
+ * This gets called automatically if used with try-with-resources
+ * @throws Exception
+ */
+ @Override
+ public void close() throws Exception {
+ if (workerPool != null)
+ workerPool.shutdown();
+ _logger.info("Request Handler closed");
+ }
+
+ /**
+ * Worker to handles RPC request
+ */
+ private class Worker implements Runnable {
+ private String name;
+
+ public Worker(int id){
+ this.name = DEFAULT_NAME + "-" + id;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName(name);
+ _logger.debug("Starting... ");
+ ZMQ.Socket socket = null;
+
+ try {
+ socket = context.socket(ZMQ.REP);
+ socket.connect(dealerAddress);
+
+ while (!Thread.currentThread().isInterrupted()) {
+
+ Message request = parseMessage(socket);
+ _logger.debug("Received rpc request [{}]", request);
+
+ if (request != null) {
+ // Call broker to process the message then reply
+ Future<RpcResult<CompositeNode>> rpc = null;
+ RpcResult<CompositeNode> result = null;
+
+ //TODO Call this in a new thread with timeout
+ try {
+ rpc = broker.rpc(
+ (QName) request.getRoute().getType(),
+ XmlUtils.xmlToCompositeNode((String) request.getPayload()));
+
+ result = (rpc != null) ? rpc.get() : null;
+
+ } catch (Exception e) {
+ _logger.debug("Broker threw [{}]", e);
+ }
+
+ CompositeNode payload = (result != null) ? result.getResult() : null;
+
+ Message response = new Message.MessageBuilder()
+ .type(Message.MessageType.RESPONSE)
+ .sender(serverAddress)
+ .route(request.getRoute())
+ .payload(XmlUtils.compositeNodeToXml(payload))
+ .build();
+
+ _logger.debug("Sending rpc response [{}]", response);
+
+ try {
+ socket.send(Message.serialize(response));
+ } catch (Exception e) {
+ _logger.debug("rpc response send failed for message [{}]", response);
+ _logger.debug("{}", e);
+ }
+ }
+ }
+ } catch (Exception e) {
+ printException(e);
+ } finally {
+ closeSocket(socket);
+ }
+ }
+
+ /**
+ * @param socket
+ * @return
+ */
+ private Message parseMessage(ZMQ.Socket socket) throws Exception {
+ byte[] bytes = socket.recv(); //this blocks
+ _logger.debug("Received bytes:[{}]", bytes.length);
+ return (Message) Message.deserialize(bytes);
+ }
+
+ private void printException(Exception e) {
+ try (StringWriter s = new StringWriter();
+ PrintWriter p = new PrintWriter(s)) {
+ e.printStackTrace(p);
+ _logger.debug(s.toString());
+ } catch (IOException e1) {/*Ignore and continue*/ }
+ }
+
+ private void closeSocket(ZMQ.Socket socket) {
+ try {
+ if (socket != null) socket.close();
+ } catch (Exception x) {
+ _logger.debug("Exception while closing socket [{}]", x);
+ } finally {
+ if (socket != null) socket.close();
+ }
+ _logger.debug("Closing...");
+ }
+ }
+
+
+ /**
+ *
+ */
+ public class RequestHandlerThreadPool extends ThreadPoolExecutor{
+
+ public RequestHandlerThreadPool(int corePoolSize,
+ int maximumPoolSize,
+ long keepAliveTime,
+ TimeUnit unit,
+ BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ if (isTerminating() || isTerminated() || isShutdown())
+ return;
+
+ if ( t != null ){
+ _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage());
+ }
+
+ this.execute(new Worker(threadId.incrementAndGet()));
+ super.afterExecute(r, null);
+ }
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import com.google.common.base.Optional;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-import java.net.UnknownHostException;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Manages creation of {@link RpcSocket} and their registration with {@link ZMQ.Poller}
- */
-public class SocketManager implements AutoCloseable{
- private static final Logger log = LoggerFactory.getLogger(SocketManager.class);
-
- /*
- * RpcSockets mapped by network address its connected to
- */
- private ConcurrentHashMap<String, RpcSocket> managedSockets = new ConcurrentHashMap<String, RpcSocket>();
-
- private ZMQ.Poller _poller = new ZMQ.Poller(2); //randomly selected size. Poller grows automatically
-
- /**
- * Returns a {@link RpcSocket} for the given address
- * @param address network address with port eg: 10.199.199.20:5554
- * @return
- */
- public RpcSocket getManagedSocket(String address) throws IllegalArgumentException {
- //Precondition
- if (!address.matches("(tcp://)(.*)(:)(\\d*)")) {
- throw new IllegalArgumentException("Address must of format 'tcp://<ip address>:<port>' but is " + address);
- }
-
- if (!managedSockets.containsKey(address)) {
- log.debug("{} Creating new socket for {}", Thread.currentThread().getName());
- RpcSocket socket = new RpcSocket(address, _poller);
- managedSockets.put(address, socket);
- }
-
- return managedSockets.get(address);
- }
-
- /**
- * Returns a {@link RpcSocket} for the given {@link ZMQ.Socket}
- * @param socket
- * @return
- */
- public Optional<RpcSocket> getManagedSocketFor(ZMQ.Socket socket) {
- for (RpcSocket rpcSocket : managedSockets.values()) {
- if (rpcSocket.getSocket().equals(socket)) {
- return Optional.of(rpcSocket);
- }
- }
- return Optional.absent();
- }
-
- /**
- * Return a collection of all managed sockets
- * @return
- */
- public Collection<RpcSocket> getManagedSockets() {
- return managedSockets.values();
- }
-
- /**
- * Returns the {@link ZMQ.Poller}
- * @return
- */
- public ZMQ.Poller getPoller() {
- return _poller;
- }
-
- /**
- * This should be called when stopping the server to close all the sockets
- * @return
- */
- @Override
- public void close() throws Exception {
- log.debug("Stopping...");
- for (RpcSocket socket : managedSockets.values()) {
- socket.close();
- }
- managedSockets.clear();
- log.debug("Stopped");
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc.dto;
-
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.*;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class CompositeNodeImpl implements CompositeNode, Serializable {
-
- private QName key;
- private List<Node<?>> children;
-
- @Override
- public List<Node<?>> getChildren() {
- return children;
- }
-
- @Override
- public List<CompositeNode> getCompositesByName(QName children) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<CompositeNode> getCompositesByName(String children) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public List<SimpleNode<?>> getSimpleNodesByName(QName children) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public List<SimpleNode<?>> getSimpleNodesByName(String children) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public CompositeNode getFirstCompositeByName(QName container) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public SimpleNode<?> getFirstSimpleByName(QName leaf) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public MutableCompositeNode asMutable() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public QName getKey() {
- return key; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public List<Node<?>> setValue(List<Node<?>> value) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public int size() {
- return 0; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public boolean isEmpty() {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public boolean containsKey(Object key) {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public boolean containsValue(Object value) {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public List<Node<?>> get(Object key) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public List<Node<?>> put(QName key, List<Node<?>> value) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public List<Node<?>> remove(Object key) {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public void putAll(Map<? extends QName, ? extends List<Node<?>>> m) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public void clear() {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public Set<QName> keySet() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public Collection<List<Node<?>>> values() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public Set<Entry<QName, List<Node<?>>>> entrySet() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public QName getNodeType() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public CompositeNode getParent() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public List<Node<?>> getValue() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public ModifyAction getModificationAction() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.sal.connector.remoterpc.dto;
-
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-
-import java.io.Serializable;
-
-public class RpcRequestImpl implements RpcRouter.RpcRequest<QName, QName, InstanceIdentifier, Object>,Serializable {
-
- private RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier;
- private Object payload;
-
- @Override
- public RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> getRoutingInformation() {
- return routeIdentifier;
- }
-
- public void setRouteIdentifier(RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier) {
- this.routeIdentifier = routeIdentifier;
- }
-
- @Override
- public Object getPayload() {
- return payload;
- }
-
- public void setPayload(Object payload) {
- this.payload = payload;
- }
-
-}
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.sal.connector.remoterpc.util;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc
-
-import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName}
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import java.util
-import java.util.{UUID, Collections}
-import org.zeromq.ZMQ
-import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs}
-import org.slf4j.LoggerFactory
-import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message}
-import Message.MessageType
-import java.util.concurrent._
-import java.lang.InterruptedException
-
-
-/**
- * An implementation of {@link RpcImplementation} that makes
- * remote RPC calls
- */
-class Client extends RemoteRpcClient {
-
- private val _logger = LoggerFactory.getLogger(this.getClass);
-
- val requestQueue = new LinkedBlockingQueue[MessageWrapper](100)
- val pool: ExecutorService = Executors.newSingleThreadExecutor()
- private val TIMEOUT = 5000 //in ms
- var routingTableProvider: RoutingTableProvider = null
-
-
- def getInstance = this
-
-
- def setRoutingTableProvider(provider : RoutingTableProvider) = {
- routingTableProvider = provider;
- }
-
- def getSupportedRpcs: util.Set[QName] = {
- Collections.emptySet()
- }
-
- def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = {
-
- val routeId = new RouteIdentifierImpl()
- routeId.setType(rpc)
-
- //lookup address for the rpc request
- val routingTable = routingTableProvider.getRoutingTable()
- require( routingTable != null, "Routing table not found. Exiting" )
-
- val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString)
- require(addresses != null, "Address not found for rpc " + rpc);
- require(addresses.size() == 1) //its a global service.
-
- val address = addresses.iterator().next()
- require(address != null, "Address is null")
-
- //create in-process "pair" socket and pass it to sender thread
- //Sender replies on this when result is available
- val inProcAddress = "inproc://" + UUID.randomUUID()
- val receiver = Context.zmqContext.socket(ZMQ.PAIR)
- receiver.bind(inProcAddress);
-
- val sender = Context.zmqContext.socket(ZMQ.PAIR)
- sender.connect(inProcAddress)
-
- val requestMessage = new Message.MessageBuilder()
- .`type`(MessageType.REQUEST)
- //.sender("tcp://localhost:8081")
- .recipient(address)
- .route(routeId)
- .payload(input)
- .build()
-
- _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress)
-
- val messageWrapper = new MessageWrapper(requestMessage, sender)
- val errors = new util.ArrayList[RpcError]
-
- try {
- process(messageWrapper)
- val response = parseMessage(receiver)
-
- return Rpcs.getRpcResult(
- true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet())
-
- } catch {
- case e: Exception => {
- errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause))
- return Rpcs.getRpcResult(false, null, errors)
- }
- } finally {
- receiver.close();
- sender.close();
- }
-
- }
-
- /**
- * Block on socket for reply
- * @param receiver
- * @return
- */
- private def parseMessage(receiver:ZMQ.Socket): Message = {
- val bytes = receiver.recv()
- return Message.deserialize(bytes).asInstanceOf[Message]
- }
-
- def start() = {
- pool.execute(new Sender)
- }
-
- def process(msg: MessageWrapper) = {
- _logger.debug("Processing message [{}]", msg)
- val success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS)
-
- if (!success) throw new TimeoutException("Queue is full");
-
- }
-
- def stop() = {
- pool.shutdown() //intiate shutdown
- _logger.debug("Client stopping...")
- // Context.zmqContext.term();
- // _logger.debug("ZMQ context terminated")
-
- try {
-
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow();
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- _logger.error("Client thread pool did not shut down");
- }
- } catch {
- case ie:InterruptedException =>
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- _logger.debug("Client stopped")
- }
-
- def close() = {
- stop();
- }
-}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import com.google.common.base.Optional;
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ *
+ */
+public class ClientImplTest {
+ RoutingTableProvider routingTableProvider;
+ ClientImpl client;
+ ClientRequestHandler mockHandler;
+
+ @Before
+ public void setUp() throws Exception {
+
+ //mock routing table
+ routingTableProvider = mock(RoutingTableProvider.class);
+ RoutingTable<String, String> mockRoutingTable = new MockRoutingTable<String, String>();
+ Optional<RoutingTable<String, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
+ when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable);
+
+ //mock ClientRequestHandler
+ mockHandler = mock(ClientRequestHandler.class);
+
+ client = new ClientImpl(mockHandler);
+ client.setRoutingTableProvider(routingTableProvider);
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+ @Test
+ public void getRoutingTableProvider_Call_ShouldReturnMockProvider() throws Exception {
+ Assert.assertEquals(routingTableProvider, client.getRoutingTableProvider());
+
+ }
+
+ @Test
+ public void testStart() throws Exception {
+
+ }
+
+ @Test
+ public void testStop() throws Exception {
+
+ }
+
+ @Test
+ public void testClose() throws Exception {
+
+ }
+
+ @Test
+ public void invokeRpc_NormalCall_ShouldReturnSuccess() throws Exception {
+
+ when(mockHandler.handle(any(Message.class))).
+ thenReturn(MessagingUtil.createEmptyMessage());
+
+ RpcResult<CompositeNode> result = client.invokeRpc(null, null);
+
+ Assert.assertTrue(result.isSuccessful());
+ Assert.assertTrue(result.getErrors().isEmpty());
+ Assert.assertNull(result.getResult());
+ }
+
+ @Test
+ public void invokeRpc_HandlerThrowsException_ShouldReturnError() throws Exception {
+
+ when(mockHandler.handle(any(Message.class))).
+ thenThrow(new IOException());
+
+ RpcResult<CompositeNode> result = client.invokeRpc(null, null);
+
+ Assert.assertFalse(result.isSuccessful());
+ Assert.assertFalse(result.getErrors().isEmpty());
+ Assert.assertNull(result.getResult());
+ }
+
+}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.util.concurrent.*;
+
+/**
+ *
+ */
+public class ClientRequestHandlerTest {
+
+ private final Logger _logger = LoggerFactory.getLogger(ClientRequestHandlerTest.class);
+
+ ZMQ.Context context;
+ ExecutorService serverThread;
+ final String SERVER_ADDRESS = "localhost:5554";
+
+ ClientRequestHandler handler;
+
+ @Before
+ public void setUp() throws Exception {
+ context = ZMQ.context(1);
+ serverThread = Executors.newCachedThreadPool();
+ handler = new ClientRequestHandler(context);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ serverThread.shutdown();
+ MessagingUtil.closeZmqContext(context);
+ handler.close();
+ }
+
+ @Test
+ public void handle_SingleRemote_ShouldReturnResponse() throws Exception {
+ serverThread.execute(MessagingUtil.startReplyServer(context, SERVER_ADDRESS, 1));
+ Message request = new Message();
+ request.setRecipient(SERVER_ADDRESS);
+ Message response = handleMessageWithTimeout(request);
+ Assert.assertNotNull(response);
+ //should be connected to only 1 remote server
+ Assert.assertEquals(1, handler.getWorkerCount());
+ Assert.assertEquals(response.getRecipient(), SERVER_ADDRESS);
+ }
+
+ // @Test
+ public void handle_MultiRemote_ShouldReturnResponses() throws Exception {
+ ExecutorService threadPool = Executors.newCachedThreadPool();
+ final int port = 5555;
+ String serverAddress = null;
+ for (int i = 0; i < 5; i++) {
+ serverAddress = "localhost:" + (port + i);
+ serverThread.execute(MessagingUtil.startReplyServer(context, serverAddress, 1));
+ threadPool.execute(createEmptyMessageTaskAndHandle(handler, serverAddress));
+ }
+ Thread.currentThread().sleep(5000);//wait for all messages to get processed
+ //should be connected to 5 remote server
+ Assert.assertEquals(5, handler.getWorkerCount());
+ }
+
+ private Runnable createEmptyMessageTaskAndHandle(final ClientRequestHandler handler, final String serverAddress) {
+
+ return new Runnable() {
+ @Override
+ public void run() {
+ Message request = new Message();
+ request.setRecipient(serverAddress);
+ try {
+ Message response = handleMessageWithTimeout(request);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(response.getRecipient(), serverAddress);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private Message handleMessageWithTimeout(final Message request) {
+ Message response = null;
+
+ FutureTask task = new FutureTask(new Callable<Message>() {
+
+ @Override
+ public Message call() {
+ try {
+ return handler.handle(request);
+ } catch (Exception e) {
+ _logger.debug("Client handler failed to handle request. Exception is [{}]", e);
+ }
+ return null;
+ }
+ });
+
+ serverThread.execute(task);
+
+ try {
+ response = (Message) task.get(5L, TimeUnit.SECONDS); //wait for max 5 sec for server to respond
+ } catch (Exception e) {/*ignore and continue*/}
+
+ return response;
+ }
+
+}
+++ /dev/null
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import junit.framework.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-
-import java.util.concurrent.TimeoutException;
-
-public class ClientTest {
-
- Client client;
-
- @Before
- public void setup(){
- client = new Client();
- client.getRequestQueue().clear();
- }
-
- @Test
- public void testStop() throws Exception {
-
- }
-
- @Test
- public void testPool() throws Exception {
-
- }
-
- @Test
- public void process_AddAMessage_ShouldAddToQueue() throws Exception {
- client.process(getEmptyMessageWrapper());
- Assert.assertEquals(1, client.getRequestQueue().size());
- }
-
- /**
- * Queue size is 100. Adding 101 message should time out in 2 sec
- * if server does not process it
- * @throws Exception
- */
- @Test(expected = TimeoutException.class)
- public void process_Add101Message_ShouldThrow() throws Exception {
- for (int i=0;i<101;i++){
- client.process(getEmptyMessageWrapper());
- }
- }
-
- @Test
- public void testStart() throws Exception {
- }
-
- private MessageWrapper getEmptyMessageWrapper(){
- return new MessageWrapper(new Message(), null);
- }
-}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Mock implementation of routing table
+ */
+public class MockRoutingTable<K, V> implements RoutingTable {
+
+
+ @Override
+ public void addRoute(Object o, Object o2) throws RoutingTableException, SystemException {
+
+ }
+
+ @Override
+ public void addGlobalRoute(Object o, Object o2) throws RoutingTableException, SystemException {
+
+ }
+
+ @Override
+ public void removeRoute(Object o, Object o2) {
+
+ }
+
+ @Override
+ public void removeGlobalRoute(Object o) throws RoutingTableException, SystemException {
+
+ }
+
+ @Override
+ public Set getRoutes(Object o) {
+ Set<String> routes = new HashSet<String>();
+ routes.add("localhost:5554");
+ return routes;
+ }
+
+ @Override
+ public Set<Map.Entry> getAllRoutes() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Object getARoute(Object o) {
+ return null;
+ }
+
+ @Override
+ public void registerRouteChangeListener(RouteChangeListener routeChangeListener) {
+
+ }
+}
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
package org.opendaylight.controller.sal.connector.remoterpc;
import java.net.URI;
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import junit.framework.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper;
-import org.powermock.api.mockito.PowerMockito;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.zeromq.ZMQ;
-
-import java.util.concurrent.TimeoutException;
-
-import static org.mockito.Mockito.doNothing;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(RpcSocket.class)
-public class RpcSocketTest {
- RpcSocket rpcSocket = new RpcSocket("tcp://localhost:5554", new ZMQ.Poller(1));
- RpcSocket spy = PowerMockito.spy(rpcSocket);
-
- @Test
- public void testCreateSocket() throws Exception {
- Assert.assertEquals("tcp://localhost:5554", spy.getAddress());
- Assert.assertEquals(ZMQ.REQ, spy.getSocket().getType());
- }
-
- @Test(expected = TimeoutException.class)
- public void send_WhenQueueGetsFull_ShouldThrow() throws Exception {
-
- doNothing().when(spy).process();
-
- //10 is queue size
- for (int i=0;i<10;i++){
- spy.send(getEmptyMessageWrapper());
- }
-
- //sending 11th message should throw
- spy.send(getEmptyMessageWrapper());
- }
-
- @Test
- public void testHasTimedOut() throws Exception {
- spy.send(getEmptyMessageWrapper());
- Assert.assertFalse(spy.hasTimedOut());
- Thread.sleep(1000);
- Assert.assertFalse(spy.hasTimedOut());
- Thread.sleep(1000);
- Assert.assertTrue(spy.hasTimedOut());
- }
-
- @Test
- public void testProcess() throws Exception {
- PowerMockito.doNothing().when(spy, "sendMessage");
- spy.send(getEmptyMessageWrapper());
-
- //Next message should get queued
- spy.send(getEmptyMessageWrapper());
-
- //queue size should be 2
- Assert.assertEquals(2, spy.getQueueSize());
-
-
- spy.process();
- //sleep for 2 secs (timeout)
- //message send would be retried
- Thread.sleep(2000);
- spy.process();
- Thread.sleep(2000);
- spy.process();
- Thread.sleep(2000);
- spy.process(); //retry fails, next message will get picked up
- Assert.assertEquals(1, spy.getQueueSize());
- }
-
- @Test
- public void testProcessStateTransitions() throws Exception {
- PowerMockito.doNothing().when(spy, "sendMessage");
- Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
- spy.send(getEmptyMessageWrapper());
- Assert.assertEquals(1, spy.getQueueSize());
- Thread.sleep(200);
- Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
- Thread.sleep(1800);
-
- //1st timeout, 2nd try
- spy.process();
- Thread.sleep(200);
- Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
- Thread.sleep(1800);
-
- //2nd timeout, 3rd try
- spy.process();
- Thread.sleep(200);
- Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
- Thread.sleep(1800);
-
- //3rd timeout, no more tries => remove
- spy.process();
- Thread.sleep(200);
- Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
- Assert.assertEquals(0, spy.getQueueSize());
- }
-
- @Test
- public void testParseMessage() throws Exception {
- // Write an integration test for parseMessage
- }
-
- @Test
- public void testRecycleSocket() throws Exception {
- // This will need to be updated in the future...for now, recycleSocket() calls close()
- Assert.assertTrue(spy.getSocket().base().check_tag());
- spy.close();
- Assert.assertEquals(10, spy.getSocket().getLinger());
- Assert.assertFalse(spy.getSocket().base().check_tag());
- }
-
- @Test
- public void testClose() throws Exception {
- Assert.assertTrue(spy.getSocket().base().check_tag());
- spy.close();
- Assert.assertEquals(10, spy.getSocket().getLinger());
- Assert.assertFalse(spy.getSocket().base().check_tag());
- }
-
- @Test
- public void testReceive() throws Exception {
- PowerMockito.doReturn(null).when(spy, "parseMessage");
- PowerMockito.doNothing().when(spy, "process");
- spy.send(getEmptyMessageWrapper());
-
- //There should be 1 message waiting in the queue
- Assert.assertEquals(1, spy.getQueueSize());
-
- spy.receive();
- //This should complete message processing
- //The message should be removed from the queue
- Assert.assertEquals(0, spy.getQueueSize());
- Assert.assertEquals(RpcSocket.NUM_RETRIES, spy.getRetriesLeft());
-
- }
-
- @Test
- public void testReceiveStateTransitions() throws Exception {
- PowerMockito.doReturn(null).when(spy, "parseMessage");
- Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
- spy.send(getEmptyMessageWrapper());
-
- //There should be 1 message waiting in the queue
- Assert.assertEquals(1, spy.getQueueSize());
- Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
-
- spy.receive();
- //This should complete message processing
- //The message should be removed from the queue
- Assert.assertEquals(0, spy.getQueueSize());
- Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
- }
-
- private MessageWrapper getEmptyMessageWrapper(){
- return new MessageWrapper(new Message(), null);
- }
-
- @Test
- public void testProcessReceiveSequence() throws Exception {
- PowerMockito.doNothing().when(spy, "sendMessage");
- PowerMockito.doReturn(null).when(spy, "parseMessage");
- Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
- spy.send(getEmptyMessageWrapper());
- spy.send(getEmptyMessageWrapper());
- Assert.assertEquals(2, spy.getQueueSize());
- Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
-
-
- Thread.sleep(2000);
- Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
- spy.receive();
- Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
- Assert.assertEquals(1, spy.getQueueSize());
-
- spy.process();
- Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState);
- spy.receive();
- Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState);
- Assert.assertEquals(0, spy.getQueueSize());
- }
-}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+
+import com.google.common.base.Optional;
+import junit.framework.Assert;
+import org.junit.*;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.zeromq.ZMQ;
+import zmq.Ctx;
+import zmq.SocketBase;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ServerImplTest {
+
+ private static ZMQ.Context context;
+ private ServerImpl server;
+ private Broker.ProviderSession brokerSession;
+ private RoutingTableProvider routingTableProvider;
+ private RpcRegistrationListener listener;
+
+ ExecutorService pool;
+
+ //Server configuration
+ private final int HANDLER_COUNT = 2;
+ private final int HWM = 200;
+ private final int port = 5554;
+ //server address
+ private final String SERVER_ADDRESS = "tcp://localhost:5554";
+
+ //@BeforeClass
+ public static void init() {
+ context = ZMQ.context(1);
+ }
+
+ //@AfterClass
+ public static void destroy() {
+ MessagingUtil.closeZmqContext(context);
+ }
+
+ @Before
+ public void setup() throws InterruptedException {
+ context = ZMQ.context(1);
+ brokerSession = mock(Broker.ProviderSession.class);
+ routingTableProvider = mock(RoutingTableProvider.class);
+ listener = mock(RpcRegistrationListener.class);
+
+ server = new ServerImpl(port);
+ server.setBrokerSession(brokerSession);
+ server.setRoutingTableProvider(routingTableProvider);
+
+ RoutingTable<String, String> mockRoutingTable = new MockRoutingTable<String, String>();
+ Optional<RoutingTable<String, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
+ when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable);
+
+ when(brokerSession.addRpcRegistrationListener(listener)).thenReturn(null);
+ when(brokerSession.getSupportedRpcs()).thenReturn(Collections.EMPTY_SET);
+ when(brokerSession.rpc(null, mock(CompositeNode.class))).thenReturn(null);
+ server.start();
+ Thread.sleep(5000);//wait for server to start
+ }
+
+ @After
+ public void tearDown() throws InterruptedException {
+
+ if (pool != null)
+ pool.shutdown();
+
+ if (server != null)
+ server.stop();
+
+ MessagingUtil.closeZmqContext(context);
+
+ Thread.sleep(5000);//wait for server to stop
+ Assert.assertEquals(ServerImpl.State.STOPPED, server.getStatus());
+ }
+
+ @Test
+ public void getRoutingTableProvider_Call_ShouldReturnRoutingTable() throws Exception {
+ Assert.assertNotNull(server.getRoutingTableProvider());
+ }
+
+ @Test
+ public void getBrokerSession_Call_ShouldReturnBrokerSession() throws Exception {
+ Optional<Broker.ProviderSession> mayBeBroker = server.getBrokerSession();
+
+ if (mayBeBroker.isPresent())
+ Assert.assertEquals(brokerSession, mayBeBroker.get());
+ else
+ Assert.fail("Broker does not exist in Remote RPC Server");
+
+ }
+
+ @Test
+ public void start_Call_ShouldSetServerStatusToStarted() throws Exception {
+ Assert.assertEquals(ServerImpl.State.STARTED, server.getStatus());
+
+ }
+
+ @Test
+ public void start_Call_ShouldCreateNZmqSockets() throws Exception {
+ final int EXPECTED_COUNT = 2 + HANDLER_COUNT; //1 ROUTER + 1 DEALER + HANDLER_COUNT
+
+ Optional<ZMQ.Context> mayBeContext = server.getZmqContext();
+ if (mayBeContext.isPresent())
+ Assert.assertEquals(EXPECTED_COUNT, findSocketCount(mayBeContext.get()));
+ else
+ Assert.fail("ZMQ Context does not exist in Remote RPC Server");
+ }
+
+ @Test
+ public void start_Call_ShouldCreate1ServerThread() {
+ final String SERVER_THREAD_NAME = "remote-rpc-server";
+ final int EXPECTED_COUNT = 1;
+ List<Thread> serverThreads = findThreadsWithName(SERVER_THREAD_NAME);
+ Assert.assertEquals(EXPECTED_COUNT, serverThreads.size());
+ }
+
+ @Test
+ public void start_Call_ShouldCreateNHandlerThreads() {
+ //final String WORKER_THREAD_NAME = "remote-rpc-worker";
+ final int EXPECTED_COUNT = HANDLER_COUNT;
+
+ Optional<ServerRequestHandler> serverRequestHandlerOptional = server.getHandler();
+ if (serverRequestHandlerOptional.isPresent()){
+ ServerRequestHandler handler = serverRequestHandlerOptional.get();
+ ThreadPoolExecutor workerPool = handler.getWorkerPool();
+ Assert.assertEquals(EXPECTED_COUNT, workerPool.getPoolSize());
+ } else {
+ Assert.fail("Server is in illegal state. ServerHandler does not exist");
+ }
+
+ }
+
+ @Test
+ public void testStop() throws Exception {
+
+ }
+
+ @Test
+ public void testOnRouteUpdated() throws Exception {
+
+ }
+
+ @Test
+ public void testOnRouteDeleted() throws Exception {
+
+ }
+
+ private int findSocketCount(ZMQ.Context context)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field ctxField = context.getClass().getDeclaredField("ctx");
+ ctxField.setAccessible(true);
+ Ctx ctx = Ctx.class.cast(ctxField.get(context));
+
+ Field socketListField = ctx.getClass().getDeclaredField("sockets");
+ socketListField.setAccessible(true);
+ List<SocketBase> sockets = List.class.cast(socketListField.get(ctx));
+
+ return sockets.size();
+ }
+
+ private List<Thread> findThreadsWithName(String name) {
+ Thread[] threads = new Thread[Thread.activeCount()];
+ Thread.enumerate(threads);
+
+ List<Thread> foundThreads = new ArrayList();
+ for (Thread t : threads) {
+ if (t.getName().startsWith(name))
+ foundThreads.add(t);
+ }
+
+ return foundThreads;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.zeromq.ZMQ;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.mockito.Mockito.mock;
+
+public class ServerRequestHandlerTest {
+
+ ServerRequestHandler handler;
+ ZMQ.Context context;
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ private int workerCount = 2;
+ private String mockDealerAddress = "inproc://rpc-request-handler";
+ private String mockServerIp = "localhost";
+ private int mockServerPort = 5554;
+
+ @Before
+ public void setUp() throws Exception {
+ context = ZMQ.context(1);
+ String mockServerAddress = mockServerIp + ":" + mockServerPort;
+ Broker.ProviderSession mockSession = mock(Broker.ProviderSession.class);
+ handler = new ServerRequestHandler(context, mockSession, workerCount, mockDealerAddress, mockServerAddress);
+ handler.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executorService.shutdown();
+ MessagingUtil.closeZmqContext(context);
+ handler.close();
+ }
+
+ @Test
+ public void testStart() throws Exception {
+ //should start workers == workerCount
+ Assert.assertEquals(workerCount, handler.getWorkerPool().getPoolSize());
+
+ //killing a thread should recreate another one
+
+ //start router-dealer bridge
+ executorService.execute(MessagingUtil.createRouterDealerBridge(context, mockDealerAddress, mockServerPort));
+ Thread.sleep(1000); //give sometime for socket initialization
+
+ //this will kill the thread
+ final String WORKER_THREAD_NAME = "remote-rpc-worker";
+ interruptAThreadWithName(WORKER_THREAD_NAME);
+
+ //send 4 message to router
+ for (int i = 0; i < 4; i++)
+ executorService.execute(MessagingUtil.sendAnEmptyMessage(context, "tcp://" + mockServerIp + ":" + mockServerPort));
+
+ //worker pool size should not change.
+ Assert.assertEquals(workerCount, handler.getWorkerPool().getPoolSize());
+
+ Thread.sleep(10000); //wait for processing to complete
+ }
+
+ @Test
+ public void testClose() throws Exception {
+
+ }
+
+ /**
+ * Interrupts the first thread found whose name starts with the provided name
+ *
+ * @param name
+ */
+ private void interruptAThreadWithName(String name) {
+ List<Thread> workerThreads = findThreadsWithName(name);
+ if (workerThreads.size() > 0) workerThreads.get(0).interrupt();
+ }
+
+ /**
+ * Find all threads that start with the given name
+ *
+ * @param name
+ * @return
+ */
+ private List<Thread> findThreadsWithName(String name) {
+ Thread[] threads = new Thread[Thread.activeCount()];
+ Thread.enumerate(threads);
+
+ List<Thread> foundThreads = new ArrayList();
+ for (Thread t : threads) {
+ if (t.getName().startsWith(name))
+ foundThreads.add(t);
+ }
+
+ return foundThreads;
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import com.google.common.base.Optional;
-import junit.framework.Assert;
-import org.junit.After;
-import org.junit.Before;
-import org.zeromq.ZMQ;
-import org.opendaylight.controller.sal.connector.remoterpc.SocketManager;
-import org.opendaylight.controller.sal.connector.remoterpc.RpcSocket;
-import org.opendaylight.controller.sal.connector.remoterpc.Context;
-import org.junit.Test;
-
-public class SocketManagerTest {
-
- SocketManager manager;
-
- @Before
- public void setup(){
- manager = new SocketManager();
- }
-
- @After
- public void tearDown() throws Exception {
- manager.close();
- }
-
- @Test
- public void getManagedSockets_When2NewAdded_ShouldContain2() throws Exception {
-
- //Prepare data
- manager.getManagedSocket("tcp://localhost:5554");
- manager.getManagedSocket("tcp://localhost:5555");
-
- Assert.assertTrue( 2 == manager.getManagedSockets().size());
- }
-
- @Test
- public void getManagedSockets_When2NewAddedAnd1Existing_ShouldContain2() throws Exception {
-
- //Prepare data
- manager.getManagedSocket("tcp://localhost:5554");
- manager.getManagedSocket("tcp://localhost:5555");
- manager.getManagedSocket("tcp://localhost:5554"); //ask for the first one
-
- Assert.assertTrue( 2 == manager.getManagedSockets().size());
- }
-
- @Test
- public void getManagedSocket_WhenPassedAValidAddress_ShouldReturnARpcSocket() throws Exception {
- String testAddress = "tcp://localhost:5554";
- RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
- Assert.assertEquals(testAddress, rpcSocket.getAddress());
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void getManagedSocket_WhenPassedInvalidHostAddress_ShouldThrow() throws Exception {
- String testAddress = "tcp://nonexistenthost:5554";
- RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void getManagedSocket_WhenPassedInvalidAddress_ShouldThrow() throws Exception {
- String testAddress = "xxx";
- RpcSocket rpcSocket = manager.getManagedSocket(testAddress);
- }
-
- @Test
- public void getManagedSocket_WhenPassedAValidZmqSocket_ShouldReturnARpcSocket() throws Exception {
- //Prepare data
- String firstAddress = "tcp://localhost:5554";
- RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress);
- ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket();
-
- String secondAddress = "tcp://localhost:5555";
- RpcSocket secondRpcSocket = manager.getManagedSocket(secondAddress);
- ZMQ.Socket secondZmqSocket = secondRpcSocket.getSocket();
-
- Assert.assertEquals(firstRpcSocket, manager.getManagedSocketFor(firstZmqSocket).get());
- Assert.assertEquals(secondRpcSocket, manager.getManagedSocketFor(secondZmqSocket).get());
- }
-
- @Test
- public void getManagedSocket_WhenPassedNonManagedZmqSocket_ShouldReturnNone() throws Exception {
- ZMQ.Socket nonManagedSocket = Context.getInstance().getZmqContext().socket(ZMQ.REQ);
- nonManagedSocket.connect("tcp://localhost:5000");
-
- //Prepare data
- String firstAddress = "tcp://localhost:5554";
- RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress);
- ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket();
-
- Assert.assertSame(Optional.<RpcSocket>absent(), manager.getManagedSocketFor(nonManagedSocket) );
- Assert.assertSame(Optional.<RpcSocket>absent(), manager.getManagedSocketFor(null) );
- }
-
- @Test
- public void stop_WhenCalled_ShouldEmptyManagedSockets() throws Exception {
- manager.getManagedSocket("tcp://localhost:5554");
- manager.getManagedSocket("tcp://localhost:5555");
- Assert.assertTrue( 2 == manager.getManagedSockets().size());
-
- manager.close();
- Assert.assertTrue( 0 == manager.getManagedSockets().size());
- }
-
- @Test
- public void poller_WhenCalled_ShouldReturnAnInstanceOfPoller() throws Exception {
- Assert.assertTrue (manager.getPoller() instanceof ZMQ.Poller);
- }
-
-}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.sal.connector.remoterpc.utils;
+
+import junit.framework.Assert;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+public class MessagingUtil {
+
+ private static final Logger _logger = LoggerFactory.getLogger(MessagingUtil.class);
+
+ public static Runnable startReplyServer(final ZMQ.Context context,
+ final String serverAddress,
+ final int numRequests /*number of requests after which server shuts down*/) {
+ return new Runnable() {
+
+ @Override
+ public void run() {
+ final ZMQ.Socket socket = context.socket(ZMQ.REP);
+ try {
+ int returnCode = socket.bind("tcp://" + serverAddress);
+ Assert.assertNotSame(-1, returnCode);
+ _logger.info(" Starting reply server[{}] for test...", serverAddress);
+
+ //for (int i=0;i<numRequests;i++) {
+ while (!Thread.currentThread().isInterrupted()) {
+ byte[] bytes = socket.recv();
+ _logger.debug(" Got request ");
+ socket.send(bytes);
+ _logger.debug(" Sent response ");
+ }
+ } catch (Exception x) {
+ StringWriter w = new StringWriter();
+ PrintWriter p = new PrintWriter(w);
+ x.printStackTrace(p);
+ _logger.debug(w.toString());
+ } finally {
+ socket.close();
+ _logger.info("Shutting down reply server");
+ }
+ }
+ };
+ }
+
+ public static Runnable createRouterDealerBridge(final ZMQ.Context context, final String dealerAddress, final int routerPort) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ ZMQ.Socket router = null;
+ ZMQ.Socket dealer = null;
+ try {
+ router = context.socket(ZMQ.ROUTER);
+ dealer = context.socket(ZMQ.DEALER);
+ router.bind("tcp://*:" + routerPort);
+ dealer.bind(dealerAddress);
+ ZMQ.proxy(router, dealer, null);
+ } catch (Exception e) {/*Ignore*/} finally {
+ if (router != null) router.close();
+ if (dealer != null) dealer.close();
+ }
+ }
+ };
+ }
+
+ public static Runnable sendAnEmptyMessage(final ZMQ.Context context, final String serverAddress)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ return new Runnable() {
+ @Override
+ public void run() {
+ final ZMQ.Socket socket = context.socket(ZMQ.REQ);
+ try {
+
+ socket.connect(serverAddress);
+ System.out.println(Thread.currentThread().getName() + " Sending message");
+ try {
+ socket.send(Message.serialize(new Message()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ byte[] bytes = socket.recv();
+ Message response = null;
+ try {
+ response = (Message) Message.deserialize(bytes);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ System.out.println(Thread.currentThread().getName() + " Got response " + response);
+ } catch (Exception x) {
+ x.printStackTrace();
+ } finally {
+ socket.close();
+ }
+ }
+ };
+ }
+
+ public static Message createEmptyMessage() {
+ return new Message();
+ }
+
+ /**
+ * Closes ZMQ Context. It tries to gracefully terminate the context. If
+ * termination takes more than a second, its forcefully shutdown.
+ */
+ public static void closeZmqContext(final ZMQ.Context context) {
+ if (context == null) return;
+
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ FutureTask zmqTermination = new FutureTask(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ if (context != null)
+ context.term();
+ _logger.debug("ZMQ Context terminated gracefully!");
+ } catch (Exception e) {/*Ignore and continue shutdown*/}
+ }
+ }, null);
+
+ exec.execute(zmqTermination);
+
+ try {
+ zmqTermination.get(1L, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ _logger.debug("ZMQ Context terminated forcefully!");
+ }
+
+ exec.shutdownNow();
+ }
+}
--- /dev/null
+<configuration scan="true">
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <root level="DEBUG">
+ <appender-ref ref="STDOUT" />
+ </root>
+</configuration>
import org.opendaylight.controller.sal.common.util.RpcErrors;
import org.opendaylight.controller.sal.common.util.Rpcs;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl;
import org.opendaylight.controller.sal.core.api.AbstractProvider;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.opendaylight.controller.sal.connector.remoterpc.Client;
+
import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient;
-import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer;
-import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl;
+
import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider;
import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer;
+import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider;
+
import org.opendaylight.controller.test.sal.binding.it.TestHelper;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleException;
import org.osgi.framework.ServiceReference;
-import org.osgi.framework.ServiceRegistration;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles;
import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles;
-import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles;
-import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles;
+
import static org.ops4j.pax.exam.CoreOptions.*;
@RunWith(PaxExam.class)
_logger.debug("Provider sends announcement [{}]", "heartbeat");
provider.announce(QNAME);
- ServiceReference routerRef = ctx.getServiceReference(Client.class);
- Client router = (Client) ctx.getService(routerRef);
+ ServiceReference routerRef = ctx.getServiceReference(RemoteRpcClient.class);
+ RemoteRpcClient router = (RemoteRpcClient) ctx.getService(routerRef);
_logger.debug("Found router[{}]", router);
_logger.debug("Invoking RPC [{}]", QNAME);
for (int i = 0; i < 3; i++) {
}
}
}
+
private String stateToString(int state) {
switch (state) {
case Bundle.ACTIVE:
@Configuration
public Option[] config() {
return options(systemProperty("osgi.console").value("2401"),
- systemProperty("rpc.port").value("5555"),
- mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
- mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
- mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
- mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
-
- //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
- mavenBundle(ODL, "sal-common").versionAsInProject(), //
- mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
- mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
- mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
- mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
- mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
- mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
- mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
-
-
-
- baseModelBundles(),
- bindingAwareSalBundles(),
- TestHelper.bindingIndependentSalBundles(),
- TestHelper.configMinumumBundles(),
- TestHelper.mdSalCoreBundles(),
-
- //Added the consumer
- mavenBundle(SAMPLE, "sal-remoterpc-connector-test-consumer").versionAsInProject(), //
- //**** These two bundles below are NOT successfully resolved -- some of their dependencies must be missing
- //**** This causes the "Message" error to occur, the class cannot be found
- mavenBundle(SAMPLE, "sal-remoterpc-connector-test-provider").versionAsInProject(), //
- mavenBundle(ODL, "sal-remoterpc-connector").versionAsInProject(), //
-
- mavenBundle(ODL, "zeromq-routingtable.implementation").versionAsInProject(),
- mavenBundle(YANG, "concepts").versionAsInProject(),
- mavenBundle(YANG, "yang-binding").versionAsInProject(), //
- mavenBundle(YANG, "yang-common").versionAsInProject(), //
- mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
- mavenBundle(YANG, "yang-data-impl").versionAsInProject(), //
- mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
- mavenBundle(YANG, "yang-parser-api").versionAsInProject(), //
- mavenBundle(YANG, "yang-parser-impl").versionAsInProject(), //
- mavenBundle(YANG, "yang-model-util").versionAsInProject(), //
- mavenBundle(YANG + ".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
- mavenBundle(YANG + ".thirdparty", "antlr4-runtime-osgi-nohead").versionAsInProject(), //
- mavenBundle("com.google.guava", "guava").versionAsInProject(), //
- mavenBundle("org.zeromq", "jeromq").versionAsInProject(),
- mavenBundle("com.fasterxml.jackson.core", "jackson-annotations").versionAsInProject(),
- mavenBundle("com.fasterxml.jackson.core", "jackson-core").versionAsInProject(),
- mavenBundle("com.fasterxml.jackson.core", "jackson-databind").versionAsInProject(),
- //routingtable dependencies
- systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),
- // List framework bundles
- mavenBundle("equinoxSDK381", "org.eclipse.equinox.console").versionAsInProject(),
- mavenBundle("equinoxSDK381", "org.eclipse.equinox.util").versionAsInProject(),
- mavenBundle("equinoxSDK381", "org.eclipse.osgi.services").versionAsInProject(),
- mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds").versionAsInProject(),
- mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command").versionAsInProject(),
- mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime").versionAsInProject(),
- mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell").versionAsInProject(),
- // List logger bundles
-
- mavenBundle("org.opendaylight.controller", "clustering.services")
- .versionAsInProject(),
- mavenBundle("org.opendaylight.controller", "clustering.stub")
- .versionAsInProject(),
-
-
- // List all the bundles on which the test case depends
- mavenBundle("org.opendaylight.controller", "sal")
- .versionAsInProject(),
- mavenBundle("org.opendaylight.controller", "sal.implementation")
- .versionAsInProject(),
- mavenBundle("org.jboss.spec.javax.transaction",
- "jboss-transaction-api_1.1_spec").versionAsInProject(),
- mavenBundle("org.apache.commons", "commons-lang3")
- .versionAsInProject(),
- mavenBundle("org.apache.felix",
- "org.apache.felix.dependencymanager")
- .versionAsInProject(),
-
- junitBundles()
+ systemProperty("rpc.port").value("5555"),
+ mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), //
+ mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), //
+ mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), //
+ mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), //
+
+ //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-common").versionAsInProject(), //
+ mavenBundle(ODL, "sal-common-api").versionAsInProject(),//
+ mavenBundle(ODL, "sal-common-impl").versionAsInProject(), //
+ mavenBundle(ODL, "sal-common-util").versionAsInProject(), //
+ mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), //
+ mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), //
+ mavenBundle(ODL, "sal-connector-api").versionAsInProject(), //
+
+
+ baseModelBundles(),
+ bindingAwareSalBundles(),
+ TestHelper.bindingIndependentSalBundles(),
+ TestHelper.configMinumumBundles(),
+ TestHelper.mdSalCoreBundles(),
+
+ //Added the consumer
+ mavenBundle(SAMPLE, "sal-remoterpc-connector-test-consumer").versionAsInProject(), //
+ //**** These two bundles below are NOT successfully resolved -- some of their dependencies must be missing
+ //**** This causes the "Message" error to occur, the class cannot be found
+ mavenBundle(SAMPLE, "sal-remoterpc-connector-test-provider").versionAsInProject(), //
+ mavenBundle(ODL, "sal-remoterpc-connector").versionAsInProject(), //
+
+ mavenBundle(ODL, "zeromq-routingtable.implementation").versionAsInProject(),
+ mavenBundle(YANG, "concepts").versionAsInProject(),
+ mavenBundle(YANG, "yang-binding").versionAsInProject(), //
+ mavenBundle(YANG, "yang-common").versionAsInProject(), //
+ mavenBundle(YANG, "yang-data-api").versionAsInProject(), //
+ mavenBundle(YANG, "yang-data-impl").versionAsInProject(), //
+ mavenBundle(YANG, "yang-model-api").versionAsInProject(), //
+ mavenBundle(YANG, "yang-parser-api").versionAsInProject(), //
+ mavenBundle(YANG, "yang-parser-impl").versionAsInProject(), //
+ mavenBundle(YANG, "yang-model-util").versionAsInProject(), //
+ mavenBundle(YANG + ".thirdparty", "xtend-lib-osgi").versionAsInProject(), //
+ mavenBundle(YANG + ".thirdparty", "antlr4-runtime-osgi-nohead").versionAsInProject(), //
+ mavenBundle("com.google.guava", "guava").versionAsInProject(), //
+ mavenBundle("org.zeromq", "jeromq").versionAsInProject(),
+ mavenBundle("org.codehaus.jackson", "jackson-mapper-asl").versionAsInProject(),
+ mavenBundle("org.codehaus.jackson", "jackson-core-asl").versionAsInProject(),
+ //routingtable dependencies
+ systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"),
+ // List framework bundles
+ mavenBundle("equinoxSDK381", "org.eclipse.equinox.console").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.eclipse.equinox.util").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.eclipse.osgi.services").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime").versionAsInProject(),
+ mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell").versionAsInProject(),
+ // List logger bundles
+
+ mavenBundle("org.opendaylight.controller", "clustering.services")
+ .versionAsInProject(),
+ mavenBundle("org.opendaylight.controller", "clustering.stub")
+ .versionAsInProject(),
+
+
+ // List all the bundles on which the test case depends
+ mavenBundle("org.opendaylight.controller", "sal")
+ .versionAsInProject(),
+ mavenBundle("org.opendaylight.controller", "sal.implementation")
+ .versionAsInProject(),
+ mavenBundle("org.jboss.spec.javax.transaction",
+ "jboss-transaction-api_1.1_spec").versionAsInProject(),
+ mavenBundle("org.apache.commons", "commons-lang3")
+ .versionAsInProject(),
+ mavenBundle("org.apache.felix",
+ "org.apache.felix.dependencymanager")
+ .versionAsInProject(),
+
+ junitBundles()
);
}
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl;
import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl;
import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer;
_logger.info("Invoking RPC");
ExampleConsumer consumer = getConsumer();
- RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, new CompositeNodeImpl());
+ RpcResult<CompositeNode> result = consumer.invokeRpc(QNAME, consumer.getValidCompositeNodeWithOneSimpleChild());
_logger.info("Result [{}]", result.isSuccessful());
return stringify(result);
_logger.debug("Could not get routing table impl reference");
return "Could not get routingtable referen ";
}
- RoutingTable routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference);
+ RoutingTableImpl routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference);
if (routingTable == null) {
_logger.info("Could not get routing table service");
return "Could not get routing table service";
_logger.error("error in adding routing identifier" + e.getMessage());
}
- Set<String> routes = routingTable.getRoutes(rii.toString());
+ String result = routingTable.dumpRoutingTableCache();
- StringBuilder stringBuilder = new StringBuilder();
- for (String route : routes) {
- stringBuilder.append(route);
- }
- _logger.info("Result [{}] routes added for route" + rii + stringBuilder.toString());
- return stringBuilder.toString();
+
+ _logger.info("Result [{}] routes added for route" + rii + result);
+
+ return result;
}
@GET
public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier getRoute() {
return InstanceIdentifier.of(instance);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ RoutingIdentifierImpl that = (RoutingIdentifierImpl) o;
+
+ if (!QNAME.equals(that.QNAME)) return false;
+ if (!instance.equals(that.instance)) return false;
+ if (!namespace.equals(that.namespace)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = namespace.hashCode();
+ result = 31 * result + QNAME.hashCode();
+ result = 31 * result + instance.hashCode();
+ return result;
+ }
}
}