+ } catch (Exception e) {
+ _logger.debug("Unhandled exception [{}, {}]", e.getClass(), e.getMessage());
+ } finally {
+ if (clients != null) clients.close();
+ if (workers != null) workers.close();
+ _logger.info("Remote RPC Server stopped");
+ }
+ }
+ };
+ }
+
+ /**
+ * Register the remote RPCs from the routing table into broker
+ */
+ private void registerRemoteRpcs(){
+ Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
+
+ Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
+
+ Set<Map.Entry> remoteRoutes =
+ routingTableProvider.getRoutingTable().get().getAllRoutes();
+
+ //filter out all entries that contains local address
+ //we dont want to register local RPCs as remote
+ Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
+ public boolean apply(Map.Entry remoteRoute){
+ return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
+ }
+ };
+
+ //filter the entries created by current node
+ Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
+
+ for (Map.Entry route : filteredRemoteRoutes){
+ onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
+ }
+ }
+
+ /**
+ * Un-Register the local RPCs from the routing table
+ */
+ private void unregisterLocalRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationRemoved(rpc);
+ }
+ }
+
+ /**
+ * Publish all the locally registered RPCs in the routing table
+ */
+ private void announceLocalRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationAdded(rpc);
+ }
+ }
+
+ /**
+ * @param key
+ * @param value
+ */
+ @Override
+ public void onRouteUpdated(String key, String value) {
+ RouteIdentifierImpl rId = new RouteIdentifierImpl();
+ try {
+ _logger.debug("Updating key/value {}-{}", key, value);
+ brokerSession.addRpcImplementation(
+ (QName) rId.fromString(key).getType(), client);
+
+ //TODO: Check with Tony for routed rpc
+ //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
+ } catch (Exception e) {
+ _logger.info("Route update failed {}", e);