*/
package org.opendaylight.controller.sal.connector.remoterpc;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Sets;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
-import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.zeromq.ZMQ;
-
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
-import static com.google.common.base.Preconditions.checkNotNull;
+import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.zeromq.ZMQ;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
/**
- * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
- * so that it gets route change notifications from routing table.
+ * ZeroMq based implementation of RpcRouter.
*/
-public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
+public class ServerImpl implements RemoteRpcServer {
- private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
+ private final Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
private ExecutorService serverPool;
protected ServerRequestHandler handler;
private ProviderSession brokerSession;
private ZMQ.Context context;
- private final RpcListener listener = new RpcListener();
-
private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
private final int HANDLER_WORKER_COUNT = 2;
private final int HWM = 200;//high water mark on sockets
private volatile State status = State.STOPPED;
private String serverAddress;
- private int port;
-
- private ClientImpl client;
-
- private RoutingTableProvider routingTableProvider;
+ private final int port;
public static enum State {
STARTING, STARTED, STOPPED;
public ServerImpl(int port) {
this.port = port;
- this.serverAddress = new StringBuilder(findIpAddress()).
- append(":").
- append(port).
- toString();
- }
-
- public RoutingTableProvider getRoutingTableProvider() {
- return routingTableProvider;
- }
-
- public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
- this.routingTableProvider = routingTableProvider;
- }
-
- public ClientImpl getClient(){
- return this.client;
- }
-
- public void setClient(ClientImpl client) {
- this.client = client;
}
public State getStatus() {
"Remote RPC Server is already running");
status = State.STARTING;
+ _logger.debug("Remote RPC Server is starting...");
+
+ String hostIpAddress = findIpAddress();
+
+ //Log and silently die as per discussion in the bug (bug-362)
+ //https://bugs.opendaylight.org/show_bug.cgi?id=362
+ //
+ // A tracking enhancement defect (bug-366) is created to properly fix this issue
+ //https://bugs.opendaylight.org/show_bug.cgi?id=366
+ //checkState(hostIpAddress != null, "Remote RPC Server could not acquire host ip address");
+
+ if (hostIpAddress == null) {
+ _logger.error("Remote RPC Server could not acquire host ip address. Stopping...");
+ stop();
+ return;
+ }
+
+ this.serverAddress = new StringBuilder(hostIpAddress).
+ append(":").
+ append(port).
+ toString();
+
context = ZMQ.context(1);
remoteServices = new HashSet<QName>();//
serverPool = Executors.newSingleThreadExecutor();//main server thread
serverPool.execute(receive()); // Start listening rpc requests
- brokerSession.addRpcRegistrationListener(listener);
-
- announceLocalRpcs();
-
- registerRemoteRpcs();
status = State.STARTED;
_logger.info("Remote RPC Server started [{}]", getServerAddress());
if (State.STOPPED == this.getStatus()) return; //do nothing
- unregisterLocalRpcs();
-
if (serverPool != null)
serverPool.shutdown();
/**
* Closes ZMQ Context. It tries to gracefully terminate the context. If
- * termination takes more than a second, its forcefully shutdown.
+ * termination takes more than 5 seconds, its forcefully shutdown.
*/
private void closeZmqContext() {
ExecutorService exec = Executors.newSingleThreadExecutor();
- FutureTask zmqTermination = new FutureTask(new Runnable() {
+ FutureTask<?> zmqTermination = new FutureTask<Void>(new Runnable() {
@Override
public void run() {
};
}
- /**
- * Register the remote RPCs from the routing table into broker
- */
- private void registerRemoteRpcs(){
- Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
-
- Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
-
- Set<Map.Entry> remoteRoutes =
- routingTableProvider.getRoutingTable().get().getAllRoutes();
-
- //filter out all entries that contains local address
- //we dont want to register local RPCs as remote
- Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
- public boolean apply(Map.Entry remoteRoute){
- return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
- }
- };
-
- //filter the entries created by current node
- Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
-
- for (Map.Entry route : filteredRemoteRoutes){
- onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
- }
- }
-
- /**
- * Un-Register the local RPCs from the routing table
- */
- private void unregisterLocalRpcs(){
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- listener.onRpcImplementationRemoved(rpc);
- }
- }
-
- /**
- * Publish all the locally registered RPCs in the routing table
- */
- private void announceLocalRpcs(){
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- listener.onRpcImplementationAdded(rpc);
- }
- }
-
- /**
- * @param key
- * @param value
- */
- @Override
- public void onRouteUpdated(String key, String value) {
- RouteIdentifierImpl rId = new RouteIdentifierImpl();
- try {
- _logger.debug("Updating key/value {}-{}", key, value);
- brokerSession.addRpcImplementation(
- (QName) rId.fromString(key).getType(), client);
-
- //TODO: Check with Tony for routed rpc
- //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
- } catch (Exception e) {
- _logger.info("Route update failed {}", e);
- }
- }
-
- /**
- * @param key
- */
- @Override
- public void onRouteDeleted(String key) {
- //TODO: Broker session needs to be updated to support this
- throw new UnsupportedOperationException();
- }
-
/**
* Finds IPv4 address of the local VM
* TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
* @return
*/
private String findIpAddress() {
- String hostAddress = null;
- Enumeration e = null;
+ Enumeration<?> e = null;
try {
e = NetworkInterface.getNetworkInterfaces();
} catch (SocketException e1) {
- e1.printStackTrace();
+ _logger.error("Failed to get list of interfaces", e1);
+ return null;
}
while (e.hasMoreElements()) {
NetworkInterface n = (NetworkInterface) e.nextElement();
- Enumeration ee = n.getInetAddresses();
+ Enumeration<?> ee = n.getInetAddresses();
while (ee.hasMoreElements()) {
InetAddress i = (InetAddress) ee.nextElement();
- if ((i instanceof Inet4Address) && (i.isSiteLocalAddress()))
- hostAddress = i.getHostAddress();
- }
- }
- return hostAddress;
-
- }
-
- /**
- * Listener for rpc registrations
- */
- private class RpcListener implements RpcRegistrationListener {
-
- @Override
- public void onRpcImplementationAdded(QName name) {
-
- //if the service name exists in the set, this notice
- //has bounced back from the broker. It should be ignored
- if (remoteServices.contains(name))
- return;
-
- _logger.debug("Adding registration for [{}]", name);
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
-
- RoutingTable<String, String> routingTable = getRoutingTable();
-
- try {
- routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
- _logger.debug("Route added [{}-{}]", name, getServerAddress());
-
- } catch (RoutingTableException | SystemException e) {
- //TODO: This can be thrown when route already exists in the table. Broker
- //needs to handle this.
- _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
-
- }
- }
-
- @Override
- public void onRpcImplementationRemoved(QName name) {
-
- _logger.debug("Removing registration for [{}]", name);
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
-
- RoutingTable<String, String> routingTable = getRoutingTable();
-
- try {
- routingTable.removeGlobalRoute(routeId.toString());
- } catch (RoutingTableException | SystemException e) {
- _logger.error("Route delete failed {}", e);
+ _logger.debug("Trying address {}", i);
+ if ((i instanceof Inet4Address) && (!i.isLoopbackAddress())) {
+ String hostAddress = i.getHostAddress();
+ _logger.debug("Settled on host address {}", hostAddress);
+ return hostAddress;
+ }
}
}
- private RoutingTable<String, String> getRoutingTable(){
- Optional<RoutingTable<String, String>> routingTable =
- routingTableProvider.getRoutingTable();
-
- checkNotNull(routingTable.isPresent(), "Routing table is null");
-
- return routingTable.get();
- }
- }
-
- /*
- * Listener for Route changes in broker. Broker notifies this listener in the event
- * of any change (add/delete). Listener then updates the routing table.
- */
- private class BrokerRouteChangeListener
- implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
-
- @Override
- public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
-
- }
+ _logger.error("Failed to find a suitable host address");
+ return null;
}
}