-
- private class ConsumerSessionImpl implements ConsumerSession {
-
- private final Consumer consumer;
-
- private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
- .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
- private boolean closed = false;
-
- private BundleContext context;
-
- public Consumer getConsumer() {
- return consumer;
- }
-
- public ConsumerSessionImpl(Consumer consumer, BundleContext ctx) {
- this.consumer = consumer;
- this.context = ctx;
- }
-
- @Override
- public Future<RpcResult<CompositeNode>> rpc(QName rpc,
- CompositeNode input) {
- return BrokerImpl.this.invokeRpc(rpc, input);
- }
-
- @Override
- public <T extends BrokerService> T getService(Class<T> service) {
- BrokerService potential = instantiatedServices.get(service);
- if (potential != null) {
- @SuppressWarnings("unchecked")
- T ret = (T) potential;
- return ret;
- }
- T ret = BrokerImpl.this.serviceFor(service, this);
- if (ret != null) {
- instantiatedServices.put(service, ret);
- }
- return ret;
- }
-
- @Override
- public void close() {
- Collection<BrokerService> toStop = instantiatedServices.values();
- this.closed = true;
- for (BrokerService brokerService : toStop) {
- brokerService.closeSession();
- }
- BrokerImpl.this.consumerSessionClosed(this);
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }