- /**
- * Register the remote RPCs from the routing table into broker
- */
- private void registerRemoteRpcs(){
- Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
-
- Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
-
- Set<Map.Entry> remoteRoutes =
- routingTableProvider.getRoutingTable().get().getAllRoutes();
-
- //filter out all entries that contains local address
- //we dont want to register local RPCs as remote
- Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
- public boolean apply(Map.Entry remoteRoute){
- return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
- }
- };
-
- //filter the entries created by current node
- Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
-
- for (Map.Entry route : filteredRemoteRoutes){
- onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
- }
- }
-
- /**
- * Un-Register the local RPCs from the routing table
- */
- private void unregisterLocalRpcs(){
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- listener.onRpcImplementationRemoved(rpc);
- }
- }
-
- /**
- * Publish all the locally registered RPCs in the routing table
- */
- private void announceLocalRpcs(){
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- listener.onRpcImplementationAdded(rpc);
- }
- }
-
- /**
- * @param key
- * @param value
- */
- @Override
- public void onRouteUpdated(String key, String value) {
- RouteIdentifierImpl rId = new RouteIdentifierImpl();
- try {
- _logger.debug("Updating key/value {}-{}", key, value);
- brokerSession.addRpcImplementation(
- (QName) rId.fromString(key).getType(), client);
-
- //TODO: Check with Tony for routed rpc
- //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
- } catch (Exception e) {
- _logger.info("Route update failed {}", e);
- }
- }
-
- /**
- * @param key
- */
- @Override
- public void onRouteDeleted(String key) {
- //TODO: Broker session needs to be updated to support this
- throw new UnsupportedOperationException();
- }
-