+ private final ServerImpl server;
+ private final ClientImpl client;
+ private RoutingTableProvider routingTableProvider;
+ private final RpcListener listener = new RpcListener();
+ private final RoutedRpcListener routeChangeListener = new RoutedRpcListener();
+ private ProviderSession brokerSession;
+ private RpcProvisionRegistry rpcProvisionRegistry;
+ private BundleContext context;
+ private ServiceTracker<?, ?> clusterTracker;
+
+ public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
+ this.server = server;
+ this.client = client;
+ }
+
+ public void setRoutingTableProvider(RoutingTableProvider provider) {
+ this.routingTableProvider = provider;
+ client.setRoutingTableProvider(provider);
+ }
+
+ public void setContext(BundleContext context){
+ this.context = context;
+ }
+
+ public void setRpcProvisionRegistry(RpcProvisionRegistry rpcProvisionRegistry){
+ this.rpcProvisionRegistry = rpcProvisionRegistry;
+ }
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ brokerSession = session;
+ server.setBrokerSession(session);
+ start();
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ //TODO: Ask Tony if we need to get this from routing table
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, CompositeNode input) {
+ return client.invokeRpc(rpc, input);
+ }
+
+ @Override
+ public ListenableFuture<RpcResult<CompositeNode>> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ return client.invokeRpc(rpc, identifier, input);
+ }
+
+ public void start() {
+ server.start();
+ client.start();
+ brokerSession.addRpcRegistrationListener(listener);
+ rpcProvisionRegistry.setRoutedRpcDefaultDelegate(this);
+ rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
+
+ announceSupportedRpcs();
+ announceSupportedRoutedRpcs();
+ }
+
+ @Override
+ public void close() throws Exception {
+ unregisterSupportedRpcs();
+ unregisterSupportedRoutedRpcs();
+ server.close();
+ client.close();
+ }
+
+ public void stop() {
+ server.stop();
+ client.stop();
+ }
+
+ /**
+ * Add all the locally registered RPCs in the clustered routing table
+ */
+ private void announceSupportedRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationAdded(rpc);