From: Ed Warnicke Date: Wed, 22 Jan 2014 02:45:31 +0000 (+0000) Subject: Merge "Changed codec for Identityref in JSON transformation" X-Git-Tag: jenkins-controller-bulk-release-prepare-only-2-1~39 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=672b16286fcf0cdb9c7016e163b1654e12359b33;hp=d8d2798ea4ec48c2987f6ab712c1548d4fde0e3c Merge "Changed codec for Identityref in JSON transformation" --- diff --git a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml b/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml index 40bd9f9aea..63ac8577ac 100644 --- a/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml +++ b/opendaylight/distribution/opendaylight/src/main/resources/configuration/initial/01-md-sal.xml @@ -58,6 +58,15 @@ runtime-mapping-singleton + + prefix:remote-zeromq-rpc-server + remoter + 5666 59 + 60 + prefix:dom-broker-osgi-registry + dom-broker + + @@ -146,6 +155,7 @@ urn:opendaylight:yang:extension:yang-ext?module=yang-ext&revision=2013-07-09 urn:opendaylight:params:xml:ns:yang:controller:md:sal:common?module=opendaylight-md-sal-common&revision=2013-10-28 urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:cluster:store?module=odl-sal-dom-clustered-store-cfg&revision=2013-10-28 + urn:opendaylight:params:xml:ns:yang:controller:md:sal:remote:rpc?module=odl-sal-dom-rpc-remote-cfg&revision=2013-10-28 diff --git a/opendaylight/distribution/opendaylight/src/main/resources/run.sh b/opendaylight/distribution/opendaylight/src/main/resources/run.sh index 1f3e8e20f3..90e3b03ae4 100755 --- a/opendaylight/distribution/opendaylight/src/main/resources/run.sh +++ b/opendaylight/distribution/opendaylight/src/main/resources/run.sh @@ -190,7 +190,7 @@ if [ "${statusdaemon}" -eq 1 ]; then else echo "Controller with PID: ${daemonpid} -- Doesn't seem to exist" rm -f "${pidfile}" - exit 0 + exit 1 fi else echo "Doesn't seem any Controller daemon is currently running, at least no PID file has been found" diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeMapping.xtend b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeMapping.xtend index 4378e7dffe..a3be5dd970 100644 --- a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeMapping.xtend +++ b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/NodeMapping.xtend @@ -234,21 +234,21 @@ public class NodeMapping { public static def toAdBandwidth(PortFeatures pf) { var Bandwidth bw = null - if (pf.is_10mbHd || pf.is_10mbFd) { + if (pf.isTenMbHd || pf.isTenMbFd) { bw = new Bandwidth(Bandwidth.BW10Mbps) - } else if (pf.is_100mbHd || pf.is_100mbFd) { + } else if (pf.isHundredMbHd || pf.isHundredMbFd) { bw = new Bandwidth(Bandwidth.BW100Mbps) - } else if (pf.is_1gbHd || pf.is_1gbFd) { + } else if (pf.isOneGbHd || pf.isOneGbFd) { bw = new Bandwidth(Bandwidth.BW1Gbps) - } else if (pf.is_1gbFd) { + } else if (pf.isOneGbFd) { bw = new Bandwidth(Bandwidth.BW10Gbps) - } else if (pf.is_10gbFd) { + } else if (pf.isTenGbFd) { bw = new Bandwidth(Bandwidth.BW10Gbps) - } else if (pf.is_40gbFd) { + } else if (pf.isFortyGbFd) { bw = new Bandwidth(Bandwidth.BW40Gbps) - } else if (pf.is_100gbFd) { + } else if (pf.isHundredGbFd) { bw = new Bandwidth(Bandwidth.BW100Gbps) - } else if (pf.is_1tbFd) { + } else if (pf.isOneTbFd) { bw = new Bandwidth(Bandwidth.BW1Tbps) } return bw; diff --git a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyMapping.xtend b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyMapping.xtend index cd4b818037..850c442b9b 100644 --- a/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyMapping.xtend +++ b/opendaylight/md-sal/compatibility/sal-compatibility/src/main/java/org/opendaylight/controller/sal/compatibility/topology/TopologyMapping.xtend @@ -20,9 +20,10 @@ import static extension org.opendaylight.controller.sal.compatibility.NodeMappin import org.opendaylight.controller.md.sal.binding.util.TypeSafeDataReader import org.opendaylight.yangtools.yang.binding.InstanceIdentifier import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector +import org.slf4j.LoggerFactory class TopologyMapping { - + private static val LOG = LoggerFactory.getLogger(TopologyMapping); private new() { throw new UnsupportedOperationException("Utility class. Instantiation is not allowed."); } @@ -47,7 +48,20 @@ class TopologyMapping { } public static def toAdEdgeProperties(Edge e,TypeSafeDataReader reader) { - val nc = reader.readOperationalData(e.tailNodeConnector.toNodeConnectorRef.value as InstanceIdentifier) + val ncref = e.tailNodeConnector.toNodeConnectorRef + if(ncref == null) { + LOG.debug("Edge {} ncref {}",e,ncref) + return null; + } + val ncInstanceId = (ncref.value as InstanceIdentifier) + if(ncInstanceId == null) { + LOG.debug("Edge {} ncref {}",e,ncref) + return null; + } + val nc = reader.readOperationalData(ncInstanceId) + if(nc == null) { + return null; + } return nc.toADNodeConnectorProperties } diff --git a/opendaylight/md-sal/model/model-flow-base/src/main/yang/meter-types.yang b/opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-meter-types.yang similarity index 94% rename from opendaylight/md-sal/model/model-flow-base/src/main/yang/meter-types.yang rename to opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-meter-types.yang index d84b2f0851..1686cad3de 100644 --- a/opendaylight/md-sal/model/model-flow-base/src/main/yang/meter-types.yang +++ b/opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-meter-types.yang @@ -2,7 +2,6 @@ module opendaylight-meter-types { namespace "urn:opendaylight:meter:types"; prefix meter; - import ietf-inet-types {prefix inet; revision-date "2010-09-24";} import ietf-yang-types {prefix yang; revision-date "2010-09-24";} @@ -73,21 +72,21 @@ module opendaylight-meter-types { grouping band-type { choice band-type { case drop { - leaf rate { + leaf drop-rate { type uint32; } - leaf burst-size { + leaf drop-burst-size { type uint32; } } case dscp-remark { - leaf rate { + leaf dscp-remark-rate { type uint32; } - leaf burst-size { + leaf dscp-remark-burst-size { type uint32; } @@ -97,11 +96,11 @@ module opendaylight-meter-types { } case experimenter { - leaf rate { + leaf experimenter-rate { type uint32; } - leaf burst-size { + leaf experimenter-burst-size { type uint32; } @@ -151,11 +150,11 @@ module opendaylight-meter-types { } } - leaf rate { + leaf band-rate { type uint32; } - leaf burst-size { + leaf band-burst-size { type uint32; } uses band-type; @@ -262,4 +261,4 @@ module opendaylight-meter-types { uses meter-features; } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/model/model-flow-base/src/main/yang/port-types.yang b/opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-port-types.yang similarity index 92% rename from opendaylight/md-sal/model/model-flow-base/src/main/yang/port-types.yang rename to opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-port-types.yang index 2554fffadb..f1cbd8dc29 100644 --- a/opendaylight/md-sal/model/model-flow-base/src/main/yang/port-types.yang +++ b/opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-port-types.yang @@ -2,7 +2,6 @@ module opendaylight-port-types { namespace "urn:opendaylight:flow:types:port"; prefix port-types; - import ietf-inet-types {prefix inet; revision-date "2010-09-24";} import ietf-yang-types {prefix yang; revision-date "2010-09-24";} import opendaylight-queue-types {prefix queue-types; revision-date "2013-09-25";} @@ -41,16 +40,16 @@ module opendaylight-port-types { typedef port-features { type bits { - bit 10mb-hd; - bit 10mb-fd; - bit 100mb-hd; - bit 100mb-fd; - bit 1gb-hd; - bit 1gb-fd; - bit 10gb-fd; - bit 40gb-fd; - bit 100gb-fd; - bit 1tb-fd; + bit ten-mb-hd; + bit ten-mb-fd; + bit hundred-mb-hd; + bit hundred-mb-fd; + bit one-gb-hd; + bit one-gb-fd; + bit ten-gb-fd; + bit forty-gb-fd; + bit hundred-gb-fd; + bit one-tb-fd; bit other; bit copper; bit fiber; @@ -171,4 +170,4 @@ module opendaylight-port-types { } } } -} \ No newline at end of file +} diff --git a/opendaylight/md-sal/model/model-flow-base/src/main/yang/table-types.yang b/opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-table-types.yang similarity index 92% rename from opendaylight/md-sal/model/model-flow-base/src/main/yang/table-types.yang rename to opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-table-types.yang index 5e747e4722..118db1af06 100644 --- a/opendaylight/md-sal/model/model-flow-base/src/main/yang/table-types.yang +++ b/opendaylight/md-sal/model/model-flow-base/src/main/yang/opendaylight-table-types.yang @@ -2,8 +2,6 @@ module opendaylight-table-types { namespace "urn:opendaylight:table:types"; prefix table; - import ietf-inet-types {prefix inet; revision-date "2010-09-24";} - import ietf-yang-types {prefix yang; revision-date "2010-09-24";} import opendaylight-flow-types {prefix flow;revision-date "2013-10-26";} import opendaylight-action-types {prefix action;} @@ -227,7 +225,7 @@ module opendaylight-table-types { } case next-table-miss { - container tables { + container tables-miss { leaf-list table-ids { type uint8; } @@ -259,27 +257,39 @@ module opendaylight-table-types { } case match { - uses set-field-match; + container match-setfield { + uses set-field-match; + } } case wildcards { - uses set-field-match; + container wildcard-setfield { + uses set-field-match; + } } case write-setfield { - uses set-field-match; + container write-setfield { + uses set-field-match; + } } case write-setfield-miss { - uses set-field-match; + container write-setfield-miss { + uses set-field-match; + } } case apply-setfield { - uses set-field-match; + container apply-setfield { + uses set-field-match; + } } case apply-setfield-miss { - uses set-field-match; + container apply-setfield-miss { + uses set-field-match; + } } } } @@ -326,7 +336,6 @@ module opendaylight-table-types { uses table-feature-prop-type; } } - } } } diff --git a/opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java b/opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java index 6ec4c2ce01..2da031e540 100644 --- a/opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java +++ b/opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java @@ -7,6 +7,7 @@ */ package org.opendaylight.controller.sal.connector.remoterpc.api; +import java.util.Map; import java.util.Set; public interface RoutingTable { @@ -65,6 +66,12 @@ public interface RoutingTable { */ public Set getRoutes(I routeId); + /** + * Returns all network addresses stored in the table + * @return + */ + public Set getAllRoutes(); + /** * Returns only one address from the list of network addresses * associated with the route. The algorithm to determine that diff --git a/opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImpl.java b/opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImpl.java index 59292a174e..40c4c6b436 100644 --- a/opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImpl.java +++ b/opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImpl.java @@ -72,24 +72,14 @@ public class RoutingTableImpl implements RoutingTable, ICacheUpdateA } // lets start a transaction clusterGlobalServices.tbegin(); - Set routes = new HashSet(); - routes.add(route); - routingTableCache.put(routeId, routes); + + routingTableCache.put(routeId, route); clusterGlobalServices.tcommit(); } else { throw new DuplicateRouteException(" There is already existing route " + existingRoute); } - } catch (NotSupportedException e) { - throw new RoutingTableException("Transaction error - while trying to create route id=" - + routeId + "with route" + route, e); - } catch (HeuristicRollbackException e) { - throw new RoutingTableException("Transaction error - while trying to create route id=" - + routeId + "with route" + route, e); - } catch (RollbackException e) { - throw new RoutingTableException("Transaction error - while trying to create route id=" - + routeId + "with route" + route, e); - } catch (HeuristicMixedException e) { + } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) { throw new RoutingTableException("Transaction error - while trying to create route id=" + routeId + "with route" + route, e); } catch (javax.transaction.SystemException e) { @@ -116,16 +106,7 @@ public class RoutingTableImpl implements RoutingTable, ICacheUpdateA routingTableCache.remove(routeId); clusterGlobalServices.tcommit(); - } catch (NotSupportedException e) { - throw new RoutingTableException("Transaction error - while trying to remove route id=" - + routeId, e); - } catch (HeuristicRollbackException e) { - throw new RoutingTableException("Transaction error - while trying to remove route id=" - + routeId, e); - } catch (RollbackException e) { - throw new RoutingTableException("Transaction error - while trying to remove route id=" - + routeId, e); - } catch (HeuristicMixedException e) { + } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) { throw new RoutingTableException("Transaction error - while trying to remove route id=" + routeId, e); } catch (javax.transaction.SystemException e) { @@ -139,10 +120,22 @@ public class RoutingTableImpl implements RoutingTable, ICacheUpdateA // Note: currently works for global routes only wherein there is just single // route Preconditions.checkNotNull(routeId, "getARoute: routeId cannot be null!"); - return (Set) routingTableCache.get(routeId); + R route = (R)routingTableCache.get(routeId); + Setroutes = null; + if(route !=null){ + routes = new HashSet(); + routes.add(route); + } + + return routes; } - @Override + @Override + public Set getAllRoutes() { + return routingTableCache.entrySet(); + } + + @Override public R getARoute(I routeId) { throw new UnsupportedOperationException("Not implemented yet!"); } diff --git a/opendaylight/md-sal/remoterpc-routingtable/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImplTest.java b/opendaylight/md-sal/remoterpc-routingtable/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImplTest.java index 2ef251d9a1..50460d4e5e 100644 --- a/opendaylight/md-sal/remoterpc-routingtable/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImplTest.java +++ b/opendaylight/md-sal/remoterpc-routingtable/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImplTest.java @@ -21,6 +21,7 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import java.net.URI; import java.util.EnumSet; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -80,8 +81,7 @@ public class RoutingTableImplTest { rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000"); - Set globalService = new HashSet(); - globalService.add("172.27.12.1:5000"); + String globalService = "172.27.12.1:5000"; when(concurrentMap.get(routeIdentifier)).thenReturn(globalService); ConcurrentMap latestCache = rti.getRoutingTableCache(); @@ -92,9 +92,10 @@ public class RoutingTableImplTest { Assert.assertEquals(servicesGlobal.size(),1); - - Assert.assertEquals(servicesGlobal.iterator().next(),"172.27.12.1:5000"); - + Iterator iterator = servicesGlobal.iterator(); + while(iterator.hasNext()){ + Assert.assertEquals(iterator.next(),"172.27.12.1:5000"); + } } @@ -124,8 +125,7 @@ public class RoutingTableImplTest { rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000"); - Set globalService = new HashSet(); - globalService.add("172.27.12.1:5000"); + String globalService = "172.27.12.1:5000"; when(concurrentMap.get(routeIdentifier)).thenReturn(globalService); ConcurrentMap latestCache = rti.getRoutingTableCache(); diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcRoutingContext.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcRoutingContext.java new file mode 100644 index 0000000000..1680c19277 --- /dev/null +++ b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcRoutingContext.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.core.api; + +import org.opendaylight.yangtools.yang.common.QName; + +public interface RpcRoutingContext { + + public QName getContext(); + + public QName getRpcType(); +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml b/opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml index ee9bc8922b..58a0d3044a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/pom.xml @@ -80,7 +80,12 @@ org.slf4j slf4j-api - + + ch.qos.logback + logback-classic + 1.0.12 + test + com.fasterxml.jackson.core jackson-annotations @@ -145,8 +150,11 @@ org.opendaylight.controller.config.yang.md.sal.remote.rpc, - org.opendaylight.controller.sal.connector.remoterpc, - org.opendaylight.controller.sal.connector.remoterpc.*, + org.opendaylight.controller.sal.connector.remoterpc.util, + org.opendaylight.controller.sal.connector.remoterpc.dto, + org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient, + org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer, + org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcProvider ${project.groupId}.${project.artifactId} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java index 606f282bd7..1117986429 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java @@ -9,10 +9,7 @@ */ package org.opendaylight.controller.config.yang.md.sal.remote.rpc; -import org.opendaylight.controller.sal.connector.remoterpc.Client; -import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcProvider; -import org.opendaylight.controller.sal.connector.remoterpc.RoutingTableProvider; -import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl; +import org.opendaylight.controller.sal.connector.remoterpc.*; import org.opendaylight.controller.sal.core.api.Broker; import org.osgi.framework.BundleContext; @@ -44,14 +41,18 @@ public final class ZeroMQServerModule extends org.opendaylight.controller.config public java.lang.AutoCloseable createInstance() { Broker broker = getDomBrokerDependency(); - RoutingTableProvider provider = new RoutingTableProvider(bundleContext); - + + final int port = getPort() != null ? getPort() : ZEROMQ_ROUTER_PORT; ServerImpl serverImpl = new ServerImpl(port); - Client clientImpl = new Client(); + ClientImpl clientImpl = new ClientImpl(); + + RoutingTableProvider provider = new RoutingTableProvider(bundleContext,serverImpl); + + RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl); facade.setRoutingTableProvider(provider ); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/CapturedMessageHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/CapturedMessageHandler.java new file mode 100644 index 0000000000..2dc5eee59a --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/CapturedMessageHandler.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +public class CapturedMessageHandler implements Runnable { + + private Logger _logger = LoggerFactory.getLogger(CapturedMessageHandler.class); + + private ZMQ.Socket socket; + + public CapturedMessageHandler(ZMQ.Socket socket){ + this.socket = socket; + } + + @Override + public void run(){ + + try { + while (!Thread.currentThread().isInterrupted()){ + String message = socket.recvStr(); + _logger.debug("Captured [{}]", message); + } + } catch (Exception e) { + _logger.error("Exception raised [{}]", e.getMessage()); + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java deleted file mode 100644 index ef3162359c..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Client.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.sal.connector.remoterpc; - -import com.google.common.base.Optional; - -import org.opendaylight.controller.sal.common.util.RpcErrors; -import org.opendaylight.controller.sal.common.util.Rpcs; -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; -import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; -import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; -import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; -import org.opendaylight.controller.sal.core.api.RpcImplementation; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.zeromq.ZMQ; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.*; - -import static com.google.common.base.Preconditions.*; - -/** - * An implementation of {@link RpcImplementation} that makes remote RPC calls - */ -public class Client implements RemoteRpcClient { - - private final Logger _logger = LoggerFactory.getLogger(Client.class); - - private final LinkedBlockingQueue requestQueue = new LinkedBlockingQueue(100); - - private final ExecutorService pool = Executors.newSingleThreadExecutor(); - private final long TIMEOUT = 5000; // in ms - - private RoutingTableProvider routingTableProvider; - - public RoutingTableProvider getRoutingTableProvider() { - return routingTableProvider; - } - - public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) { - this.routingTableProvider = routingTableProvider; - } - - public LinkedBlockingQueue getRequestQueue() { - return requestQueue; - } - - @Override - public Set getSupportedRpcs() { - // TODO: Find the entries from routing table - return Collections.emptySet(); - } - - public void start() { - pool.execute(new Sender(this)); - - } - - public void stop() { - - _logger.debug("Client stopping..."); - Context.getInstance().getZmqContext().term(); - _logger.debug("ZMQ context terminated"); - - pool.shutdown(); // intiate shutdown - try { - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - _logger.error("Client thread pool did not shut down"); - } - } catch (InterruptedException e) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - _logger.debug("Client stopped"); - } - - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { - - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(rpc); - - String address = lookupRemoteAddress(routeId); - - Message request = new Message.MessageBuilder().type(Message.MessageType.REQUEST) - .sender(Context.getInstance().getLocalUri()).recipient(address).route(routeId) - .payload(XmlUtils.compositeNodeToXml(input)).build(); - - List errors = new ArrayList(); - - try (SocketPair pair = new SocketPair()) { - - MessageWrapper messageWrapper = new MessageWrapper(request, pair.getSender()); - process(messageWrapper); - Message response = parseMessage(pair.getReceiver()); - - CompositeNode payload = XmlUtils.xmlToCompositeNode((String) response.getPayload()); - - return Rpcs.getRpcResult(true, payload, errors); - - } catch (Exception e) { - collectErrors(e, errors); - return Rpcs.getRpcResult(false, null, errors); - } - - } - - public void process(MessageWrapper msg) throws TimeoutException, InterruptedException { - _logger.debug("Processing message [{}]", msg); - - boolean success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS); - if (!success) - throw new TimeoutException("Queue is full"); - } - - /** - * Block on socket for reply - * - * @param receiver - * @return - */ - private Message parseMessage(ZMQ.Socket receiver) throws IOException, ClassNotFoundException { - return (Message) Message.deserialize(receiver.recv()); - } - - /** - * Find address for the given route identifier in routing table - * - * @param routeId - * route identifier - * @return remote network address - */ - private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId) { - checkNotNull(routeId, "route must not be null"); - - Optional> routingTable = routingTableProvider.getRoutingTable(); - checkNotNull(routingTable.isPresent(), "Routing table is null"); - - Set addresses = routingTable.get().getRoutes(routeId.toString()); - checkNotNull(addresses, "Address not found for route [%s]", routeId); - checkState(addresses.size() == 1, "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); // its - // a - // global - // service. - - String address = addresses.iterator().next(); - checkNotNull(address, "Address not found for route [%s]", routeId); - - return address; - } - - private void collectErrors(Exception e, List errors) { - if (e == null) - return; - if (errors == null) - errors = new ArrayList(); - - errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause())); - for (Throwable t : e.getSuppressed()) { - errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t)); - } - } - - @Override - public void close() throws Exception { - stop(); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java new file mode 100644 index 0000000000..291fe0b8e7 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import com.google.common.base.Optional; +import org.opendaylight.controller.sal.common.util.RpcErrors; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.connector.api.RpcRouter; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; +import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * An implementation of {@link RpcImplementation} that makes + * remote RPC calls + */ +public class ClientImpl implements RemoteRpcClient { + + private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class); + + private ZMQ.Context context = ZMQ.context(1); + private ClientRequestHandler handler; + private RoutingTableProvider routingTableProvider; + + public ClientImpl(){ + handler = new ClientRequestHandler(context); + start(); + } + + public ClientImpl(ClientRequestHandler handler){ + this.handler = handler; + start(); + } + + public RoutingTableProvider getRoutingTableProvider() { + return routingTableProvider; + } + + public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) { + this.routingTableProvider = routingTableProvider; + } + + @Override + public Set getSupportedRpcs(){ + //TODO: Find the entries from routing table + return Collections.emptySet(); + } + + @Override + public void start() {/*NOOPS*/} + + @Override + public void stop() { + closeZmqContext(); + handler.close(); + _logger.info("Stopped"); + } + + @Override + public void close(){ + stop(); + } + + /** + * Finds remote server that can execute this rpc and sends a message to it + * requesting execution. + * The call blocks until a response from remote server is received. Its upto + * the client of this API to implement a timeout functionality. + * + * @param rpc remote service to be executed + * @param input payload for the remote service + * @return + */ + @Override + public RpcResult invokeRpc(QName rpc, CompositeNode input) { + + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(rpc); + + String address = lookupRemoteAddress(routeId); + + Message request = new Message.MessageBuilder() + .type(Message.MessageType.REQUEST) + .sender(Context.getInstance().getLocalUri()) + .recipient(address) + .route(routeId) + .payload(XmlUtils.compositeNodeToXml(input)) + .build(); + + List errors = new ArrayList(); + + try{ + Message response = handler.handle(request); + CompositeNode payload = null; + + if ( response != null ) + payload = XmlUtils.xmlToCompositeNode((String) response.getPayload()); + + return Rpcs.getRpcResult(true, payload, errors); + + } catch (Exception e){ + collectErrors(e, errors); + return Rpcs.getRpcResult(false, null, errors); + } + + } + + /** + * Find address for the given route identifier in routing table + * @param routeId route identifier + * @return remote network address + */ + private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){ + checkNotNull(routeId, "route must not be null"); + + Optional> routingTable = routingTableProvider.getRoutingTable(); + checkNotNull(routingTable.isPresent(), "Routing table is null"); + + Set addresses = routingTable.get().getRoutes(routeId.toString()); + checkNotNull(addresses, "Address not found for route [%s]", routeId); + checkState(addresses.size() == 1, + "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service. + + String address = addresses.iterator().next(); + checkNotNull(address, "Address not found for route [%s]", routeId); + + return address; + } + + private void collectErrors(Exception e, List errors){ + if (e == null) return; + if (errors == null) errors = new ArrayList(); + + errors.add(RpcErrors.getRpcError(null, null, null, null, e.getMessage(), null, e.getCause())); + for (Throwable t : e.getSuppressed()) { + errors.add(RpcErrors.getRpcError(null, null, null, null, t.getMessage(), null, t)); + } + } + + /** + * Closes ZMQ Context. It tries to gracefully terminate the context. If + * termination takes more than a second, its forcefully shutdown. + */ + private void closeZmqContext() { + ExecutorService exec = Executors.newSingleThreadExecutor(); + FutureTask zmqTermination = new FutureTask(new Runnable() { + + @Override + public void run() { + try { + if (context != null) + context.term(); + _logger.debug("ZMQ Context terminated"); + } catch (Exception e) { + _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e); + } + } + }, null); + + exec.execute(zmqTermination); + + try { + zmqTermination.get(1L, TimeUnit.SECONDS); + } catch (Exception e) {/*ignore and continue with shutdown*/} + + exec.shutdownNow(); + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java new file mode 100644 index 0000000000..f3ef4b6cae --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandler.java @@ -0,0 +1,190 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * + */ +class ClientRequestHandler implements AutoCloseable{ + + private Logger _logger = LoggerFactory.getLogger(ClientRequestHandler.class); + private final String DEFAULT_NAME = "remoterpc-client-worker"; + private final String INPROC_PROTOCOL_PREFIX = "inproc://"; + private final String TCP_PROTOCOL_PREFIX = "tcp://"; + + private ZMQ.Context context; + + /* + * Worker thread pool. Each thread runs a ROUTER-DEALER pair + */ + private ExecutorService workerPool; + + /* + * Set of remote servers this client is currently connected to + */ + private Map connectedServers; + + protected ClientRequestHandler(ZMQ.Context context) { + this.context = context; + connectedServers = new ConcurrentHashMap(); + start(); + } + + /** + * Starts a pool of worker as needed. A worker thread that has not been used for 5 min + * is terminated and removed from the pool. If thread dies due to an exception, its + * restarted. + */ + private void start(){ + + workerPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 5L, TimeUnit.MINUTES, + new SynchronousQueue()){ + + @Override + protected void afterExecute(Runnable r, Throwable t) { + if (isTerminating() || isTerminated() || isShutdown()) + return; + + Worker worker = (Worker) r; + Preconditions.checkState( worker != null ); + String remoteServerAddress = worker.getRemoteServerAddress(); + connectedServers.remove(remoteServerAddress); + + if ( t != null ){ + _logger.debug("Exception caught while terminating worker [{},{}]. " + + "Restarting worker...", t.getClass(), t.getMessage()); + + connectedServers.put(remoteServerAddress, remoteServerAddress); + this.execute(r); + } + super.afterExecute(r, null); + } + }; + } + + public Message handle(Message request) throws IOException, ClassNotFoundException, InterruptedException { + + String remoteServerAddress = request.getRecipient(); + //if we already have router-dealer bridge setup for this address the send request + //otherwise first create the bridge and then send request + if ( connectedServers.containsKey(remoteServerAddress) ) + return sendMessage(request, remoteServerAddress); + else{ + workerPool.execute(new Worker(remoteServerAddress)); + connectedServers.put(remoteServerAddress, remoteServerAddress); + //give little time for sockets to get initialized + //TODO: Add socket ping-pong message to ensure socket init rather than thread.sleep. + Thread.sleep(1000); + return sendMessage(request, remoteServerAddress); + } + } + + private Message sendMessage(Message request, String address) throws IOException, ClassNotFoundException { + Message response = null; + ZMQ.Socket socket = context.socket(ZMQ.REQ); + + try { + socket.connect( INPROC_PROTOCOL_PREFIX + address); + socket.send(Message.serialize(request)); + _logger.debug("Request sent. Waiting for reply..."); + byte[] reply = socket.recv(0); + _logger.debug("Response received"); + response = (Message) Message.deserialize(reply); + } finally { + socket.close(); + } + return response; + } + + /** + * This gets called automatically if used with try-with-resources + */ + @Override + public void close(){ + workerPool.shutdown(); + _logger.info("Request Handler closed"); + } + + /** + * Total number of workers in the pool. Number of workers represent + * number of remote servers {@link org.opendaylight.controller.sal.connector.remoterpc.ClientImpl} is connected to. + * + * @return worker count + */ + public int getWorkerCount(){ + + if (workerPool == null) return 0; + + return ((ThreadPoolExecutor)workerPool).getActiveCount(); + } + /** + * Handles RPC request + */ + private class Worker implements Runnable { + private String name; + private String remoteServer; // + + public Worker(String address){ + this.name = DEFAULT_NAME + "[" + address + "]"; + this.remoteServer = address; + } + + public String getRemoteServerAddress(){ + return this.remoteServer; + } + + @Override + public void run() { + Thread.currentThread().setName(name); + _logger.debug("Starting ... "); + + ZMQ.Socket router = context.socket(ZMQ.ROUTER); + ZMQ.Socket dealer = context.socket(ZMQ.DEALER); + + try { + int success = router.bind(INPROC_PROTOCOL_PREFIX + remoteServer); + Preconditions.checkState(-1 != success, "Could not bind to " + remoteServer); + + dealer.connect(TCP_PROTOCOL_PREFIX + remoteServer); + + _logger.info("Worker started for [{}]", remoteServer); + + //TODO: Add capture handler + //This code will block until the zmq context is terminated. + ZMQ.proxy(router, dealer, null); + + } catch (Exception e) { + _logger.debug("Ignoring exception [{}, {}]", e.getClass(), e.getMessage()); + } finally { + try { + router.close(); + dealer.close(); + } catch (Exception x) { + _logger.debug("Exception while closing socket [{}]", x); + } + _logger.debug("Closing..."); + } + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java index f0bf12cbc0..9e66c20613 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Context.java @@ -21,6 +21,7 @@ import java.util.Enumeration; public class Context { private ZMQ.Context zmqContext = ZMQ.context(1); private String uri; + private final String DEFAULT_RPC_PORT = "5554"; private static Context _instance = new Context(); @@ -36,7 +37,7 @@ public class Context { public String getLocalUri(){ uri = (uri != null) ? uri - : new StringBuilder("tcp://").append(getIpAddress()).append(":") + : new StringBuilder().append(getIpAddress()).append(":") .append(getRpcPort()).toString(); return uri; @@ -45,7 +46,7 @@ public class Context { public String getRpcPort(){ String rpcPort = (System.getProperty("rpc.port") != null) ? System.getProperty("rpc.port") - : "5554"; + : DEFAULT_RPC_PORT; return rpcPort; } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java index 6bd123b7e4..1f78a6771a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java @@ -1,3 +1,11 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.sal.connector.remoterpc; import org.opendaylight.controller.sal.core.api.RpcImplementation; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java index 3c2e3b0872..bf205fc38d 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java @@ -1,25 +1,29 @@ -package org.opendaylight.controller.sal.connector.remoterpc; +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.ExecutorService; +package org.opendaylight.controller.sal.connector.remoterpc; -import org.opendaylight.controller.sal.connector.remoterpc.Client; -import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Provider; -import org.opendaylight.controller.sal.core.api.Provider.ProviderFunctionality; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import java.util.Collection; +import java.util.Set; + public class RemoteRpcProvider implements RemoteRpcServer, RemoteRpcClient, Provider { private final ServerImpl server; - private final Client client; + private final ClientImpl client; private RoutingTableProvider provider; @Override @@ -40,7 +44,7 @@ public class RemoteRpcProvider implements } - public RemoteRpcProvider(ServerImpl server, Client client) { + public RemoteRpcProvider(ServerImpl server, ClientImpl client) { this.server = server; this.client = client; } @@ -48,22 +52,19 @@ public class RemoteRpcProvider implements public void setBrokerSession(ProviderSession session) { server.setBrokerSession(session); } - public void setServerPool(ExecutorService serverPool) { - server.setServerPool(serverPool); - } +// public void setServerPool(ExecutorService serverPool) { +// server.setServerPool(serverPool); +// } public void start() { - client.setRoutingTableProvider(provider); - server.setRoutingTableProvider(provider); + //when listener was being invoked and addRPCImplementation was being + //called the client was null. + server.setClient(client); server.start(); client.start(); + + } - public void onRouteUpdated(String key, Set values) { - server.onRouteUpdated(key, values); - } - public void onRouteDeleted(String key) { - server.onRouteDeleted(key); - } - + @Override public Collection getProviderFunctionality() { @@ -84,9 +85,6 @@ public class RemoteRpcProvider implements client.close(); } - - - @Override public void stop() { server.stop(); diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java index 932600fcb1..e845e43c2a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcServer.java @@ -1,5 +1,12 @@ -package org.opendaylight.controller.sal.connector.remoterpc; +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.connector.remoterpc; public interface RemoteRpcServer extends AutoCloseable { diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java index cfdf98638d..f62c26e0fd 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java @@ -1,27 +1,50 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.sal.connector.remoterpc; +import com.google.common.base.Optional; +import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl; import org.osgi.framework.BundleContext; import org.osgi.util.tracker.ServiceTracker; -import com.google.common.base.Optional; - public class RoutingTableProvider implements AutoCloseable { @SuppressWarnings("rawtypes") final ServiceTracker tracker; + + private RoutingTableImpl routingTableImpl = null; + + final private RouteChangeListener routeChangeListener; - public RoutingTableProvider(BundleContext ctx) { + public RoutingTableProvider(BundleContext ctx,RouteChangeListener rcl) { @SuppressWarnings("rawtypes") ServiceTracker rawTracker = new ServiceTracker<>(ctx, RoutingTable.class, null); tracker = rawTracker; tracker.open(); + + routeChangeListener = rcl; } public Optional> getRoutingTable() { @SuppressWarnings("unchecked") RoutingTable tracked = tracker.getService(); + + if(tracked instanceof RoutingTableImpl){ + if(routingTableImpl != tracked){ + routingTableImpl= (RoutingTableImpl)tracked; + routingTableImpl.setRouteChangeListener(routeChangeListener); + } + } + return Optional.fromNullable(tracked); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java deleted file mode 100644 index 7e8590ab9b..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocket.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.sal.connector.remoterpc; - -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; -import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.zeromq.ZMQ; - -import java.io.IOException; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -/** - * A class encapsulating {@link ZMQ.Socket} of type {@link ZMQ.REQ}. - * It adds following capabilities: - *
  • Retry logic - Tries 3 times before giving up - *
  • Request times out after {@link TIMEOUT} property - *
  • The limitation of {@link ZMQ.REQ}/{@link ZMQ.REP} pair is that no 2 requests can be sent before - * the response for the 1st request is received. To overcome that, this socket queues all messages until - * the previous request has been responded. - */ -public class RpcSocket { - - // Constants - public static final int TIMEOUT = 2000; - public static final int QUEUE_SIZE = 10; - public static final int NUM_RETRIES = 3; - private static final Logger log = LoggerFactory.getLogger(RpcSocket.class); - - private ZMQ.Socket socket; - private ZMQ.Poller poller; - private String address; - private SocketState state; - private long sendTime; - private int retriesLeft; - private LinkedBlockingQueue inQueue; - - - public RpcSocket(String address, ZMQ.Poller poller) { - this.socket = null; - this.state = new IdleSocketState(); - this.sendTime = -1; - this.retriesLeft = NUM_RETRIES; - this.inQueue = new LinkedBlockingQueue(QUEUE_SIZE); - this.address = address; - this.poller = poller; - createSocket(); - } - - public ZMQ.Socket getSocket() { - return socket; - } - - public String getAddress() { - return address; - } - - public int getRetriesLeft() { - return retriesLeft; - } - - public void setRetriesLeft(int retriesLeft) { - this.retriesLeft = retriesLeft; - } - - public SocketState getState() { - return state; - } - - public void setState(SocketState state) { - this.state = state; - } - - public int getQueueSize() { - return inQueue.size(); - } - - public MessageWrapper removeCurrentRequest() { - return inQueue.poll(); - } - - public boolean hasTimedOut() { - return (System.currentTimeMillis() - sendTime > RpcSocket.TIMEOUT); - } - - public void send(MessageWrapper request) throws TimeoutException { - try { - boolean success = inQueue.offer(request, TIMEOUT, TimeUnit.MILLISECONDS); - if (!success) { - throw new TimeoutException("send :: Queue is full"); - } - process(); - } - catch (InterruptedException e) { - log.error("send : Thread interrupted while attempting to add request to inQueue", e); - } - } - - public MessageWrapper receive() { - Message response = parseMessage(); - MessageWrapper messageWrapper = inQueue.poll(); //remove the message from queue - MessageWrapper responseMessageWrapper = new MessageWrapper(response, messageWrapper.getReceiveSocket()); - - state = new IdleSocketState(); - retriesLeft = NUM_RETRIES; - return responseMessageWrapper; - } - - public void process() { - if (getQueueSize() > 0) //process if there's message in the queue - state.process(this); - } - - // Called by IdleSocketState & BusySocketState - public void sendMessage() { - //Get the message from queue without removing it. For retries - MessageWrapper messageWrapper = inQueue.peek(); - if (messageWrapper != null) { - Message message = messageWrapper.getMessage(); - try { - socket.send(Message.serialize(message)); - } - catch (IOException e) { - log.debug("Message send failed [{}]", message); - log.debug("Exception [{}]", e); - } - sendTime = System.currentTimeMillis(); - } - } - - public Message parseMessage() { - Message parsedMessage = null; - byte[] bytes = socket.recv(); - log.debug("Received bytes:[{}]", bytes.length); - try { - parsedMessage = (Message)Message.deserialize(bytes); - } - catch (IOException|ClassNotFoundException e) { - log.debug("parseMessage : Deserializing received bytes failed", e); - } - - return parsedMessage; - } - - public void recycleSocket() { - close(); - } - - public void close() { - socket.setLinger(10); - socket.close(); - } - - private void createSocket() { - socket = Context.getInstance().getZmqContext().socket(ZMQ.REQ); - socket.connect(address); - poller.register(socket, ZMQ.Poller.POLLIN); - state = new IdleSocketState(); - } - - - /** - * Represents the state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket} - */ - public static interface SocketState { - - /* The processing actions to be performed in this state - */ - public void process(RpcSocket socket); - } - - /** - * Represents the idle state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket} - */ - public static class IdleSocketState implements SocketState { - - @Override - public void process(RpcSocket socket) { - socket.sendMessage(); - socket.setState(new BusySocketState()); - socket.setRetriesLeft(socket.getRetriesLeft()-1); - } - } - - /** - * Represents the busy state of a {@link org.opendaylight.controller.sal.connector.remoterpc.RpcSocket} - */ - public static class BusySocketState implements SocketState { - - private static Logger log = LoggerFactory.getLogger(BusySocketState.class); - - @Override - public void process(RpcSocket socket) { - if (socket.hasTimedOut()) { - if (socket.getRetriesLeft() > 0) { - log.debug("process : Request timed out, retrying now..."); - socket.sendMessage(); - socket.setRetriesLeft(socket.getRetriesLeft() - 1); - } - else { - // No more retries for current request, so stop processing the current request - MessageWrapper message = socket.removeCurrentRequest(); - if (message != null) { - log.error("Unable to process rpc request [{}]", message); - socket.setState(new IdleSocketState()); - socket.setRetriesLeft(NUM_RETRIES); - } - } - } - // Else no timeout, so allow processing to continue - } - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java deleted file mode 100644 index f53d5adc1c..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/Sender.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.sal.connector.remoterpc; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; -import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.zeromq.ZMQ; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -import static com.google.common.base.Preconditions.*; - -/** - * Main server thread for sending requests. - */ -public class Sender implements Runnable{ - - private final static Logger _logger = LoggerFactory.getLogger(Sender.class); - private final Client client; - - - - - public Sender(Client client) { - super(); - this.client = client; - } - -@Override - public void run() { - _logger.info("Starting..."); - - try (SocketManager socketManager = new SocketManager()){ - while (!Thread.currentThread().isInterrupted()) { - - //read incoming messages from blocking queue - MessageWrapper request = pollForRequest(); - - if (request != null) { - processRequest(socketManager, request); - } - - flushSockets(socketManager); - pollForResponse(socketManager); - processResponse(socketManager); - - } - } catch(Exception t){ - _logger.error("Exception: [{}]", t); - _logger.error("Stopping..."); - } - } - - private void processResponse(SocketManager socketManager) { - for (int i = 0; i < socketManager.getPoller().getSize(); i++) { - // If any sockets get a response, process it - if (socketManager.getPoller().pollin(i)) { - Optional socket = socketManager.getManagedSocketFor( - socketManager.getPoller().getItem(i).getSocket()); - - checkState(socket.isPresent(), "Managed socket not found"); - - MessageWrapper response = socket.get().receive(); - _logger.debug("Received rpc response [{}]", response.getMessage()); - - //TODO: handle exception and introduce timeout on receiver side - try { - response.getReceiveSocket().send(Message.serialize(response.getMessage())); - } catch (IOException e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } - } - } - } - - private void processRequest(SocketManager socketManager, MessageWrapper request) throws TimeoutException { - - if ((request.getMessage() == null) || - (request.getMessage().getRecipient() == null)) { - //invalid message. log and drop - _logger.error("Invalid request [{}]", request); - return; - } - - RpcSocket socket = - socketManager.getManagedSocket(request.getMessage().getRecipient()); - - socket.send(request); - } - - private void flushSockets(SocketManager socketManager){ - for (RpcSocket socket : socketManager.getManagedSockets()){ - socket.process(); - } - } - - private MessageWrapper pollForRequest(){ - return client.getRequestQueue().poll(); - } - - private void pollForResponse(SocketManager socketManager){ - try{ - socketManager.getPoller().poll(10); //poll every 10ms - }catch (Throwable t) { /*Ignore and continue*/ } - } -} - - -/* -SCALA - -package org.opendaylight.controller.sal.connector.remoterpc - - import org.slf4j.{LoggerFactory, Logger} - import scala.collection.JavaConverters._ - import scala.Some - import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, Message} -*/ -/** - * Main server thread for sending requests. This does not maintain any state. If the - * thread dies, it will be restarted - */ -/*class Sender extends Runnable { - private val _logger: Logger = LoggerFactory.getLogger(Sender.this.getClass()) - - override def run = { - _logger.info("Sender starting...") - val socketManager = new SocketManager() - - try { - while (!Thread.currentThread().isInterrupted) { - //read incoming messages from blocking queue - val request: MessageWrapper = Client.requestQueue.poll() - - if (request != null) { - if ((request.message != null) && - (request.message.getRecipient != null)) { - - val socket = socketManager.getManagedSocket(request.message.getRecipient) - socket.send(request) - } else { - //invalid message. log and drop - _logger.error("Invalid request [{}]", request) - } - } - - socketManager.getManagedSockets().asScala.map(s => s.process) - - // Poll all sockets for responses every 1 sec - poll(socketManager) - - // If any sockets get a response, process it - for (i <- 0 until socketManager.poller.getSize) { - if (socketManager.poller.pollin(i)) { - val socket = socketManager.getManagedSocketFor(socketManager.poller.getItem(i).getSocket) - - socket match { - case None => //{ - _logger.error("Could not find a managed socket for zmq socket") - throw new IllegalStateException("Could not find a managed socket for zmq socket") - //} - case Some(s) => { - val response = s.receive() - _logger.debug("Received rpc response [{}]", response.message) - response.receiveSocket.send(Message.serialize(response.message)) - } - } - } - } - - } - } catch{ - case e:Exception => { - _logger.debug("Sender stopping due to exception") - e.printStackTrace() - } - } finally { - socketManager.stop - } - } - - def poll(socketManager:SocketManager) = { - try{ - socketManager.poller.poll(10) - }catch{ - case t:Throwable => //ignore and continue - } - } -} - - -// def newThread(r: Runnable): Thread = { -// val t = new RequestHandler() -// t.setUncaughtExceptionHandler(new RequestProcessorExceptionHandler) -// t -// } - - - -/** - * Restarts the request processing server in the event of unforeseen exceptions - */ -//private class RequestProcessorExceptionHandler extends UncaughtExceptionHandler { -// def uncaughtException(t: Thread, e: Throwable) = { -// _logger.error("Exception caught during request processing [{}]", e) -// _logger.info("Restarting request processor server...") -// RequestProcessor.start() -// } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java index 83b93858cf..b5a67ff0df 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java @@ -8,278 +8,422 @@ package org.opendaylight.controller.sal.connector.remoterpc; import com.google.common.base.Optional; - +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Sets; +import org.opendaylight.controller.md.sal.common.api.routing.RouteChange; import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType; import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; -import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; -import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.controller.sal.core.api.RpcRoutingContext; import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; -import java.io.IOException; +import java.net.Inet4Address; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; import java.util.HashSet; +import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkNotNull; /** - * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling - * async and non-blocking. Note zmq socket is not thread safe 2. Read properties - * from config file using existing(?) ODL properties framework + * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable + * so that it gets route change notifications from routing table. */ -public class ServerImpl implements RemoteRpcServer, RouteChangeListener { +public class ServerImpl implements RemoteRpcServer, RouteChangeListener { - private Logger _logger = LoggerFactory.getLogger(ServerImpl.class); + private Logger _logger = LoggerFactory.getLogger(ServerImpl.class); - private ExecutorService serverPool; + private ExecutorService serverPool; + protected ServerRequestHandler handler; - // private RoutingTable routingTable; - private RoutingTableProvider routingTable; - private Set remoteServices; - private ProviderSession brokerSession; - private ZMQ.Context context; - private ZMQ.Socket replySocket; + private Set remoteServices; + private ProviderSession brokerSession; + private ZMQ.Context context; - private final RpcListener listener = new RpcListener(); + private final RpcListener listener = new RpcListener(); - private final String localUri = Context.getInstance().getLocalUri(); + private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler"; + private final int HANDLER_WORKER_COUNT = 2; + private final int HWM = 200;//high water mark on sockets + private volatile State status = State.STOPPED; - private final int rpcPort; + private String serverAddress; + private int port; - private RpcImplementation client; + private ClientImpl client; - public RpcImplementation getClient() { - return client; - } + private RoutingTableProvider routingTableProvider; - public void setClient(RpcImplementation client) { - this.client = client; - } + public static enum State { + STARTING, STARTED, STOPPED; + } - // Prevent instantiation - public ServerImpl(int rpcPort) { - this.rpcPort = rpcPort; - } + public ServerImpl(int port) { + this.port = port; + this.serverAddress = new StringBuilder(findIpAddress()). + append(":"). + append(port). + toString(); + } - public void setBrokerSession(ProviderSession session) { - this.brokerSession = session; - } + public RoutingTableProvider getRoutingTableProvider() { + return routingTableProvider; + } - public ExecutorService getServerPool() { - return serverPool; - } + public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) { + this.routingTableProvider = routingTableProvider; + } - public void setServerPool(ExecutorService serverPool) { - this.serverPool = serverPool; - } + public ClientImpl getClient(){ + return this.client; + } - public void start() { - context = ZMQ.context(1); - serverPool = Executors.newSingleThreadExecutor(); - remoteServices = new HashSet(); + public void setClient(ClientImpl client) { + this.client = client; + } - // Start listening rpc requests - serverPool.execute(receive()); + public State getStatus() { + return this.status; + } - brokerSession.addRpcRegistrationListener(listener); - // routingTable.registerRouteChangeListener(routeChangeListener); + public Optional getHandler() { + return Optional.fromNullable(this.handler); + } - Set currentlySupported = brokerSession.getSupportedRpcs(); - for (QName rpc : currentlySupported) { - listener.onRpcImplementationAdded(rpc); - } + public void setBrokerSession(ProviderSession session) { + this.brokerSession = session; + } - _logger.debug("RPC Server started [{}]", localUri); - } + public Optional getBrokerSession() { + return Optional.fromNullable(this.brokerSession); + } - public void stop() { - // TODO: un-subscribe + public Optional getZmqContext() { + return Optional.fromNullable(this.context); + } - // if (context != null) - // context.term(); - // - // _logger.debug("ZMQ Context is terminated."); + public String getServerAddress() { + return serverAddress; + } - if (serverPool != null) - serverPool.shutdown(); + public String getHandlerAddress() { + return HANDLER_INPROC_ADDRESS; + } - _logger.debug("Thread pool is closed."); - } + /** + * + */ + public void start() { + Preconditions.checkState(State.STOPPED == this.getStatus(), + "Remote RPC Server is already running"); - private Runnable receive() { - return new Runnable() { - public void run() { - - // Bind to RPC reply socket - replySocket = context.socket(ZMQ.REP); - replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort()); - - // Poller enables listening on multiple sockets using a single - // thread - ZMQ.Poller poller = new ZMQ.Poller(1); - poller.register(replySocket, ZMQ.Poller.POLLIN); - try { - // TODO: Add code to restart the thread after exception - while (!Thread.currentThread().isInterrupted()) { - - poller.poll(); - - if (poller.pollin(0)) { - handleRpcCall(); - } - } - } catch (Exception e) { - // log and continue - _logger.error("Unhandled exception [{}]", e); - } finally { - poller.unregister(replySocket); - replySocket.close(); - } - - } - }; - } + status = State.STARTING; + context = ZMQ.context(1); + remoteServices = new HashSet();// + serverPool = Executors.newSingleThreadExecutor();//main server thread + serverPool.execute(receive()); // Start listening rpc requests + brokerSession.addRpcRegistrationListener(listener); - /** - * @throws InterruptedException - * @throws ExecutionException - */ - private void handleRpcCall() { + announceLocalRpcs(); - Message request = parseMessage(replySocket); + registerRemoteRpcs(); - _logger.debug("Received rpc request [{}]", request); + status = State.STARTED; + _logger.info("Remote RPC Server started [{}]", getServerAddress()); + } - // Call broker to process the message then reply - Future> rpc = null; - RpcResult result = null; - try { - rpc = brokerSession.rpc((QName) request.getRoute().getType(), - XmlUtils.xmlToCompositeNode((String) request.getPayload())); + public void stop(){ + close(); + } - result = (rpc != null) ? rpc.get() : null; + /** + * + */ + @Override + public void close() { - } catch (Exception e) { - _logger.debug("Broker threw [{}]", e); - } + if (State.STOPPED == this.getStatus()) return; //do nothing + + unregisterLocalRpcs(); - CompositeNode payload = (result != null) ? result.getResult() : null; + if (serverPool != null) + serverPool.shutdown(); - Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri) - .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build(); + closeZmqContext(); - _logger.debug("Sending rpc response [{}]", response); + status = State.STOPPED; + _logger.info("Remote RPC Server stopped"); + } + + /** + * Closes ZMQ Context. It tries to gracefully terminate the context. If + * termination takes more than a second, its forcefully shutdown. + */ + private void closeZmqContext() { + ExecutorService exec = Executors.newSingleThreadExecutor(); + FutureTask zmqTermination = new FutureTask(new Runnable() { + @Override + public void run() { try { - replySocket.send(Message.serialize(response)); + if (context != null) + context.term(); + _logger.debug("ZMQ Context terminated gracefully!"); } catch (Exception e) { - _logger.debug("rpc response send failed for message [{}]", response); - _logger.debug("{}", e); + _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e); } + } + }, null); + + exec.execute(zmqTermination); + + try { + zmqTermination.get(5L, TimeUnit.SECONDS); + } catch (Exception e) {/*ignore and continue with shutdown*/} + + exec.shutdownNow(); + } + + /** + * Main listener thread that spawns {@link ServerRequestHandler} as workers. + * + * @return + */ + private Runnable receive() { + return new Runnable() { + + @Override + public void run() { + Thread.currentThread().setName("remote-rpc-server"); + _logger.debug("Remote RPC Server main thread starting..."); + + //socket clients connect to (frontend) + ZMQ.Socket clients = context.socket(ZMQ.ROUTER); + + //socket RequestHandlers connect to (backend) + ZMQ.Socket workers = context.socket(ZMQ.DEALER); + + try (SocketPair capturePair = new SocketPair(); + ServerRequestHandler requestHandler = new ServerRequestHandler(context, + brokerSession, + HANDLER_WORKER_COUNT, + HANDLER_INPROC_ADDRESS, + getServerAddress());) { + + handler = requestHandler; + clients.setHWM(HWM); + clients.bind("tcp://*:" + port); + workers.setHWM(HWM); + workers.bind(HANDLER_INPROC_ADDRESS); + //start worker threads + _logger.debug("Remote RPC Server worker threads starting..."); + requestHandler.start(); + //start capture thread + // handlerPool.execute(new CaptureHandler(capturePair.getReceiver())); + // Connect work threads to client threads via a queue + ZMQ.proxy(clients, workers, null);//capturePair.getSender()); + } catch (Exception e) { + _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage()); + } finally { + if (clients != null) clients.close(); + if (workers != null) workers.close(); + _logger.info("Remote RPC Server stopped"); + } + } + }; + } + + /** + * Register the remote RPCs from the routing table into broker + */ + private void registerRemoteRpcs(){ + Optional> routingTableOptional = routingTableProvider.getRoutingTable(); + + Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent"); + + Set remoteRoutes = + routingTableProvider.getRoutingTable().get().getAllRoutes(); + + //filter out all entries that contains local address + //we dont want to register local RPCs as remote + Predicate notLocalAddressFilter = new Predicate(){ + public boolean apply(Map.Entry remoteRoute){ + return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue()); + } + }; + + //filter the entries created by current node + Set filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter); + + for (Map.Entry route : filteredRemoteRoutes){ + onRouteUpdated((String) route.getKey(), "");//value is not needed by broker + } + } + + /** + * Un-Register the local RPCs from the routing table + */ + private void unregisterLocalRpcs(){ + Set currentlySupported = brokerSession.getSupportedRpcs(); + for (QName rpc : currentlySupported) { + listener.onRpcImplementationRemoved(rpc); + } + } + + /** + * Publish all the locally registered RPCs in the routing table + */ + private void announceLocalRpcs(){ + Set currentlySupported = brokerSession.getSupportedRpcs(); + for (QName rpc : currentlySupported) { + listener.onRpcImplementationAdded(rpc); + } + } + + /** + * @param key + * @param value + */ + @Override + public void onRouteUpdated(String key, String value) { + RouteIdentifierImpl rId = new RouteIdentifierImpl(); + try { + _logger.debug("Updating key/value {}-{}", key, value); + brokerSession.addRpcImplementation( + (QName) rId.fromString(key).getType(), client); + + //TODO: Check with Tony for routed rpc + //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client); + } catch (Exception e) { + _logger.info("Route update failed {}", e); } + } + + /** + * @param key + */ + @Override + public void onRouteDeleted(String key) { + //TODO: Broker session needs to be updated to support this + throw new UnsupportedOperationException(); + } + + /** + * Finds IPv4 address of the local VM + * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which + * address will be returned. Read IP from a property file or enhance the code to make it deterministic. + * Should we use IP or hostname? + * + * @return + */ + private String findIpAddress() { + String hostAddress = null; + Enumeration e = null; + try { + e = NetworkInterface.getNetworkInterfaces(); + } catch (SocketException e1) { + e1.printStackTrace(); + } + while (e.hasMoreElements()) { - /** - * @param socket - * @return - */ - private Message parseMessage(ZMQ.Socket socket) { + NetworkInterface n = (NetworkInterface) e.nextElement(); - Message msg = null; - try { - byte[] bytes = socket.recv(); - _logger.debug("Received bytes:[{}]", bytes.length); - msg = (Message) Message.deserialize(bytes); - } catch (Throwable t) { - t.printStackTrace(); - } - return msg; + Enumeration ee = n.getInetAddresses(); + while (ee.hasMoreElements()) { + InetAddress i = (InetAddress) ee.nextElement(); + if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) + hostAddress = i.getHostAddress(); + } } + return hostAddress; + + } + + /** + * Listener for rpc registrations + */ + private class RpcListener implements RpcRegistrationListener { @Override - public void onRouteUpdated(String key, Set values) { - RouteIdentifierImpl rId = new RouteIdentifierImpl(); - try { - _logger.debug("Updating key/value {}-{}", key, values); - brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client); + public void onRpcImplementationAdded(QName name) { - } catch (Exception e) { - _logger.info("Route update failed {}", e); - } + //if the service name exists in the set, this notice + //has bounced back from the broker. It should be ignored + if (remoteServices.contains(name)) + return; + + _logger.debug("Adding registration for [{}]", name); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(name); + + RoutingTable routingTable = getRoutingTable(); + + try { + routingTable.addGlobalRoute(routeId.toString(), getServerAddress()); + _logger.debug("Route added [{}-{}]", name, getServerAddress()); + + } catch (RoutingTableException | SystemException e) { + //TODO: This can be thrown when route already exists in the table. Broker + //needs to handle this. + _logger.error("Unhandled exception while adding global route to routing table [{}]", e); + + } } @Override - public void onRouteDeleted(String key) { - // TODO: Broker session needs to be updated to support this - throw new UnsupportedOperationException(); + public void onRpcImplementationRemoved(QName name) { + + _logger.debug("Removing registration for [{}]", name); + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(name); + + RoutingTable routingTable = getRoutingTable(); + + try { + routingTable.removeGlobalRoute(routeId.toString()); + } catch (RoutingTableException | SystemException e) { + _logger.error("Route delete failed {}", e); + } } - - /** - * Listener for rpc registrations - */ - private class RpcListener implements RpcRegistrationListener { - - - - @Override - public void onRpcImplementationAdded(QName name) { - - // if the service name exists in the set, this notice - // has bounced back from the broker. It should be ignored - if (remoteServices.contains(name)) - return; - - _logger.debug("Adding registration for [{}]", name); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(name); - - try { - routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri); - _logger.debug("Route added [{}-{}]", name, localUri); - } catch (RoutingTableException | SystemException e) { - // TODO: This can be thrown when route already exists in the - // table. Broker - // needs to handle this. - _logger.error("Unhandled exception while adding global route to routing table [{}]", e); - - } - } - @Override - public void onRpcImplementationRemoved(QName name) { + private RoutingTable getRoutingTable(){ + Optional> routingTable = + routingTableProvider.getRoutingTable(); - _logger.debug("Removing registration for [{}]", name); - RouteIdentifierImpl routeId = new RouteIdentifierImpl(); - routeId.setType(name); + checkNotNull(routingTable.isPresent(), "Routing table is null"); - try { - routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString()); - } catch (RoutingTableException | SystemException e) { - _logger.error("Route delete failed {}", e); - } - } + return routingTable.get(); } + } + + /* + * Listener for Route changes in broker. Broker notifies this listener in the event + * of any change (add/delete). Listener then updates the routing table. + */ + private class BrokerRouteChangeListener + implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener{ @Override - public void close() throws Exception { - stop(); - } + public void onRouteChange(RouteChange routeChange) { - public void setRoutingTableProvider(RoutingTableProvider provider) { - this.routingTable = provider; } + } } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java new file mode 100644 index 0000000000..949e6ee9a8 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandler.java @@ -0,0 +1,207 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * + */ +public class ServerRequestHandler implements AutoCloseable{ + + private Logger _logger = LoggerFactory.getLogger(ServerRequestHandler.class); + private final String DEFAULT_NAME = "remote-rpc-worker"; + private String dealerAddress; + private String serverAddress; + private int workerCount; + private ZMQ.Context context; + private Broker.ProviderSession broker; + + private RequestHandlerThreadPool workerPool; + private final AtomicInteger threadId = new AtomicInteger(); + + public ServerRequestHandler(ZMQ.Context context, + Broker.ProviderSession session, + int workerCount, + String dealerAddress, + String serverAddress) { + this.context = context; + this.dealerAddress = dealerAddress; + this.serverAddress = serverAddress; + this.broker = session; + this.workerCount = workerCount; + } + + public ThreadPoolExecutor getWorkerPool(){ + return workerPool; + } + + public void start(){ + workerPool = new RequestHandlerThreadPool( + workerCount, workerCount, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + //unbound is ok. Task will never be submitted + + for (int i=0;i> rpc = null; + RpcResult result = null; + + //TODO Call this in a new thread with timeout + try { + rpc = broker.rpc( + (QName) request.getRoute().getType(), + XmlUtils.xmlToCompositeNode((String) request.getPayload())); + + result = (rpc != null) ? rpc.get() : null; + + } catch (Exception e) { + _logger.debug("Broker threw [{}]", e); + } + + CompositeNode payload = (result != null) ? result.getResult() : null; + + Message response = new Message.MessageBuilder() + .type(Message.MessageType.RESPONSE) + .sender(serverAddress) + .route(request.getRoute()) + .payload(XmlUtils.compositeNodeToXml(payload)) + .build(); + + _logger.debug("Sending rpc response [{}]", response); + + try { + socket.send(Message.serialize(response)); + } catch (Exception e) { + _logger.debug("rpc response send failed for message [{}]", response); + _logger.debug("{}", e); + } + } + } + } catch (Exception e) { + printException(e); + } finally { + closeSocket(socket); + } + } + + /** + * @param socket + * @return + */ + private Message parseMessage(ZMQ.Socket socket) throws Exception { + byte[] bytes = socket.recv(); //this blocks + _logger.debug("Received bytes:[{}]", bytes.length); + return (Message) Message.deserialize(bytes); + } + + private void printException(Exception e) { + try (StringWriter s = new StringWriter(); + PrintWriter p = new PrintWriter(s)) { + e.printStackTrace(p); + _logger.debug(s.toString()); + } catch (IOException e1) {/*Ignore and continue*/ } + } + + private void closeSocket(ZMQ.Socket socket) { + try { + if (socket != null) socket.close(); + } catch (Exception x) { + _logger.debug("Exception while closing socket [{}]", x); + } finally { + if (socket != null) socket.close(); + } + _logger.debug("Closing..."); + } + } + + + /** + * + */ + public class RequestHandlerThreadPool extends ThreadPoolExecutor{ + + public RequestHandlerThreadPool(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + if (isTerminating() || isTerminated() || isShutdown()) + return; + + if ( t != null ){ + _logger.debug("Exception caught while terminating worker [{},{}]", t.getClass(), t.getMessage()); + } + + this.execute(new Worker(threadId.incrementAndGet())); + super.afterExecute(r, null); + } + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java deleted file mode 100644 index 588a299626..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManager.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.sal.connector.remoterpc; - -import com.google.common.base.Optional; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.zeromq.ZMQ; -import java.net.UnknownHostException; -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; - - -/** - * Manages creation of {@link RpcSocket} and their registration with {@link ZMQ.Poller} - */ -public class SocketManager implements AutoCloseable{ - private static final Logger log = LoggerFactory.getLogger(SocketManager.class); - - /* - * RpcSockets mapped by network address its connected to - */ - private ConcurrentHashMap managedSockets = new ConcurrentHashMap(); - - private ZMQ.Poller _poller = new ZMQ.Poller(2); //randomly selected size. Poller grows automatically - - /** - * Returns a {@link RpcSocket} for the given address - * @param address network address with port eg: 10.199.199.20:5554 - * @return - */ - public RpcSocket getManagedSocket(String address) throws IllegalArgumentException { - //Precondition - if (!address.matches("(tcp://)(.*)(:)(\\d*)")) { - throw new IllegalArgumentException("Address must of format 'tcp://:' but is " + address); - } - - if (!managedSockets.containsKey(address)) { - log.debug("{} Creating new socket for {}", Thread.currentThread().getName()); - RpcSocket socket = new RpcSocket(address, _poller); - managedSockets.put(address, socket); - } - - return managedSockets.get(address); - } - - /** - * Returns a {@link RpcSocket} for the given {@link ZMQ.Socket} - * @param socket - * @return - */ - public Optional getManagedSocketFor(ZMQ.Socket socket) { - for (RpcSocket rpcSocket : managedSockets.values()) { - if (rpcSocket.getSocket().equals(socket)) { - return Optional.of(rpcSocket); - } - } - return Optional.absent(); - } - - /** - * Return a collection of all managed sockets - * @return - */ - public Collection getManagedSockets() { - return managedSockets.values(); - } - - /** - * Returns the {@link ZMQ.Poller} - * @return - */ - public ZMQ.Poller getPoller() { - return _poller; - } - - /** - * This should be called when stopping the server to close all the sockets - * @return - */ - @Override - public void close() throws Exception { - log.debug("Stopping..."); - for (RpcSocket socket : managedSockets.values()) { - socket.close(); - } - managedSockets.clear(); - log.debug("Stopped"); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java deleted file mode 100644 index 073601a1c0..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/CompositeNodeImpl.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.sal.connector.remoterpc.dto; - -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.*; - -import java.io.Serializable; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class CompositeNodeImpl implements CompositeNode, Serializable { - - private QName key; - private List> children; - - @Override - public List> getChildren() { - return children; - } - - @Override - public List getCompositesByName(QName children) { - throw new UnsupportedOperationException(); - } - - @Override - public List getCompositesByName(String children) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public List> getSimpleNodesByName(QName children) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public List> getSimpleNodesByName(String children) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public CompositeNode getFirstCompositeByName(QName container) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public SimpleNode getFirstSimpleByName(QName leaf) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public MutableCompositeNode asMutable() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public QName getKey() { - return key; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public List> setValue(List> value) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public int size() { - return 0; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public boolean isEmpty() { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public boolean containsKey(Object key) { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public boolean containsValue(Object value) { - return false; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public List> get(Object key) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public List> put(QName key, List> value) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public List> remove(Object key) { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public void putAll(Map>> m) { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public void clear() { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public Set keySet() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public Collection>> values() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public Set>>> entrySet() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public QName getNodeType() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public CompositeNode getParent() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public List> getValue() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public ModifyAction getModificationAction() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RpcRequestImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RpcRequestImpl.java deleted file mode 100644 index f26f78b31e..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RpcRequestImpl.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.sal.connector.remoterpc.dto; - -import org.opendaylight.controller.sal.connector.api.RpcRouter; -import org.opendaylight.yangtools.yang.common.QName; -import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; - -import java.io.Serializable; - -public class RpcRequestImpl implements RpcRouter.RpcRequest,Serializable { - - private RpcRouter.RouteIdentifier routeIdentifier; - private Object payload; - - @Override - public RpcRouter.RouteIdentifier getRoutingInformation() { - return routeIdentifier; - } - - public void setRouteIdentifier(RpcRouter.RouteIdentifier routeIdentifier) { - this.routeIdentifier = routeIdentifier; - } - - @Override - public Object getPayload() { - return payload; - } - - public void setPayload(Object payload) { - this.payload = payload; - } - -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/util/XmlUtils.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/util/XmlUtils.java index 7dab2e3b75..3bc39630e1 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/util/XmlUtils.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/util/XmlUtils.java @@ -1,3 +1,11 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.sal.connector.remoterpc.util; import org.opendaylight.yangtools.yang.data.api.CompositeNode; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala deleted file mode 100644 index 63b68089d3..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/scala/org/opendaylight/controller/sal/connector/remoterpc/Client.scala +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.sal.connector.remoterpc - -import org.opendaylight.yangtools.yang.data.api.CompositeNode -import org.opendaylight.yangtools.yang.common.{RpcError, RpcResult, QName} -import org.opendaylight.controller.sal.core.api.RpcImplementation -import java.util -import java.util.{UUID, Collections} -import org.zeromq.ZMQ -import org.opendaylight.controller.sal.common.util.{RpcErrors, Rpcs} -import org.slf4j.LoggerFactory -import org.opendaylight.controller.sal.connector.remoterpc.dto.{MessageWrapper, RouteIdentifierImpl, Message} -import Message.MessageType -import java.util.concurrent._ -import java.lang.InterruptedException - - -/** - * An implementation of {@link RpcImplementation} that makes - * remote RPC calls - */ -class Client extends RemoteRpcClient { - - private val _logger = LoggerFactory.getLogger(this.getClass); - - val requestQueue = new LinkedBlockingQueue[MessageWrapper](100) - val pool: ExecutorService = Executors.newSingleThreadExecutor() - private val TIMEOUT = 5000 //in ms - var routingTableProvider: RoutingTableProvider = null - - - def getInstance = this - - - def setRoutingTableProvider(provider : RoutingTableProvider) = { - routingTableProvider = provider; - } - - def getSupportedRpcs: util.Set[QName] = { - Collections.emptySet() - } - - def invokeRpc(rpc: QName, input: CompositeNode): RpcResult[CompositeNode] = { - - val routeId = new RouteIdentifierImpl() - routeId.setType(rpc) - - //lookup address for the rpc request - val routingTable = routingTableProvider.getRoutingTable() - require( routingTable != null, "Routing table not found. Exiting" ) - - val addresses:util.Set[String] = routingTable.getRoutes(routeId.toString) - require(addresses != null, "Address not found for rpc " + rpc); - require(addresses.size() == 1) //its a global service. - - val address = addresses.iterator().next() - require(address != null, "Address is null") - - //create in-process "pair" socket and pass it to sender thread - //Sender replies on this when result is available - val inProcAddress = "inproc://" + UUID.randomUUID() - val receiver = Context.zmqContext.socket(ZMQ.PAIR) - receiver.bind(inProcAddress); - - val sender = Context.zmqContext.socket(ZMQ.PAIR) - sender.connect(inProcAddress) - - val requestMessage = new Message.MessageBuilder() - .`type`(MessageType.REQUEST) - //.sender("tcp://localhost:8081") - .recipient(address) - .route(routeId) - .payload(input) - .build() - - _logger.debug("Queuing up request and expecting response on [{}]", inProcAddress) - - val messageWrapper = new MessageWrapper(requestMessage, sender) - val errors = new util.ArrayList[RpcError] - - try { - process(messageWrapper) - val response = parseMessage(receiver) - - return Rpcs.getRpcResult( - true, response.getPayload.asInstanceOf[CompositeNode], Collections.emptySet()) - - } catch { - case e: Exception => { - errors.add(RpcErrors.getRpcError(null,null,null,null,e.getMessage,null,e.getCause)) - return Rpcs.getRpcResult(false, null, errors) - } - } finally { - receiver.close(); - sender.close(); - } - - } - - /** - * Block on socket for reply - * @param receiver - * @return - */ - private def parseMessage(receiver:ZMQ.Socket): Message = { - val bytes = receiver.recv() - return Message.deserialize(bytes).asInstanceOf[Message] - } - - def start() = { - pool.execute(new Sender) - } - - def process(msg: MessageWrapper) = { - _logger.debug("Processing message [{}]", msg) - val success = requestQueue.offer(msg, TIMEOUT, TimeUnit.MILLISECONDS) - - if (!success) throw new TimeoutException("Queue is full"); - - } - - def stop() = { - pool.shutdown() //intiate shutdown - _logger.debug("Client stopping...") - // Context.zmqContext.term(); - // _logger.debug("ZMQ context terminated") - - try { - - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - _logger.error("Client thread pool did not shut down"); - } - } catch { - case ie:InterruptedException => - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - _logger.debug("Client stopped") - } - - def close() = { - stop(); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java new file mode 100644 index 0000000000..5dd21429ff --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java @@ -0,0 +1,103 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import com.google.common.base.Optional; +import junit.framework.Assert; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.sal.common.util.Rpcs; +import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; +import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil; +import org.opendaylight.yangtools.yang.common.RpcError; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; + +import java.io.IOException; +import java.util.*; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * + */ +public class ClientImplTest { + RoutingTableProvider routingTableProvider; + ClientImpl client; + ClientRequestHandler mockHandler; + + @Before + public void setUp() throws Exception { + + //mock routing table + routingTableProvider = mock(RoutingTableProvider.class); + RoutingTable mockRoutingTable = new MockRoutingTable(); + Optional> optionalRoutingTable = Optional.fromNullable(mockRoutingTable); + when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable); + + //mock ClientRequestHandler + mockHandler = mock(ClientRequestHandler.class); + + client = new ClientImpl(mockHandler); + client.setRoutingTableProvider(routingTableProvider); + + } + + @After + public void tearDown() throws Exception { + + } + + @Test + public void getRoutingTableProvider_Call_ShouldReturnMockProvider() throws Exception { + Assert.assertEquals(routingTableProvider, client.getRoutingTableProvider()); + + } + + @Test + public void testStart() throws Exception { + + } + + @Test + public void testStop() throws Exception { + + } + + @Test + public void testClose() throws Exception { + + } + + @Test + public void invokeRpc_NormalCall_ShouldReturnSuccess() throws Exception { + + when(mockHandler.handle(any(Message.class))). + thenReturn(MessagingUtil.createEmptyMessage()); + + RpcResult result = client.invokeRpc(null, null); + + Assert.assertTrue(result.isSuccessful()); + Assert.assertTrue(result.getErrors().isEmpty()); + Assert.assertNull(result.getResult()); + } + + @Test + public void invokeRpc_HandlerThrowsException_ShouldReturnError() throws Exception { + + when(mockHandler.handle(any(Message.class))). + thenThrow(new IOException()); + + RpcResult result = client.invokeRpc(null, null); + + Assert.assertFalse(result.isSuccessful()); + Assert.assertFalse(result.getErrors().isEmpty()); + Assert.assertNull(result.getResult()); + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandlerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandlerTest.java new file mode 100644 index 0000000000..e1345945a7 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientRequestHandlerTest.java @@ -0,0 +1,113 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import junit.framework.Assert; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.util.concurrent.*; + +/** + * + */ +public class ClientRequestHandlerTest { + + private final Logger _logger = LoggerFactory.getLogger(ClientRequestHandlerTest.class); + + ZMQ.Context context; + ExecutorService serverThread; + final String SERVER_ADDRESS = "localhost:5554"; + + ClientRequestHandler handler; + + @Before + public void setUp() throws Exception { + context = ZMQ.context(1); + serverThread = Executors.newCachedThreadPool(); + handler = new ClientRequestHandler(context); + } + + @After + public void tearDown() throws Exception { + serverThread.shutdown(); + MessagingUtil.closeZmqContext(context); + handler.close(); + } + + @Test + public void handle_SingleRemote_ShouldReturnResponse() throws Exception { + serverThread.execute(MessagingUtil.startReplyServer(context, SERVER_ADDRESS, 1)); + Message request = new Message(); + request.setRecipient(SERVER_ADDRESS); + Message response = handleMessageWithTimeout(request); + Assert.assertNotNull(response); + //should be connected to only 1 remote server + Assert.assertEquals(1, handler.getWorkerCount()); + Assert.assertEquals(response.getRecipient(), SERVER_ADDRESS); + } + + // @Test + public void handle_MultiRemote_ShouldReturnResponses() throws Exception { + ExecutorService threadPool = Executors.newCachedThreadPool(); + final int port = 5555; + String serverAddress = null; + for (int i = 0; i < 5; i++) { + serverAddress = "localhost:" + (port + i); + serverThread.execute(MessagingUtil.startReplyServer(context, serverAddress, 1)); + threadPool.execute(createEmptyMessageTaskAndHandle(handler, serverAddress)); + } + Thread.currentThread().sleep(5000);//wait for all messages to get processed + //should be connected to 5 remote server + Assert.assertEquals(5, handler.getWorkerCount()); + } + + private Runnable createEmptyMessageTaskAndHandle(final ClientRequestHandler handler, final String serverAddress) { + + return new Runnable() { + @Override + public void run() { + Message request = new Message(); + request.setRecipient(serverAddress); + try { + Message response = handleMessageWithTimeout(request); + Assert.assertNotNull(response); + Assert.assertEquals(response.getRecipient(), serverAddress); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } + + private Message handleMessageWithTimeout(final Message request) { + Message response = null; + + FutureTask task = new FutureTask(new Callable() { + + @Override + public Message call() { + try { + return handler.handle(request); + } catch (Exception e) { + _logger.debug("Client handler failed to handle request. Exception is [{}]", e); + } + return null; + } + }); + + serverThread.execute(task); + + try { + response = (Message) task.get(5L, TimeUnit.SECONDS); //wait for max 5 sec for server to respond + } catch (Exception e) {/*ignore and continue*/} + + return response; + } + +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java deleted file mode 100644 index 2e77537756..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientTest.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.opendaylight.controller.sal.connector.remoterpc; - -import junit.framework.Assert; -import org.junit.Before; -import org.junit.Test; -import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; - -import java.util.concurrent.TimeoutException; - -public class ClientTest { - - Client client; - - @Before - public void setup(){ - client = new Client(); - client.getRequestQueue().clear(); - } - - @Test - public void testStop() throws Exception { - - } - - @Test - public void testPool() throws Exception { - - } - - @Test - public void process_AddAMessage_ShouldAddToQueue() throws Exception { - client.process(getEmptyMessageWrapper()); - Assert.assertEquals(1, client.getRequestQueue().size()); - } - - /** - * Queue size is 100. Adding 101 message should time out in 2 sec - * if server does not process it - * @throws Exception - */ - @Test(expected = TimeoutException.class) - public void process_Add101Message_ShouldThrow() throws Exception { - for (int i=0;i<101;i++){ - client.process(getEmptyMessageWrapper()); - } - } - - @Test - public void testStart() throws Exception { - } - - private MessageWrapper getEmptyMessageWrapper(){ - return new MessageWrapper(new Message(), null); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/MockRoutingTable.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/MockRoutingTable.java new file mode 100644 index 0000000000..7b0ac2c6a0 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/MockRoutingTable.java @@ -0,0 +1,60 @@ +package org.opendaylight.controller.sal.connector.remoterpc; + +import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; +import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Mock implementation of routing table + */ +public class MockRoutingTable implements RoutingTable { + + + @Override + public void addRoute(Object o, Object o2) throws RoutingTableException, SystemException { + + } + + @Override + public void addGlobalRoute(Object o, Object o2) throws RoutingTableException, SystemException { + + } + + @Override + public void removeRoute(Object o, Object o2) { + + } + + @Override + public void removeGlobalRoute(Object o) throws RoutingTableException, SystemException { + + } + + @Override + public Set getRoutes(Object o) { + Set routes = new HashSet(); + routes.add("localhost:5554"); + return routes; + } + + @Override + public Set getAllRoutes() { + return Collections.emptySet(); + } + + @Override + public Object getARoute(Object o) { + return null; + } + + @Override + public void registerRouteChangeListener(RouteChangeListener routeChangeListener) { + + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java index f6b9004eae..9b7cf89dfb 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java @@ -1,3 +1,10 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + package org.opendaylight.controller.sal.connector.remoterpc; import java.net.URI; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java deleted file mode 100644 index e23a3ca5ef..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RpcSocketTest.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.sal.connector.remoterpc; - -import junit.framework.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; -import org.opendaylight.controller.sal.connector.remoterpc.dto.MessageWrapper; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.zeromq.ZMQ; - -import java.util.concurrent.TimeoutException; - -import static org.mockito.Mockito.doNothing; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(RpcSocket.class) -public class RpcSocketTest { - RpcSocket rpcSocket = new RpcSocket("tcp://localhost:5554", new ZMQ.Poller(1)); - RpcSocket spy = PowerMockito.spy(rpcSocket); - - @Test - public void testCreateSocket() throws Exception { - Assert.assertEquals("tcp://localhost:5554", spy.getAddress()); - Assert.assertEquals(ZMQ.REQ, spy.getSocket().getType()); - } - - @Test(expected = TimeoutException.class) - public void send_WhenQueueGetsFull_ShouldThrow() throws Exception { - - doNothing().when(spy).process(); - - //10 is queue size - for (int i=0;i<10;i++){ - spy.send(getEmptyMessageWrapper()); - } - - //sending 11th message should throw - spy.send(getEmptyMessageWrapper()); - } - - @Test - public void testHasTimedOut() throws Exception { - spy.send(getEmptyMessageWrapper()); - Assert.assertFalse(spy.hasTimedOut()); - Thread.sleep(1000); - Assert.assertFalse(spy.hasTimedOut()); - Thread.sleep(1000); - Assert.assertTrue(spy.hasTimedOut()); - } - - @Test - public void testProcess() throws Exception { - PowerMockito.doNothing().when(spy, "sendMessage"); - spy.send(getEmptyMessageWrapper()); - - //Next message should get queued - spy.send(getEmptyMessageWrapper()); - - //queue size should be 2 - Assert.assertEquals(2, spy.getQueueSize()); - - - spy.process(); - //sleep for 2 secs (timeout) - //message send would be retried - Thread.sleep(2000); - spy.process(); - Thread.sleep(2000); - spy.process(); - Thread.sleep(2000); - spy.process(); //retry fails, next message will get picked up - Assert.assertEquals(1, spy.getQueueSize()); - } - - @Test - public void testProcessStateTransitions() throws Exception { - PowerMockito.doNothing().when(spy, "sendMessage"); - Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); - spy.send(getEmptyMessageWrapper()); - Assert.assertEquals(1, spy.getQueueSize()); - Thread.sleep(200); - Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); - Thread.sleep(1800); - - //1st timeout, 2nd try - spy.process(); - Thread.sleep(200); - Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); - Thread.sleep(1800); - - //2nd timeout, 3rd try - spy.process(); - Thread.sleep(200); - Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); - Thread.sleep(1800); - - //3rd timeout, no more tries => remove - spy.process(); - Thread.sleep(200); - Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); - Assert.assertEquals(0, spy.getQueueSize()); - } - - @Test - public void testParseMessage() throws Exception { - // Write an integration test for parseMessage - } - - @Test - public void testRecycleSocket() throws Exception { - // This will need to be updated in the future...for now, recycleSocket() calls close() - Assert.assertTrue(spy.getSocket().base().check_tag()); - spy.close(); - Assert.assertEquals(10, spy.getSocket().getLinger()); - Assert.assertFalse(spy.getSocket().base().check_tag()); - } - - @Test - public void testClose() throws Exception { - Assert.assertTrue(spy.getSocket().base().check_tag()); - spy.close(); - Assert.assertEquals(10, spy.getSocket().getLinger()); - Assert.assertFalse(spy.getSocket().base().check_tag()); - } - - @Test - public void testReceive() throws Exception { - PowerMockito.doReturn(null).when(spy, "parseMessage"); - PowerMockito.doNothing().when(spy, "process"); - spy.send(getEmptyMessageWrapper()); - - //There should be 1 message waiting in the queue - Assert.assertEquals(1, spy.getQueueSize()); - - spy.receive(); - //This should complete message processing - //The message should be removed from the queue - Assert.assertEquals(0, spy.getQueueSize()); - Assert.assertEquals(RpcSocket.NUM_RETRIES, spy.getRetriesLeft()); - - } - - @Test - public void testReceiveStateTransitions() throws Exception { - PowerMockito.doReturn(null).when(spy, "parseMessage"); - Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); - spy.send(getEmptyMessageWrapper()); - - //There should be 1 message waiting in the queue - Assert.assertEquals(1, spy.getQueueSize()); - Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); - - spy.receive(); - //This should complete message processing - //The message should be removed from the queue - Assert.assertEquals(0, spy.getQueueSize()); - Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); - } - - private MessageWrapper getEmptyMessageWrapper(){ - return new MessageWrapper(new Message(), null); - } - - @Test - public void testProcessReceiveSequence() throws Exception { - PowerMockito.doNothing().when(spy, "sendMessage"); - PowerMockito.doReturn(null).when(spy, "parseMessage"); - Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); - spy.send(getEmptyMessageWrapper()); - spy.send(getEmptyMessageWrapper()); - Assert.assertEquals(2, spy.getQueueSize()); - Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); - - - Thread.sleep(2000); - Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); - spy.receive(); - Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); - Assert.assertEquals(1, spy.getQueueSize()); - - spy.process(); - Assert.assertTrue(spy.getState() instanceof RpcSocket.BusySocketState); - spy.receive(); - Assert.assertTrue(spy.getState() instanceof RpcSocket.IdleSocketState); - Assert.assertEquals(0, spy.getQueueSize()); - } -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImplTest.java new file mode 100644 index 0000000000..1b282f6ab5 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImplTest.java @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + + +import com.google.common.base.Optional; +import junit.framework.Assert; +import org.junit.*; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.RpcRegistrationListener; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.zeromq.ZMQ; +import zmq.Ctx; +import zmq.SocketBase; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ServerImplTest { + + private static ZMQ.Context context; + private ServerImpl server; + private Broker.ProviderSession brokerSession; + private RoutingTableProvider routingTableProvider; + private RpcRegistrationListener listener; + + ExecutorService pool; + + //Server configuration + private final int HANDLER_COUNT = 2; + private final int HWM = 200; + private final int port = 5554; + //server address + private final String SERVER_ADDRESS = "tcp://localhost:5554"; + + //@BeforeClass + public static void init() { + context = ZMQ.context(1); + } + + //@AfterClass + public static void destroy() { + MessagingUtil.closeZmqContext(context); + } + + @Before + public void setup() throws InterruptedException { + context = ZMQ.context(1); + brokerSession = mock(Broker.ProviderSession.class); + routingTableProvider = mock(RoutingTableProvider.class); + listener = mock(RpcRegistrationListener.class); + + server = new ServerImpl(port); + server.setBrokerSession(brokerSession); + server.setRoutingTableProvider(routingTableProvider); + + RoutingTable mockRoutingTable = new MockRoutingTable(); + Optional> optionalRoutingTable = Optional.fromNullable(mockRoutingTable); + when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable); + + when(brokerSession.addRpcRegistrationListener(listener)).thenReturn(null); + when(brokerSession.getSupportedRpcs()).thenReturn(Collections.EMPTY_SET); + when(brokerSession.rpc(null, mock(CompositeNode.class))).thenReturn(null); + server.start(); + Thread.sleep(5000);//wait for server to start + } + + @After + public void tearDown() throws InterruptedException { + + if (pool != null) + pool.shutdown(); + + if (server != null) + server.stop(); + + MessagingUtil.closeZmqContext(context); + + Thread.sleep(5000);//wait for server to stop + Assert.assertEquals(ServerImpl.State.STOPPED, server.getStatus()); + } + + @Test + public void getRoutingTableProvider_Call_ShouldReturnRoutingTable() throws Exception { + Assert.assertNotNull(server.getRoutingTableProvider()); + } + + @Test + public void getBrokerSession_Call_ShouldReturnBrokerSession() throws Exception { + Optional mayBeBroker = server.getBrokerSession(); + + if (mayBeBroker.isPresent()) + Assert.assertEquals(brokerSession, mayBeBroker.get()); + else + Assert.fail("Broker does not exist in Remote RPC Server"); + + } + + @Test + public void start_Call_ShouldSetServerStatusToStarted() throws Exception { + Assert.assertEquals(ServerImpl.State.STARTED, server.getStatus()); + + } + + @Test + public void start_Call_ShouldCreateNZmqSockets() throws Exception { + final int EXPECTED_COUNT = 2 + HANDLER_COUNT; //1 ROUTER + 1 DEALER + HANDLER_COUNT + + Optional mayBeContext = server.getZmqContext(); + if (mayBeContext.isPresent()) + Assert.assertEquals(EXPECTED_COUNT, findSocketCount(mayBeContext.get())); + else + Assert.fail("ZMQ Context does not exist in Remote RPC Server"); + } + + @Test + public void start_Call_ShouldCreate1ServerThread() { + final String SERVER_THREAD_NAME = "remote-rpc-server"; + final int EXPECTED_COUNT = 1; + List serverThreads = findThreadsWithName(SERVER_THREAD_NAME); + Assert.assertEquals(EXPECTED_COUNT, serverThreads.size()); + } + + @Test + public void start_Call_ShouldCreateNHandlerThreads() { + //final String WORKER_THREAD_NAME = "remote-rpc-worker"; + final int EXPECTED_COUNT = HANDLER_COUNT; + + Optional serverRequestHandlerOptional = server.getHandler(); + if (serverRequestHandlerOptional.isPresent()){ + ServerRequestHandler handler = serverRequestHandlerOptional.get(); + ThreadPoolExecutor workerPool = handler.getWorkerPool(); + Assert.assertEquals(EXPECTED_COUNT, workerPool.getPoolSize()); + } else { + Assert.fail("Server is in illegal state. ServerHandler does not exist"); + } + + } + + @Test + public void testStop() throws Exception { + + } + + @Test + public void testOnRouteUpdated() throws Exception { + + } + + @Test + public void testOnRouteDeleted() throws Exception { + + } + + private int findSocketCount(ZMQ.Context context) + throws NoSuchFieldException, IllegalAccessException { + Field ctxField = context.getClass().getDeclaredField("ctx"); + ctxField.setAccessible(true); + Ctx ctx = Ctx.class.cast(ctxField.get(context)); + + Field socketListField = ctx.getClass().getDeclaredField("sockets"); + socketListField.setAccessible(true); + List sockets = List.class.cast(socketListField.get(ctx)); + + return sockets.size(); + } + + private List findThreadsWithName(String name) { + Thread[] threads = new Thread[Thread.activeCount()]; + Thread.enumerate(threads); + + List foundThreads = new ArrayList(); + for (Thread t : threads) { + if (t.getName().startsWith(name)) + foundThreads.add(t); + } + + return foundThreads; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandlerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandlerTest.java new file mode 100644 index 0000000000..6e39867e53 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerRequestHandlerTest.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc; + +import junit.framework.Assert; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil; +import org.opendaylight.controller.sal.core.api.Broker; +import org.zeromq.ZMQ; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.mockito.Mockito.mock; + +public class ServerRequestHandlerTest { + + ServerRequestHandler handler; + ZMQ.Context context; + ExecutorService executorService = Executors.newCachedThreadPool(); + private int workerCount = 2; + private String mockDealerAddress = "inproc://rpc-request-handler"; + private String mockServerIp = "localhost"; + private int mockServerPort = 5554; + + @Before + public void setUp() throws Exception { + context = ZMQ.context(1); + String mockServerAddress = mockServerIp + ":" + mockServerPort; + Broker.ProviderSession mockSession = mock(Broker.ProviderSession.class); + handler = new ServerRequestHandler(context, mockSession, workerCount, mockDealerAddress, mockServerAddress); + handler.start(); + } + + @After + public void tearDown() throws Exception { + executorService.shutdown(); + MessagingUtil.closeZmqContext(context); + handler.close(); + } + + @Test + public void testStart() throws Exception { + //should start workers == workerCount + Assert.assertEquals(workerCount, handler.getWorkerPool().getPoolSize()); + + //killing a thread should recreate another one + + //start router-dealer bridge + executorService.execute(MessagingUtil.createRouterDealerBridge(context, mockDealerAddress, mockServerPort)); + Thread.sleep(1000); //give sometime for socket initialization + + //this will kill the thread + final String WORKER_THREAD_NAME = "remote-rpc-worker"; + interruptAThreadWithName(WORKER_THREAD_NAME); + + //send 4 message to router + for (int i = 0; i < 4; i++) + executorService.execute(MessagingUtil.sendAnEmptyMessage(context, "tcp://" + mockServerIp + ":" + mockServerPort)); + + //worker pool size should not change. + Assert.assertEquals(workerCount, handler.getWorkerPool().getPoolSize()); + + Thread.sleep(10000); //wait for processing to complete + } + + @Test + public void testClose() throws Exception { + + } + + /** + * Interrupts the first thread found whose name starts with the provided name + * + * @param name + */ + private void interruptAThreadWithName(String name) { + List workerThreads = findThreadsWithName(name); + if (workerThreads.size() > 0) workerThreads.get(0).interrupt(); + } + + /** + * Find all threads that start with the given name + * + * @param name + * @return + */ + private List findThreadsWithName(String name) { + Thread[] threads = new Thread[Thread.activeCount()]; + Thread.enumerate(threads); + + List foundThreads = new ArrayList(); + for (Thread t : threads) { + if (t.getName().startsWith(name)) + foundThreads.add(t); + } + + return foundThreads; + } +} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java deleted file mode 100644 index 130b30d5f5..0000000000 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/SocketManagerTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.sal.connector.remoterpc; - -import com.google.common.base.Optional; -import junit.framework.Assert; -import org.junit.After; -import org.junit.Before; -import org.zeromq.ZMQ; -import org.opendaylight.controller.sal.connector.remoterpc.SocketManager; -import org.opendaylight.controller.sal.connector.remoterpc.RpcSocket; -import org.opendaylight.controller.sal.connector.remoterpc.Context; -import org.junit.Test; - -public class SocketManagerTest { - - SocketManager manager; - - @Before - public void setup(){ - manager = new SocketManager(); - } - - @After - public void tearDown() throws Exception { - manager.close(); - } - - @Test - public void getManagedSockets_When2NewAdded_ShouldContain2() throws Exception { - - //Prepare data - manager.getManagedSocket("tcp://localhost:5554"); - manager.getManagedSocket("tcp://localhost:5555"); - - Assert.assertTrue( 2 == manager.getManagedSockets().size()); - } - - @Test - public void getManagedSockets_When2NewAddedAnd1Existing_ShouldContain2() throws Exception { - - //Prepare data - manager.getManagedSocket("tcp://localhost:5554"); - manager.getManagedSocket("tcp://localhost:5555"); - manager.getManagedSocket("tcp://localhost:5554"); //ask for the first one - - Assert.assertTrue( 2 == manager.getManagedSockets().size()); - } - - @Test - public void getManagedSocket_WhenPassedAValidAddress_ShouldReturnARpcSocket() throws Exception { - String testAddress = "tcp://localhost:5554"; - RpcSocket rpcSocket = manager.getManagedSocket(testAddress); - Assert.assertEquals(testAddress, rpcSocket.getAddress()); - } - - @Test(expected = IllegalArgumentException.class) - public void getManagedSocket_WhenPassedInvalidHostAddress_ShouldThrow() throws Exception { - String testAddress = "tcp://nonexistenthost:5554"; - RpcSocket rpcSocket = manager.getManagedSocket(testAddress); - } - - @Test(expected = IllegalArgumentException.class) - public void getManagedSocket_WhenPassedInvalidAddress_ShouldThrow() throws Exception { - String testAddress = "xxx"; - RpcSocket rpcSocket = manager.getManagedSocket(testAddress); - } - - @Test - public void getManagedSocket_WhenPassedAValidZmqSocket_ShouldReturnARpcSocket() throws Exception { - //Prepare data - String firstAddress = "tcp://localhost:5554"; - RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress); - ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket(); - - String secondAddress = "tcp://localhost:5555"; - RpcSocket secondRpcSocket = manager.getManagedSocket(secondAddress); - ZMQ.Socket secondZmqSocket = secondRpcSocket.getSocket(); - - Assert.assertEquals(firstRpcSocket, manager.getManagedSocketFor(firstZmqSocket).get()); - Assert.assertEquals(secondRpcSocket, manager.getManagedSocketFor(secondZmqSocket).get()); - } - - @Test - public void getManagedSocket_WhenPassedNonManagedZmqSocket_ShouldReturnNone() throws Exception { - ZMQ.Socket nonManagedSocket = Context.getInstance().getZmqContext().socket(ZMQ.REQ); - nonManagedSocket.connect("tcp://localhost:5000"); - - //Prepare data - String firstAddress = "tcp://localhost:5554"; - RpcSocket firstRpcSocket = manager.getManagedSocket(firstAddress); - ZMQ.Socket firstZmqSocket = firstRpcSocket.getSocket(); - - Assert.assertSame(Optional.absent(), manager.getManagedSocketFor(nonManagedSocket) ); - Assert.assertSame(Optional.absent(), manager.getManagedSocketFor(null) ); - } - - @Test - public void stop_WhenCalled_ShouldEmptyManagedSockets() throws Exception { - manager.getManagedSocket("tcp://localhost:5554"); - manager.getManagedSocket("tcp://localhost:5555"); - Assert.assertTrue( 2 == manager.getManagedSockets().size()); - - manager.close(); - Assert.assertTrue( 0 == manager.getManagedSockets().size()); - } - - @Test - public void poller_WhenCalled_ShouldReturnAnInstanceOfPoller() throws Exception { - Assert.assertTrue (manager.getPoller() instanceof ZMQ.Poller); - } - -} diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/MessagingUtil.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/MessagingUtil.java new file mode 100644 index 0000000000..20cf4f6362 --- /dev/null +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/utils/MessagingUtil.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.sal.connector.remoterpc.utils; + +import junit.framework.Assert; +import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.zeromq.ZMQ; + +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + +public class MessagingUtil { + + private static final Logger _logger = LoggerFactory.getLogger(MessagingUtil.class); + + public static Runnable startReplyServer(final ZMQ.Context context, + final String serverAddress, + final int numRequests /*number of requests after which server shuts down*/) { + return new Runnable() { + + @Override + public void run() { + final ZMQ.Socket socket = context.socket(ZMQ.REP); + try { + int returnCode = socket.bind("tcp://" + serverAddress); + Assert.assertNotSame(-1, returnCode); + _logger.info(" Starting reply server[{}] for test...", serverAddress); + + //for (int i=0;i + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java index 6c294dddcc..2bc151a0f3 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/provider-service/src/main/java/org/opendaylight/controller/sample/zeromq/provider/ExampleProvider.java @@ -2,7 +2,6 @@ package org.opendaylight.controller.sample.zeromq.provider; import org.opendaylight.controller.sal.common.util.RpcErrors; import org.opendaylight.controller.sal.common.util.Rpcs; -import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl; import org.opendaylight.controller.sal.core.api.AbstractProvider; import org.opendaylight.controller.sal.core.api.Broker.ProviderSession; import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration; diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java index dd910ea34b..5ee982009e 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-it/src/test/java/org/opendaylight/controller/sample/zeromq/test/it/RouterTest.java @@ -13,14 +13,14 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; -import org.opendaylight.controller.sal.connector.remoterpc.Client; + import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcClient; -import org.opendaylight.controller.sal.connector.remoterpc.RemoteRpcServer; -import org.opendaylight.controller.sal.connector.remoterpc.ServerImpl; + import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider; import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer; +import org.opendaylight.controller.sample.zeromq.provider.ExampleProvider; + import org.opendaylight.controller.test.sal.binding.it.TestHelper; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; @@ -33,7 +33,7 @@ import org.ops4j.pax.exam.util.Filter; import org.osgi.framework.Bundle; import org.osgi.framework.BundleException; import org.osgi.framework.ServiceReference; -import org.osgi.framework.ServiceRegistration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; @@ -46,8 +46,7 @@ import java.util.Hashtable; import static org.opendaylight.controller.test.sal.binding.it.TestHelper.baseModelBundles; import static org.opendaylight.controller.test.sal.binding.it.TestHelper.bindingAwareSalBundles; -import static org.opendaylight.controller.test.sal.binding.it.TestHelper.configMinumumBundles; -import static org.opendaylight.controller.test.sal.binding.it.TestHelper.mdSalCoreBundles; + import static org.ops4j.pax.exam.CoreOptions.*; @RunWith(PaxExam.class) @@ -91,8 +90,8 @@ public class RouterTest { _logger.debug("Provider sends announcement [{}]", "heartbeat"); provider.announce(QNAME); - ServiceReference routerRef = ctx.getServiceReference(Client.class); - Client router = (Client) ctx.getService(routerRef); + ServiceReference routerRef = ctx.getServiceReference(RemoteRpcClient.class); + RemoteRpcClient router = (RemoteRpcClient) ctx.getService(routerRef); _logger.debug("Found router[{}]", router); _logger.debug("Invoking RPC [{}]", QNAME); for (int i = 0; i < 3; i++) { @@ -353,6 +352,7 @@ public class RouterTest { } } } + private String stateToString(int state) { switch (state) { case Bundle.ACTIVE: @@ -371,86 +371,84 @@ public class RouterTest { @Configuration public Option[] config() { return options(systemProperty("osgi.console").value("2401"), - systemProperty("rpc.port").value("5555"), - mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), // - mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), // - mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), // - mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), // - - //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), // - mavenBundle(ODL, "sal-common").versionAsInProject(), // - mavenBundle(ODL, "sal-common-api").versionAsInProject(),// - mavenBundle(ODL, "sal-common-impl").versionAsInProject(), // - mavenBundle(ODL, "sal-common-util").versionAsInProject(), // - mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), // - mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), // - mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), // - mavenBundle(ODL, "sal-connector-api").versionAsInProject(), // - - - - baseModelBundles(), - bindingAwareSalBundles(), - TestHelper.bindingIndependentSalBundles(), - TestHelper.configMinumumBundles(), - TestHelper.mdSalCoreBundles(), - - //Added the consumer - mavenBundle(SAMPLE, "sal-remoterpc-connector-test-consumer").versionAsInProject(), // - //**** These two bundles below are NOT successfully resolved -- some of their dependencies must be missing - //**** This causes the "Message" error to occur, the class cannot be found - mavenBundle(SAMPLE, "sal-remoterpc-connector-test-provider").versionAsInProject(), // - mavenBundle(ODL, "sal-remoterpc-connector").versionAsInProject(), // - - mavenBundle(ODL, "zeromq-routingtable.implementation").versionAsInProject(), - mavenBundle(YANG, "concepts").versionAsInProject(), - mavenBundle(YANG, "yang-binding").versionAsInProject(), // - mavenBundle(YANG, "yang-common").versionAsInProject(), // - mavenBundle(YANG, "yang-data-api").versionAsInProject(), // - mavenBundle(YANG, "yang-data-impl").versionAsInProject(), // - mavenBundle(YANG, "yang-model-api").versionAsInProject(), // - mavenBundle(YANG, "yang-parser-api").versionAsInProject(), // - mavenBundle(YANG, "yang-parser-impl").versionAsInProject(), // - mavenBundle(YANG, "yang-model-util").versionAsInProject(), // - mavenBundle(YANG + ".thirdparty", "xtend-lib-osgi").versionAsInProject(), // - mavenBundle(YANG + ".thirdparty", "antlr4-runtime-osgi-nohead").versionAsInProject(), // - mavenBundle("com.google.guava", "guava").versionAsInProject(), // - mavenBundle("org.zeromq", "jeromq").versionAsInProject(), - mavenBundle("com.fasterxml.jackson.core", "jackson-annotations").versionAsInProject(), - mavenBundle("com.fasterxml.jackson.core", "jackson-core").versionAsInProject(), - mavenBundle("com.fasterxml.jackson.core", "jackson-databind").versionAsInProject(), - //routingtable dependencies - systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"), - // List framework bundles - mavenBundle("equinoxSDK381", "org.eclipse.equinox.console").versionAsInProject(), - mavenBundle("equinoxSDK381", "org.eclipse.equinox.util").versionAsInProject(), - mavenBundle("equinoxSDK381", "org.eclipse.osgi.services").versionAsInProject(), - mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds").versionAsInProject(), - mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command").versionAsInProject(), - mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime").versionAsInProject(), - mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell").versionAsInProject(), - // List logger bundles - - mavenBundle("org.opendaylight.controller", "clustering.services") - .versionAsInProject(), - mavenBundle("org.opendaylight.controller", "clustering.stub") - .versionAsInProject(), - - - // List all the bundles on which the test case depends - mavenBundle("org.opendaylight.controller", "sal") - .versionAsInProject(), - mavenBundle("org.opendaylight.controller", "sal.implementation") - .versionAsInProject(), - mavenBundle("org.jboss.spec.javax.transaction", - "jboss-transaction-api_1.1_spec").versionAsInProject(), - mavenBundle("org.apache.commons", "commons-lang3") - .versionAsInProject(), - mavenBundle("org.apache.felix", - "org.apache.felix.dependencymanager") - .versionAsInProject(), - - junitBundles() + systemProperty("rpc.port").value("5555"), + mavenBundle("org.slf4j", "slf4j-api").versionAsInProject(), // + mavenBundle("org.slf4j", "log4j-over-slf4j").versionAsInProject(), // + mavenBundle("ch.qos.logback", "logback-core").versionAsInProject(), // + mavenBundle("ch.qos.logback", "logback-classic").versionAsInProject(), // + + //mavenBundle(ODL, "sal-binding-broker-impl").versionAsInProject().update(), // + mavenBundle(ODL, "sal-common").versionAsInProject(), // + mavenBundle(ODL, "sal-common-api").versionAsInProject(),// + mavenBundle(ODL, "sal-common-impl").versionAsInProject(), // + mavenBundle(ODL, "sal-common-util").versionAsInProject(), // + mavenBundle(ODL, "sal-core-api").versionAsInProject().update(), // + mavenBundle(ODL, "sal-broker-impl").versionAsInProject(), // + mavenBundle(ODL, "sal-core-spi").versionAsInProject().update(), // + mavenBundle(ODL, "sal-connector-api").versionAsInProject(), // + + + baseModelBundles(), + bindingAwareSalBundles(), + TestHelper.bindingIndependentSalBundles(), + TestHelper.configMinumumBundles(), + TestHelper.mdSalCoreBundles(), + + //Added the consumer + mavenBundle(SAMPLE, "sal-remoterpc-connector-test-consumer").versionAsInProject(), // + //**** These two bundles below are NOT successfully resolved -- some of their dependencies must be missing + //**** This causes the "Message" error to occur, the class cannot be found + mavenBundle(SAMPLE, "sal-remoterpc-connector-test-provider").versionAsInProject(), // + mavenBundle(ODL, "sal-remoterpc-connector").versionAsInProject(), // + + mavenBundle(ODL, "zeromq-routingtable.implementation").versionAsInProject(), + mavenBundle(YANG, "concepts").versionAsInProject(), + mavenBundle(YANG, "yang-binding").versionAsInProject(), // + mavenBundle(YANG, "yang-common").versionAsInProject(), // + mavenBundle(YANG, "yang-data-api").versionAsInProject(), // + mavenBundle(YANG, "yang-data-impl").versionAsInProject(), // + mavenBundle(YANG, "yang-model-api").versionAsInProject(), // + mavenBundle(YANG, "yang-parser-api").versionAsInProject(), // + mavenBundle(YANG, "yang-parser-impl").versionAsInProject(), // + mavenBundle(YANG, "yang-model-util").versionAsInProject(), // + mavenBundle(YANG + ".thirdparty", "xtend-lib-osgi").versionAsInProject(), // + mavenBundle(YANG + ".thirdparty", "antlr4-runtime-osgi-nohead").versionAsInProject(), // + mavenBundle("com.google.guava", "guava").versionAsInProject(), // + mavenBundle("org.zeromq", "jeromq").versionAsInProject(), + mavenBundle("org.codehaus.jackson", "jackson-mapper-asl").versionAsInProject(), + mavenBundle("org.codehaus.jackson", "jackson-core-asl").versionAsInProject(), + //routingtable dependencies + systemPackages("sun.reflect", "sun.reflect.misc", "sun.misc"), + // List framework bundles + mavenBundle("equinoxSDK381", "org.eclipse.equinox.console").versionAsInProject(), + mavenBundle("equinoxSDK381", "org.eclipse.equinox.util").versionAsInProject(), + mavenBundle("equinoxSDK381", "org.eclipse.osgi.services").versionAsInProject(), + mavenBundle("equinoxSDK381", "org.eclipse.equinox.ds").versionAsInProject(), + mavenBundle("equinoxSDK381", "org.apache.felix.gogo.command").versionAsInProject(), + mavenBundle("equinoxSDK381", "org.apache.felix.gogo.runtime").versionAsInProject(), + mavenBundle("equinoxSDK381", "org.apache.felix.gogo.shell").versionAsInProject(), + // List logger bundles + + mavenBundle("org.opendaylight.controller", "clustering.services") + .versionAsInProject(), + mavenBundle("org.opendaylight.controller", "clustering.stub") + .versionAsInProject(), + + + // List all the bundles on which the test case depends + mavenBundle("org.opendaylight.controller", "sal") + .versionAsInProject(), + mavenBundle("org.opendaylight.controller", "sal.implementation") + .versionAsInProject(), + mavenBundle("org.jboss.spec.javax.transaction", + "jboss-transaction-api_1.1_spec").versionAsInProject(), + mavenBundle("org.apache.commons", "commons-lang3") + .versionAsInProject(), + mavenBundle("org.apache.felix", + "org.apache.felix.dependencymanager") + .versionAsInProject(), + + junitBundles() ); } diff --git a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java index 6c9ec4e788..b4fa698a92 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/src/main/java/org/opendaylight/controller/tests/zmqrouter/rest/Router.java @@ -4,7 +4,6 @@ import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; -import org.opendaylight.controller.sal.connector.remoterpc.dto.CompositeNodeImpl; import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl; import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; import org.opendaylight.controller.sample.zeromq.consumer.ExampleConsumer; @@ -70,7 +69,7 @@ public class Router { _logger.info("Invoking RPC"); ExampleConsumer consumer = getConsumer(); - RpcResult result = consumer.invokeRpc(QNAME, new CompositeNodeImpl()); + RpcResult result = consumer.invokeRpc(QNAME, consumer.getValidCompositeNodeWithOneSimpleChild()); _logger.info("Result [{}]", result.isSuccessful()); return stringify(result); @@ -111,7 +110,7 @@ public class Router { _logger.debug("Could not get routing table impl reference"); return "Could not get routingtable referen "; } - RoutingTable routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference); + RoutingTableImpl routingTable = (RoutingTableImpl) ctx.getService(routingTableServiceReference); if (routingTable == null) { _logger.info("Could not get routing table service"); return "Could not get routing table service"; @@ -128,16 +127,14 @@ public class Router { _logger.error("error in adding routing identifier" + e.getMessage()); } - Set routes = routingTable.getRoutes(rii.toString()); + String result = routingTable.dumpRoutingTableCache(); - StringBuilder stringBuilder = new StringBuilder(); - for (String route : routes) { - stringBuilder.append(route); - } - _logger.info("Result [{}] routes added for route" + rii + stringBuilder.toString()); - return stringBuilder.toString(); + + _logger.info("Result [{}] routes added for route" + rii + result); + + return result; } @GET @@ -242,5 +239,27 @@ public class Router { public org.opendaylight.yangtools.yang.data.api.InstanceIdentifier getRoute() { return InstanceIdentifier.of(instance); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RoutingIdentifierImpl that = (RoutingIdentifierImpl) o; + + if (!QNAME.equals(that.QNAME)) return false; + if (!instance.equals(that.instance)) return false; + if (!namespace.equals(that.namespace)) return false; + + return true; + } + + @Override + public int hashCode() { + int result = namespace.hashCode(); + result = 31 * result + QNAME.hashCode(); + result = 31 * result + instance.hashCode(); + return result; + } } } diff --git a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java index 62ba04687e..f3188e1d0e 100644 --- a/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java +++ b/opendaylight/md-sal/statistics-manager/src/main/java/org/opendaylight/controller/md/statistics/manager/StatisticsUpdateCommiter.java @@ -7,6 +7,10 @@ */ package org.opendaylight.controller.md.statistics.manager; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + import org.opendaylight.controller.sal.binding.api.data.DataModificationTransaction; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; @@ -84,10 +88,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111. import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.nodes.node.meter.MeterStatisticsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.config.stats.reply.MeterConfigStats; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.types.rev130918.meter.statistics.reply.MeterStats; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.EthernetMatch; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.VlanMatch; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.layer._3.match.Ipv4Match; -import org.opendaylight.yang.gen.v1.urn.opendaylight.model.match.types.rev131026.match.layer._4.match.TcpMatch; import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericQueueStatistics; import org.opendaylight.yang.gen.v1.urn.opendaylight.model.statistics.types.rev130925.GenericStatistics; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; @@ -107,10 +107,6 @@ import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdenti import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.ConcurrentMap; - /** * Class implement statistics manager related listener interface and augment all the * received statistics data to data stores. @@ -810,6 +806,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList * @param storedFlow * @return */ + public boolean matchEquals(Match statsFlow, Match storedFlow) { if (statsFlow == storedFlow) { return true; @@ -819,12 +816,9 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList } if (storedFlow.getEthernetMatch() == null) { if (statsFlow.getEthernetMatch() != null) { - if(!statsFlow.getEthernetMatch().getEthernetDestination().getAddress().getValue().equals("00:00:00:00:00:00") || - !statsFlow.getEthernetMatch().getEthernetSource().getAddress().getValue().equals("00:00:00:00:00:00")){ - return false; - } + return false; } - } else if(!EthernetMatchEquals(statsFlow.getEthernetMatch(),storedFlow.getEthernetMatch())) { + } else if(!storedFlow.getEthernetMatch().equals(statsFlow.getEthernetMatch())) { return false; } if (storedFlow.getIcmpv4Match()== null) { @@ -850,12 +844,7 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList } if (storedFlow.getInPort()== null) { if (statsFlow.getInPort() != null) { - String[] portArr = statsFlow.getInPort().getValue().split(":"); - if(portArr.length >= 3){ - if(Integer.parseInt(portArr[2]) != 0){ - return false; - } - } + return false; } } else if(!storedFlow.getInPort().equals(statsFlow.getInPort())) { return false; @@ -869,22 +858,14 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList } if (storedFlow.getLayer3Match()== null) { if (statsFlow.getLayer3Match() != null) { - Ipv4Match ipv4Match = (Ipv4Match)statsFlow.getLayer3Match(); - if(!ipv4Match.getIpv4Source().getValue().equals("0.0.0.0/0") || - !ipv4Match.getIpv4Destination().getValue().equals("0.0.0.0/0")){ return false; - } } } else if(!storedFlow.getLayer3Match().equals(statsFlow.getLayer3Match())) { return false; } if (storedFlow.getLayer4Match()== null) { if (statsFlow.getLayer4Match() != null) { - TcpMatch tcpMatch = (TcpMatch)statsFlow.getLayer4Match(); - if(!tcpMatch.getTcpDestinationPort().getValue().equals(0) || - !tcpMatch.getTcpSourcePort().getValue().equals(0)){ - return false; - } + return false; } } else if(!storedFlow.getLayer4Match().equals(statsFlow.getLayer4Match())) { return false; @@ -912,49 +893,11 @@ public class StatisticsUpdateCommiter implements OpendaylightGroupStatisticsList } if (storedFlow.getVlanMatch()== null) { if (statsFlow.getVlanMatch() != null) { - VlanMatch vlanMatch = statsFlow.getVlanMatch(); - if(!vlanMatch.getVlanId().getVlanId().getValue().equals(0) || - !vlanMatch.getVlanPcp().getValue().equals((short)0)){ - return false; - } + return false; } } else if(!storedFlow.getVlanMatch().equals(statsFlow.getVlanMatch())) { return false; } return true; } - - public boolean EthernetMatchEquals(EthernetMatch statsEtherMatch, EthernetMatch storedEtherMatch) { - if (statsEtherMatch == storedEtherMatch) { - return true; - } - if (storedEtherMatch.getEthernetDestination()== null) { - if (statsEtherMatch.getEthernetDestination() != null) { - if(!statsEtherMatch.getEthernetDestination().getAddress().getValue().equals("00:00:00:00:00:00")){ - return false; - } - } - } else if(!storedEtherMatch.getEthernetDestination().equals(statsEtherMatch.getEthernetDestination())) { - return false; - } - if (storedEtherMatch.getEthernetSource() == null) { - if (statsEtherMatch.getEthernetSource() != null) { - if(!statsEtherMatch.getEthernetSource().getAddress().getValue().equals("00:00:00:00:00:00")){ - return false; - } - } - } else if(!storedEtherMatch.getEthernetSource().equals(statsEtherMatch.getEthernetSource())) { - return false; - } - if (storedEtherMatch.getEthernetType() == null) { - if (statsEtherMatch.getEthernetType() != null) { - if(!statsEtherMatch.getEthernetType().getType().getValue().equals(0)){ - return false; - } - } - } else if(!storedEtherMatch.getEthernetType().equals(statsEtherMatch.getEthernetType())) { - return false; - } - return true; - } } diff --git a/opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronSubnet_IPAllocationPool.java b/opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronSubnet_IPAllocationPool.java index 15ff548a53..dab69917d1 100644 --- a/opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronSubnet_IPAllocationPool.java +++ b/opendaylight/networkconfiguration/neutron/src/main/java/org/opendaylight/controller/networkconfig/neutron/NeutronSubnet_IPAllocationPool.java @@ -77,15 +77,18 @@ public class NeutronSubnet_IPAllocationPool implements Serializable { * * @param inputString * IPv4 address in dotted decimal format - * @returns high-endian representation of the IPv4 address as a long + * @returns high-endian representation of the IPv4 address as a long. + * This method will return 0 if the input is null. */ static long convert(String inputString) { long ans = 0; - String[] parts = inputString.split("\\."); - for (String part: parts) { - ans <<= 8; - ans |= Integer.parseInt(part); + if (inputString != null) { + String[] parts = inputString.split("\\."); + for (String part: parts) { + ans <<= 8; + ans |= Integer.parseInt(part); + } } return ans; } diff --git a/opendaylight/northbound/connectionmanager/src/main/java/org/opendaylight/controller/connectionmanager/northbound/ConnectionManagerNorthbound.java b/opendaylight/northbound/connectionmanager/src/main/java/org/opendaylight/controller/connectionmanager/northbound/ConnectionManagerNorthbound.java index c069f82a84..e2c1b32c4b 100644 --- a/opendaylight/northbound/connectionmanager/src/main/java/org/opendaylight/controller/connectionmanager/northbound/ConnectionManagerNorthbound.java +++ b/opendaylight/northbound/connectionmanager/src/main/java/org/opendaylight/controller/connectionmanager/northbound/ConnectionManagerNorthbound.java @@ -334,7 +334,7 @@ public class ConnectionManagerNorthbound { } try { - Node node = new Node(nodeType, nodeId); + Node node = Node.fromString(nodeType, nodeId); Status status = connectionManager.disconnect(node); if (status.isSuccess()) { return Response.ok().build(); diff --git a/third-party/ganymed/pom.xml b/third-party/ganymed/pom.xml index 266b5a560a..eae97aaeb3 100644 --- a/third-party/ganymed/pom.xml +++ b/third-party/ganymed/pom.xml @@ -24,7 +24,7 @@ org.osgi org.osgi.core - 4.1.0 + 5.0.0 ch.ethz.ganymed