package org.opendaylight.controller.sal.connector.remoterpc;
-import com.google.common.base.Optional;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
import org.opendaylight.controller.sal.common.util.RpcErrors;
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
/**
- * An implementation of {@link RpcImplementation} that makes
+ * An implementation of {@link org.opendaylight.controller.sal.core.api.RpcImplementation} that makes
* remote RPC calls
*/
public class ClientImpl implements RemoteRpcClient {
private final Logger _logger = LoggerFactory.getLogger(ClientImpl.class);
- private ZMQ.Context context = ZMQ.context(1);
- private ClientRequestHandler handler;
+ private final ZMQ.Context context = ZMQ.context(1);
+ private final ClientRequestHandler handler;
private RoutingTableProvider routingTableProvider;
public ClientImpl(){
return routingTableProvider;
}
+ @Override
public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
this.routingTableProvider = routingTableProvider;
}
- @Override
- public Set<QName> getSupportedRpcs(){
- //TODO: Find the entries from routing table
- return Collections.emptySet();
- }
-
@Override
public void start() {/*NOOPS*/}
* @param input payload for the remote service
* @return
*/
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(rpc);
+
+ String address = lookupRemoteAddressForGlobalRpc(routeId);
+ return sendMessage(input, routeId, address);
+ }
+
+ /**
+ * Finds remote server that can execute this routed rpc and sends a message to it
+ * requesting execution.
+ * The call blocks until a response from remote server is received. Its upto
+ * the client of this API to implement a timeout functionality.
+ *
+ * @param rpc
+ * rpc to be called
+ * @param identifier
+ * instance identifier on which rpc is to be executed
+ * @param input
+ * payload
+ * @return
+ */
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
RouteIdentifierImpl routeId = new RouteIdentifierImpl();
routeId.setType(rpc);
+ routeId.setRoute(identifier);
- String address = lookupRemoteAddress(routeId);
+ String address = lookupRemoteAddressForRpc(routeId);
+ return sendMessage(input, routeId, address);
+ }
+
+ private ListenableFuture<RpcResult<CompositeNode>> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
Message request = new Message.MessageBuilder()
.type(Message.MessageType.REQUEST)
.sender(Context.getInstance().getLocalUri())
Message response = handler.handle(request);
CompositeNode payload = null;
- if ( response != null )
- payload = XmlUtils.xmlToCompositeNode((String) response.getPayload());
+ if ( response != null ) {
+
+ _logger.info("Received response [{}]", response);
+
+ Object rawPayload = response.getPayload();
+ switch (response.getType()) {
+ case ERROR:
+ if ( rawPayload instanceof List )
+ errors = (List) rawPayload;
+ break;
+
+ case RESPONSE:
+ payload = XmlUtils.xmlToCompositeNode((String) rawPayload);
+ break;
- return Rpcs.getRpcResult(true, payload, errors);
+ default:
+ errors.add(
+ RpcErrors.getRpcError(null, null,null,null,"Unable to get response from remote controller", null, null)
+ );
+ break;
+
+ }
+ }
+ return Futures.immediateFuture(Rpcs.getRpcResult(true, payload, errors));
} catch (Exception e){
collectErrors(e, errors);
- return Rpcs.getRpcResult(false, null, errors);
+ return Futures.immediateFuture(Rpcs.<CompositeNode>getRpcResult(false, null, errors));
}
-
}
/**
* @param routeId route identifier
* @return remote network address
*/
- private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){
+ private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
checkNotNull(routeId, "route must not be null");
- Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
+ Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
checkNotNull(routingTable.isPresent(), "Routing table is null");
- Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
- checkNotNull(addresses, "Address not found for route [%s]", routeId);
- checkState(addresses.size() == 1,
- "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service.
+ String address = null;
+ try {
+ address = routingTable.get().getGlobalRoute(routeId);
+ } catch (RoutingTableException|SystemException e) {
+ _logger.error("Exception caught while looking up remote address " + e);
+ }
+ checkState(address != null, "Address not found for route [%s]", routeId);
+
+ return address;
+ }
+
+ /**
+ * Find address for the given route identifier in routing table
+ * @param routeId route identifier
+ * @return remote network address
+ */
+ private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier<?, ?, ?> routeId){
+ checkNotNull(routeId, "route must not be null");
+
+ Optional<RoutingTable<RpcRouter.RouteIdentifier<?, ?, ?>, String>> routingTable = routingTableProvider.getRoutingTable();
+ checkNotNull(routingTable.isPresent(), "Routing table is null");
- String address = addresses.iterator().next();
- checkNotNull(address, "Address not found for route [%s]", routeId);
+ String address = routingTable.get().getLastAddedRoute(routeId);
+ checkState(address != null, "Address not found for route [%s]", routeId);
return address;
}
*/
private void closeZmqContext() {
ExecutorService exec = Executors.newSingleThreadExecutor();
- FutureTask zmqTermination = new FutureTask(new Runnable() {
+ FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
@Override
public void run() {