package org.opendaylight.controller.sal.connector.remoterpc;
import com.google.common.base.Optional;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Sets;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.Message.MessageType;
import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
-import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
/**
- * ZeroMq based implementation of RpcRouter TODO: 1. Make rpc request handling
- * async and non-blocking. Note zmq socket is not thread safe 2. Read properties
- * from config file using existing(?) ODL properties framework
+ * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
+ * so that it gets route change notifications from routing table.
*/
-public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, Set> {
+public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
- private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
+ private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
- private ExecutorService serverPool;
+ private ExecutorService serverPool;
+ protected ServerRequestHandler handler;
- // private RoutingTable<RpcRouter.RouteIdentifier, String> routingTable;
- private RoutingTableProvider routingTable;
- private Set<QName> remoteServices;
- private ProviderSession brokerSession;
- private ZMQ.Context context;
- private ZMQ.Socket replySocket;
+ private Set<QName> remoteServices;
+ private ProviderSession brokerSession;
+ private ZMQ.Context context;
- private final RpcListener listener = new RpcListener();
+ private final RpcListener listener = new RpcListener();
- private final String localUri = Context.getInstance().getLocalUri();
+ private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
+ private final int HANDLER_WORKER_COUNT = 2;
+ private final int HWM = 200;//high water mark on sockets
+ private volatile State status = State.STOPPED;
- private final int rpcPort;
+ private String serverAddress;
+ private int port;
- private RpcImplementation client;
+ private ClientImpl client;
- public RpcImplementation getClient() {
- return client;
- }
+ private RoutingTableProvider routingTableProvider;
- public void setClient(RpcImplementation client) {
- this.client = client;
- }
+ public static enum State {
+ STARTING, STARTED, STOPPED;
+ }
- // Prevent instantiation
- public ServerImpl(int rpcPort) {
- this.rpcPort = rpcPort;
- }
+ public ServerImpl(int port) {
+ this.port = port;
+ }
- public void setBrokerSession(ProviderSession session) {
- this.brokerSession = session;
- }
+ public RoutingTableProvider getRoutingTableProvider() {
+ return routingTableProvider;
+ }
- public ExecutorService getServerPool() {
- return serverPool;
- }
+ public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
+ this.routingTableProvider = routingTableProvider;
+ }
- public void setServerPool(ExecutorService serverPool) {
- this.serverPool = serverPool;
- }
+ public ClientImpl getClient(){
+ return this.client;
+ }
- public void start() {
- context = ZMQ.context(1);
- serverPool = Executors.newSingleThreadExecutor();
- remoteServices = new HashSet<QName>();
+ public void setClient(ClientImpl client) {
+ this.client = client;
+ }
- // Start listening rpc requests
- serverPool.execute(receive());
+ public State getStatus() {
+ return this.status;
+ }
- brokerSession.addRpcRegistrationListener(listener);
- // routingTable.registerRouteChangeListener(routeChangeListener);
+ public Optional<ServerRequestHandler> getHandler() {
+ return Optional.fromNullable(this.handler);
+ }
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- listener.onRpcImplementationAdded(rpc);
- }
+ public void setBrokerSession(ProviderSession session) {
+ this.brokerSession = session;
+ }
- _logger.debug("RPC Server started [{}]", localUri);
- }
+ public Optional<ProviderSession> getBrokerSession() {
+ return Optional.fromNullable(this.brokerSession);
+ }
- public void stop() {
- // TODO: un-subscribe
+ public Optional<ZMQ.Context> getZmqContext() {
+ return Optional.fromNullable(this.context);
+ }
- // if (context != null)
- // context.term();
- //
- // _logger.debug("ZMQ Context is terminated.");
+ public String getServerAddress() {
+ return serverAddress;
+ }
- if (serverPool != null)
- serverPool.shutdown();
+ public String getHandlerAddress() {
+ return HANDLER_INPROC_ADDRESS;
+ }
- _logger.debug("Thread pool is closed.");
- }
+ /**
+ *
+ */
+ public void start() {
+ Preconditions.checkState(State.STOPPED == this.getStatus(),
+ "Remote RPC Server is already running");
+
+ status = State.STARTING;
+ _logger.debug("Remote RPC Server is starting...");
- private Runnable receive() {
- return new Runnable() {
- public void run() {
-
- // Bind to RPC reply socket
- replySocket = context.socket(ZMQ.REP);
- replySocket.bind("tcp://*:" + Context.getInstance().getRpcPort());
-
- // Poller enables listening on multiple sockets using a single
- // thread
- ZMQ.Poller poller = new ZMQ.Poller(1);
- poller.register(replySocket, ZMQ.Poller.POLLIN);
- try {
- // TODO: Add code to restart the thread after exception
- while (!Thread.currentThread().isInterrupted()) {
-
- poller.poll();
-
- if (poller.pollin(0)) {
- handleRpcCall();
- }
- }
- } catch (Exception e) {
- // log and continue
- _logger.error("Unhandled exception [{}]", e);
- } finally {
- poller.unregister(replySocket);
- replySocket.close();
- }
-
- }
- };
+ String hostIpAddress = findIpAddress();
+
+ //Log and silently die as per discussion in the bug (bug-362)
+ //https://bugs.opendaylight.org/show_bug.cgi?id=362
+ //
+ // A tracking enhancement defect (bug-366) is created to properly fix this issue
+ //https://bugs.opendaylight.org/show_bug.cgi?id=366
+ //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address");
+
+ if (hostIpAddress == null) {
+ _logger.error("Remote RPC Server could not acquire host ip address. Stopping...");
+ stop();
+ return;
}
- /**
- * @throws InterruptedException
- * @throws ExecutionException
- */
- private void handleRpcCall() {
+ this.serverAddress = new StringBuilder(hostIpAddress).
+ append(":").
+ append(port).
+ toString();
- Message request = parseMessage(replySocket);
+ context = ZMQ.context(1);
+ remoteServices = new HashSet<QName>();//
+ serverPool = Executors.newSingleThreadExecutor();//main server thread
+ serverPool.execute(receive()); // Start listening rpc requests
+ brokerSession.addRpcRegistrationListener(listener);
- _logger.debug("Received rpc request [{}]", request);
+ announceLocalRpcs();
- // Call broker to process the message then reply
- Future<RpcResult<CompositeNode>> rpc = null;
- RpcResult<CompositeNode> result = null;
- try {
- rpc = brokerSession.rpc((QName) request.getRoute().getType(),
- XmlUtils.xmlToCompositeNode((String) request.getPayload()));
+ registerRemoteRpcs();
- result = (rpc != null) ? rpc.get() : null;
+ status = State.STARTED;
+ _logger.info("Remote RPC Server started [{}]", getServerAddress());
+ }
- } catch (Exception e) {
- _logger.debug("Broker threw [{}]", e);
- }
+ public void stop(){
+ close();
+ }
+
+ /**
+ *
+ */
+ @Override
+ public void close() {
- CompositeNode payload = (result != null) ? result.getResult() : null;
+ if (State.STOPPED == this.getStatus()) return; //do nothing
- Message response = new Message.MessageBuilder().type(MessageType.RESPONSE).sender(localUri)
- .route(request.getRoute()).payload(XmlUtils.compositeNodeToXml(payload)).build();
+ unregisterLocalRpcs();
- _logger.debug("Sending rpc response [{}]", response);
+ if (serverPool != null)
+ serverPool.shutdown();
+ closeZmqContext();
+
+ status = State.STOPPED;
+ _logger.info("Remote RPC Server stopped");
+ }
+
+ /**
+ * Closes ZMQ Context. It tries to gracefully terminate the context. If
+ * termination takes more than a second, its forcefully shutdown.
+ */
+ private void closeZmqContext() {
+ ExecutorService exec = Executors.newSingleThreadExecutor();
+ FutureTask zmqTermination = new FutureTask(new Runnable() {
+
+ @Override
+ public void run() {
try {
- replySocket.send(Message.serialize(response));
+ if (context != null)
+ context.term();
+ _logger.debug("ZMQ Context terminated gracefully!");
} catch (Exception e) {
- _logger.debug("rpc response send failed for message [{}]", response);
- _logger.debug("{}", e);
+ _logger.debug("ZMQ Context termination threw exception [{}]. Continuing shutdown...", e);
}
+ }
+ }, null);
+
+ exec.execute(zmqTermination);
+
+ try {
+ zmqTermination.get(5L, TimeUnit.SECONDS);
+ } catch (Exception e) {/*ignore and continue with shutdown*/}
+
+ exec.shutdownNow();
+ }
+
+ /**
+ * Main listener thread that spawns {@link ServerRequestHandler} as workers.
+ *
+ * @return
+ */
+ private Runnable receive() {
+ return new Runnable() {
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("remote-rpc-server");
+ _logger.debug("Remote RPC Server main thread starting...");
+
+ //socket clients connect to (frontend)
+ ZMQ.Socket clients = context.socket(ZMQ.ROUTER);
+
+ //socket RequestHandlers connect to (backend)
+ ZMQ.Socket workers = context.socket(ZMQ.DEALER);
+
+ try (SocketPair capturePair = new SocketPair();
+ ServerRequestHandler requestHandler = new ServerRequestHandler(context,
+ brokerSession,
+ HANDLER_WORKER_COUNT,
+ HANDLER_INPROC_ADDRESS,
+ getServerAddress());) {
+
+ handler = requestHandler;
+ clients.setHWM(HWM);
+ clients.bind("tcp://*:" + port);
+ workers.setHWM(HWM);
+ workers.bind(HANDLER_INPROC_ADDRESS);
+ //start worker threads
+ _logger.debug("Remote RPC Server worker threads starting...");
+ requestHandler.start();
+ //start capture thread
+ // handlerPool.execute(new CaptureHandler(capturePair.getReceiver()));
+ // Connect work threads to client threads via a queue
+ ZMQ.proxy(clients, workers, null);//capturePair.getSender());
+ } catch (Exception e) {
+ _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
+ } finally {
+ if (clients != null) clients.close();
+ if (workers != null) workers.close();
+ _logger.info("Remote RPC Server stopped");
+ }
+ }
+ };
+ }
+
+ /**
+ * Register the remote RPCs from the routing table into broker
+ */
+ private void registerRemoteRpcs(){
+ Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
+
+ Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
+
+ Set<Map.Entry> remoteRoutes =
+ routingTableProvider.getRoutingTable().get().getAllRoutes();
+
+ //filter out all entries that contains local address
+ //we dont want to register local RPCs as remote
+ Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
+ public boolean apply(Map.Entry remoteRoute){
+ return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
+ }
+ };
+
+ //filter the entries created by current node
+ Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
+
+ for (Map.Entry route : filteredRemoteRoutes){
+ onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
}
-
- /**
- * @param socket
- * @return
- */
- private Message parseMessage(ZMQ.Socket socket) {
-
- Message msg = null;
- try {
- byte[] bytes = socket.recv();
- _logger.debug("Received bytes:[{}]", bytes.length);
- msg = (Message) Message.deserialize(bytes);
- } catch (Throwable t) {
- t.printStackTrace();
+ }
+
+ /**
+ * Un-Register the local RPCs from the routing table
+ */
+ private void unregisterLocalRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationRemoved(rpc);
+ }
+ }
+
+ /**
+ * Publish all the locally registered RPCs in the routing table
+ */
+ private void announceLocalRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationAdded(rpc);
+ }
+ }
+
+ /**
+ * @param key
+ * @param value
+ */
+ @Override
+ public void onRouteUpdated(String key, String value) {
+ RouteIdentifierImpl rId = new RouteIdentifierImpl();
+ try {
+ _logger.debug("Updating key/value {}-{}", key, value);
+ brokerSession.addRpcImplementation(
+ (QName) rId.fromString(key).getType(), client);
+
+ //TODO: Check with Tony for routed rpc
+ //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
+ } catch (Exception e) {
+ _logger.info("Route update failed {}", e);
+ }
+ }
+
+ /**
+ * @param key
+ */
+ @Override
+ public void onRouteDeleted(String key) {
+ //TODO: Broker session needs to be updated to support this
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Finds IPv4 address of the local VM
+ * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
+ * address will be returned. Read IP from a property file or enhance the code to make it deterministic.
+ * Should we use IP or hostname?
+ *
+ * @return
+ */
+ private String findIpAddress() {
+ Enumeration e = null;
+ try {
+ e = NetworkInterface.getNetworkInterfaces();
+ } catch (SocketException e1) {
+ _logger.error("Failed to get list of interfaces", e1);
+ //throw new RuntimeException("Failed to acquire list of interfaces", e1);
+ return null;
+ }
+ while (e.hasMoreElements()) {
+
+ NetworkInterface n = (NetworkInterface) e.nextElement();
+
+ Enumeration ee = n.getInetAddresses();
+ while (ee.hasMoreElements()) {
+ InetAddress i = (InetAddress) ee.nextElement();
+ _logger.debug("Trying address {}", i);
+ if ((i instanceof Inet4Address) && (i.isSiteLocalAddress())) {
+ String hostAddress = i.getHostAddress();
+ _logger.debug("Settled on host address {}", hostAddress);
+ return hostAddress;
}
- return msg;
+ }
}
+ _logger.error("Failed to find a suitable host address");
+ return null;
+ }
+
+ /**
+ * Listener for rpc registrations
+ */
+ private class RpcListener implements RpcRegistrationListener {
+
@Override
- public void onRouteUpdated(String key, Set values) {
- RouteIdentifierImpl rId = new RouteIdentifierImpl();
- try {
- _logger.debug("Updating key/value {}-{}", key, values);
- brokerSession.addRpcImplementation((QName) rId.fromString(key).getType(), client);
+ public void onRpcImplementationAdded(QName name) {
- } catch (Exception e) {
- _logger.info("Route update failed {}", e);
- }
+ //if the service name exists in the set, this notice
+ //has bounced back from the broker. It should be ignored
+ if (remoteServices.contains(name))
+ return;
+
+ _logger.debug("Adding registration for [{}]", name);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(name);
+
+ RoutingTable<String, String> routingTable = getRoutingTable();
+
+ try {
+ routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
+ _logger.debug("Route added [{}-{}]", name, getServerAddress());
+
+ } catch (RoutingTableException | SystemException e) {
+ //TODO: This can be thrown when route already exists in the table. Broker
+ //needs to handle this.
+ _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
+
+ }
}
@Override
- public void onRouteDeleted(String key) {
- // TODO: Broker session needs to be updated to support this
- throw new UnsupportedOperationException();
+ public void onRpcImplementationRemoved(QName name) {
+
+ _logger.debug("Removing registration for [{}]", name);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(name);
+
+ RoutingTable<String, String> routingTable = getRoutingTable();
+
+ try {
+ routingTable.removeGlobalRoute(routeId.toString());
+ } catch (RoutingTableException | SystemException e) {
+ _logger.error("Route delete failed {}", e);
+ }
}
-
- /**
- * Listener for rpc registrations
- */
- private class RpcListener implements RpcRegistrationListener {
-
-
-
- @Override
- public void onRpcImplementationAdded(QName name) {
-
- // if the service name exists in the set, this notice
- // has bounced back from the broker. It should be ignored
- if (remoteServices.contains(name))
- return;
-
- _logger.debug("Adding registration for [{}]", name);
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
-
- try {
- routingTable.getRoutingTable().get().addGlobalRoute(routeId.toString(), localUri);
- _logger.debug("Route added [{}-{}]", name, localUri);
- } catch (RoutingTableException | SystemException e) {
- // TODO: This can be thrown when route already exists in the
- // table. Broker
- // needs to handle this.
- _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
-
- }
- }
- @Override
- public void onRpcImplementationRemoved(QName name) {
+ private RoutingTable<String, String> getRoutingTable(){
+ Optional<RoutingTable<String, String>> routingTable =
+ routingTableProvider.getRoutingTable();
- _logger.debug("Removing registration for [{}]", name);
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
+ checkNotNull(routingTable.isPresent(), "Routing table is null");
- try {
- routingTable.getRoutingTable().get().removeGlobalRoute(routeId.toString());
- } catch (RoutingTableException | SystemException e) {
- _logger.error("Route delete failed {}", e);
- }
- }
+ return routingTable.get();
}
+ }
+
+ /*
+ * Listener for Route changes in broker. Broker notifies this listener in the event
+ * of any change (add/delete). Listener then updates the routing table.
+ */
+ private class BrokerRouteChangeListener
+ implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
@Override
- public void close() throws Exception {
- stop();
- }
+ public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
- public void setRoutingTableProvider(RoutingTableProvider provider) {
- this.routingTable = provider;
}
+ }
}