X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-remoterpc-connector%2Fimplementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fconnector%2Fremoterpc%2FClientImpl.java;h=200ebaee6a5a0fc2b34b7d97c52ce2350b8ee3a1;hp=291fe0b8e73a5a52e65f1d6e9be4fcca31290a50;hb=445deeb731eee757ec1b37d5a24301e99d97394b;hpb=80794420bbe8e585964ad17fb2d82dbab75f94b0 diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java index 291fe0b8e7..200ebaee6a 100644 --- a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java +++ b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java @@ -7,45 +7,48 @@ package org.opendaylight.controller.sal.connector.remoterpc; -import com.google.common.base.Optional; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; + import org.opendaylight.controller.sal.common.util.RpcErrors; import org.opendaylight.controller.sal.common.util.Rpcs; import org.opendaylight.controller.sal.connector.api.RpcRouter; import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable; +import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException; +import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException; import org.opendaylight.controller.sal.connector.remoterpc.dto.Message; import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl; import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils; -import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcResult; import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeromq.ZMQ; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; /** - * An implementation of {@link RpcImplementation} that makes + * An implementation of {@link org.opendaylight.controller.sal.core.api.RpcImplementation} that makes * remote RPC calls */ public class ClientImpl implements RemoteRpcClient { private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class); - private ZMQ.Context context = ZMQ.context(1); - private ClientRequestHandler handler; + private final ZMQ.Context context = ZMQ.context(1); + private final ClientRequestHandler handler; private RoutingTableProvider routingTableProvider; public ClientImpl(){ @@ -62,16 +65,11 @@ public class ClientImpl implements RemoteRpcClient { return routingTableProvider; } + @Override public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) { this.routingTableProvider = routingTableProvider; } - @Override - public Set getSupportedRpcs(){ - //TODO: Find the entries from routing table - return Collections.emptySet(); - } - @Override public void start() {/*NOOPS*/} @@ -97,14 +95,40 @@ public class ClientImpl implements RemoteRpcClient { * @param input payload for the remote service * @return */ - @Override - public RpcResult invokeRpc(QName rpc, CompositeNode input) { + public ListenableFuture> invokeRpc(QName rpc, CompositeNode input) { + RouteIdentifierImpl routeId = new RouteIdentifierImpl(); + routeId.setType(rpc); + + String address = lookupRemoteAddressForGlobalRpc(routeId); + return sendMessage(input, routeId, address); + } + + /** + * Finds remote server that can execute this routed rpc and sends a message to it + * requesting execution. + * The call blocks until a response from remote server is received. Its upto + * the client of this API to implement a timeout functionality. + * + * @param rpc + * rpc to be called + * @param identifier + * instance identifier on which rpc is to be executed + * @param input + * payload + * @return + */ + public ListenableFuture> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) { RouteIdentifierImpl routeId = new RouteIdentifierImpl(); routeId.setType(rpc); + routeId.setRoute(identifier); - String address = lookupRemoteAddress(routeId); + String address = lookupRemoteAddressForRpc(routeId); + return sendMessage(input, routeId, address); + } + + private ListenableFuture> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) { Message request = new Message.MessageBuilder() .type(Message.MessageType.REQUEST) .sender(Context.getInstance().getLocalUri()) @@ -119,16 +143,35 @@ public class ClientImpl implements RemoteRpcClient { Message response = handler.handle(request); CompositeNode payload = null; - if ( response != null ) - payload = XmlUtils.xmlToCompositeNode((String) response.getPayload()); + if ( response != null ) { + + _logger.info("Received response [{}]", response); + + Object rawPayload = response.getPayload(); + switch (response.getType()) { + case ERROR: + if ( rawPayload instanceof List ) + errors = (List) rawPayload; + break; + + case RESPONSE: + payload = XmlUtils.xmlToCompositeNode((String) rawPayload); + break; - return Rpcs.getRpcResult(true, payload, errors); + default: + errors.add( + RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null) + ); + break; + + } + } + return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors)); } catch (Exception e){ collectErrors(e, errors); - return Rpcs.getRpcResult(false, null, errors); + return Futures.immediateFuture(Rpcs.getRpcResult(false, null, errors)); } - } /** @@ -136,19 +179,36 @@ public class ClientImpl implements RemoteRpcClient { * @param routeId route identifier * @return remote network address */ - private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){ + private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier routeId){ checkNotNull(routeId, "route must not be null"); - Optional> routingTable = routingTableProvider.getRoutingTable(); + Optional, String>> routingTable = routingTableProvider.getRoutingTable(); checkNotNull(routingTable.isPresent(), "Routing table is null"); - Set addresses = routingTable.get().getRoutes(routeId.toString()); - checkNotNull(addresses, "Address not found for route [%s]", routeId); - checkState(addresses.size() == 1, - "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service. + String address = null; + try { + address = routingTable.get().getGlobalRoute(routeId); + } catch (RoutingTableException|SystemException e) { + _logger.error("Exception caught while looking up remote address " + e); + } + checkState(address != null, "Address not found for route [%s]", routeId); + + return address; + } + + /** + * Find address for the given route identifier in routing table + * @param routeId route identifier + * @return remote network address + */ + private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier routeId){ + checkNotNull(routeId, "route must not be null"); + + Optional, String>> routingTable = routingTableProvider.getRoutingTable(); + checkNotNull(routingTable.isPresent(), "Routing table is null"); - String address = addresses.iterator().next(); - checkNotNull(address, "Address not found for route [%s]", routeId); + String address = routingTable.get().getLastAddedRoute(routeId); + checkState(address != null, "Address not found for route [%s]", routeId); return address; } @@ -169,7 +229,7 @@ public class ClientImpl implements RemoteRpcClient { */ private void closeZmqContext() { ExecutorService exec = Executors.newSingleThreadExecutor(); - FutureTask zmqTermination = new FutureTask(new Runnable() { + FutureTask zmqTermination = new FutureTask(new Runnable() { @Override public void run() {