X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnector%2Fremoterpc%2FClientImpl.java;h=30e11c0806731c24ab151dd7cf585408916fd233;hp=291fe0b8e73a5a52e65f1d6e9be4fcca31290a50;hb=1f2754487ab1e3a37c830909806f90cd54180c7b;hpb=4467f6e1ac869d390b607db90cd7e540118a4c6e diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java index 291fe0b8e7..30e11c0806 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java @@ -12,6 +12,8 @@ import org.opendaylight.controller.sal.common.util.RpcErrors; import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; +import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; @@ -20,14 +22,13 @@ import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; @@ -66,12 +67,6 @@ public class ClientImpl implements RemoteRpcClient { this.routingTableProvider = routingTableProvider; } - @Override - public Set getSupportedRpcs(){ - //TODO: Find the entries from routing table - return Collections.emptySet(); - } - @Override public void start() {/*NOOPS*/} @@ -97,14 +92,40 @@ public class ClientImpl implements RemoteRpcClient { * @param input payload for the remote service * @return */ - @Override public RpcResult invokeRpc(QName rpc, CompositeNode input) { + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(rpc); + + String address = lookupRemoteAddressForGlobalRpc(routeId); + return sendMessage(input, routeId, address); + } + + /** + * Finds remote server that can execute this routed rpc and sends a message to it + * requesting execution. + * The call blocks until a response from remote server is received. Its upto + * the client of this API to implement a timeout functionality. + * + * @param rpc + * rpc to be called + * @param identifier + * instance identifier on which rpc is to be executed + * @param input + * payload + * @return + */ + public RpcResult invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { RouteIdentifierImpl routeId = new RouteIdentifierImpl(); routeId.setType(rpc); + routeId.setRoute(identifier); + + String address = lookupRemoteAddressForRpc(routeId); - String address = lookupRemoteAddress(routeId); + return sendMessage(input, routeId, address); + } + private RpcResult sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) { Message request = new Message.MessageBuilder() .type(Message.MessageType.REQUEST) .sender(Context.getInstance().getLocalUri()) @@ -128,7 +149,6 @@ public class ClientImpl implements RemoteRpcClient { collectErrors(e, errors); return Rpcs.getRpcResult(false, null, errors); } - } /** @@ -136,19 +156,36 @@ public class ClientImpl implements RemoteRpcClient { * @param routeId route identifier * @return remote network address */ - private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){ + private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier routeId){ checkNotNull(routeId, "route must not be null"); - Optional> routingTable = routingTableProvider.getRoutingTable(); + Optional> routingTable = routingTableProvider.getRoutingTable(); checkNotNull(routingTable.isPresent(), "Routing table is null"); - Set addresses = routingTable.get().getRoutes(routeId.toString()); - checkNotNull(addresses, "Address not found for route [%s]", routeId); - checkState(addresses.size() == 1, - "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service. + String address = null; + try { + address = routingTable.get().getGlobalRoute(routeId); + } catch (RoutingTableException|SystemException e) { + _logger.error("Exception caught while looking up remote address " + e); + } + checkState(address != null, "Address not found for route [%s]", routeId); + + return address; + } + + /** + * Find address for the given route identifier in routing table + * @param routeId route identifier + * @return remote network address + */ + private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier routeId){ + checkNotNull(routeId, "route must not be null"); + + Optional> routingTable = routingTableProvider.getRoutingTable(); + checkNotNull(routingTable.isPresent(), "Routing table is null"); - String address = addresses.iterator().next(); - checkNotNull(address, "Address not found for route [%s]", routeId); + String address = routingTable.get().getLastAddedRoute(routeId); + checkState(address != null, "Address not found for route [%s]", routeId); return address; }