*/
package org.opendaylight.controller.sal.dom.broker;
-import java.util.Collection;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ClassToInstanceMap;
+import com.google.common.collect.ImmutableClassToInstanceMap;
+import com.google.common.util.concurrent.CheckedFuture;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMRpcRouter;
import org.opendaylight.controller.sal.core.api.Broker;
import org.opendaylight.controller.sal.core.api.BrokerService;
import org.opendaylight.controller.sal.core.api.Consumer;
import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.spi.BrokerModule;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BrokerImpl implements Broker {
- private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
+public class BrokerImpl implements Broker, DOMRpcProviderService, DOMRpcService, AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(BrokerImpl.class);
// Broker Generic Context
- private Set<ConsumerSessionImpl> sessions = Collections
- .synchronizedSet(new HashSet<ConsumerSessionImpl>());
- private Set<ProviderSessionImpl> providerSessions = Collections
- .synchronizedSet(new HashSet<ProviderSessionImpl>());
- private Set<BrokerModule> modules = Collections
- .synchronizedSet(new HashSet<BrokerModule>());
- private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections
- .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
+ private final Set<ConsumerContextImpl> sessions = Collections
+ .synchronizedSet(new HashSet<ConsumerContextImpl>());
+ private final Set<ProviderContextImpl> providerSessions = Collections
+ .synchronizedSet(new HashSet<ProviderContextImpl>());
- // RPC Context
- private Map<QName, RpcImplementation> rpcImpls = Collections
- .synchronizedMap(new HashMap<QName, RpcImplementation>());
+ private AutoCloseable deactivator;
- // Implementation specific
- private ExecutorService executor;
+ private final DOMRpcService rpcService;
+ private final DOMRpcProviderService rpcProvider;
- private BundleContext bundleContext;
+ private final ClassToInstanceMap<BrokerService> services;
- @Override
- public ConsumerSession registerConsumer(Consumer consumer,BundleContext ctx) {
- checkPredicates(consumer);
- log.info("Registering consumer " + consumer);
- ConsumerSessionImpl session = newSessionFor(consumer,ctx);
- consumer.onSessionInitiated(session);
- sessions.add(session);
- return session;
+ public BrokerImpl(final DOMRpcRouter router,final ClassToInstanceMap<BrokerService> services) {
+ this(router, router, services);
}
- @Override
- public ProviderSession registerProvider(Provider provider,BundleContext ctx) {
- checkPredicates(provider);
-
- ProviderSessionImpl session = newSessionFor(provider,ctx);
- provider.onSessionInitiated(session);
- providerSessions.add(session);
- return session;
+ public BrokerImpl(final DOMRpcService rpcService, final DOMRpcProviderService rpcProvider,
+ final ClassToInstanceMap<BrokerService> services) {
+ this.rpcService = Preconditions.checkNotNull(rpcService, "DOMRpcService must not be null");
+ this.rpcProvider = Preconditions.checkNotNull(rpcProvider, "DOMRpcProviderService must not be null");
+ this.services = ImmutableClassToInstanceMap.copyOf(services);
}
- public void addModule(BrokerModule module) {
- log.info("Registering broker module " + module);
- if (modules.contains(module)) {
- log.error("Module already registered");
- throw new IllegalArgumentException("Module already exists.");
- }
-
- Set<Class<? extends BrokerService>> provServices = module
- .getProvidedServices();
- for (Class<? extends BrokerService> serviceType : provServices) {
- log.info(" Registering session service implementation: "
- + serviceType.getCanonicalName());
- serviceProviders.put(serviceType, module);
- }
+ @Override
+ public ConsumerSession registerConsumer(final Consumer consumer,
+ final BundleContext ctx) {
+ return registerConsumer(consumer);
}
- public <T extends BrokerService> T serviceFor(Class<T> service,
- ConsumerSessionImpl session) {
- BrokerModule prov = serviceProviders.get(service);
- if (prov == null) {
- log.warn("Service " + service.toString() + " is not supported");
- return null;
- }
- return prov.getServiceForSession(service, session);
+ @Override
+ public ProviderSession registerProvider(final Provider provider,
+ final BundleContext ctx) {
+ return registerProvider(provider);
}
- // RPC Functionality
-
- private void addRpcImplementation(QName rpcType,
- RpcImplementation implementation) {
- synchronized (rpcImpls) {
- if (rpcImpls.get(rpcType) != null) {
- throw new IllegalStateException("Implementation for rpc "
- + rpcType + " is already registered.");
+ // Validation
+ private void checkPredicates(final Provider prov) {
+ Preconditions.checkNotNull(prov, "Provider should not be null.");
+ for (final ProviderContextImpl session : providerSessions) {
+ if (prov.equals(session.getProvider())) {
+ throw new IllegalStateException("Provider already registered");
}
- rpcImpls.put(rpcType, implementation);
}
- // TODO Add notification for availability of Rpc Implementation
+
}
- private void removeRpcImplementation(QName rpcType,
- RpcImplementation implToRemove) {
- synchronized (rpcImpls) {
- if (implToRemove == rpcImpls.get(rpcType)) {
- rpcImpls.remove(rpcType);
+ private void checkPredicates(final Consumer cons) {
+ Preconditions.checkNotNull(cons, "Consumer should not be null.");
+ for (final ConsumerContextImpl session : sessions) {
+ if (cons.equals(session.getConsumer())) {
+ throw new IllegalStateException("Consumer already registered");
}
}
- // TODO Add notification for removal of Rpc Implementation
}
- private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,
- CompositeNode input) {
- RpcImplementation impl = rpcImpls.get(rpc);
- // if()
-
- Callable<RpcResult<CompositeNode>> call = callableFor(impl,
- rpc, input);
- Future<RpcResult<CompositeNode>> result = executor.submit(call);
-
- return result;
+ // Private Factory methods
+ private ConsumerContextImpl newSessionFor(final Consumer provider) {
+ final ConsumerContextImpl ret = new ConsumerContextImpl(provider, this);
+ return ret;
}
-
- // Validation
- private void checkPredicates(Provider prov) {
- if (prov == null)
- throw new IllegalArgumentException("Provider should not be null.");
- for (ProviderSessionImpl session : providerSessions) {
- if (prov.equals(session.getProvider()))
- throw new IllegalStateException("Provider already registered");
- }
+ private ProviderContextImpl newSessionFor(final Provider provider) {
+ final ProviderContextImpl ret = new ProviderContextImpl(provider, this);
+ return ret;
+ }
+ protected void consumerSessionClosed(
+ final ConsumerContextImpl consumerContextImpl) {
+ sessions.remove(consumerContextImpl);
+ providerSessions.remove(consumerContextImpl);
}
- private void checkPredicates(Consumer cons) {
- if (cons == null)
- throw new IllegalArgumentException("Consumer should not be null.");
- for (ConsumerSessionImpl session : sessions) {
- if (cons.equals(session.getConsumer()))
- throw new IllegalStateException("Consumer already registered");
+ @Override
+ public void close() throws Exception {
+ if (deactivator != null) {
+ deactivator.close();
+ deactivator = null;
}
}
- // Private Factory methods
-
- private ConsumerSessionImpl newSessionFor(Consumer provider, BundleContext ctx) {
- return new ConsumerSessionImpl(provider,ctx);
+ /**
+ * @return the deactivator
+ */
+ public AutoCloseable getDeactivator() {
+ return deactivator;
}
- private ProviderSessionImpl newSessionFor(Provider provider, BundleContext ctx) {
- return new ProviderSessionImpl(provider,ctx);
+ /**
+ * @param deactivator
+ * the deactivator to set
+ */
+ public void setDeactivator(final AutoCloseable deactivator) {
+ this.deactivator = deactivator;
}
- private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
- sessions.remove(consumerSessionImpl);
- providerSessions.remove(consumerSessionImpl);
+ protected <T extends BrokerService> Optional<T> getGlobalService(final Class<T> service) {
+ return Optional.fromNullable(services.getInstance(service));
}
- private static Callable<RpcResult<CompositeNode>> callableFor(
- final RpcImplementation implemenation, final QName rpc,
- final CompositeNode input) {
-
- return new Callable<RpcResult<CompositeNode>>() {
- @Override
- public RpcResult<CompositeNode> call() throws Exception {
- return implemenation.invokeRpc(rpc, input);
- }
- };
+ @Override
+ public ConsumerSession registerConsumer(final Consumer consumer) {
+ checkPredicates(consumer);
+ LOG.trace("Registering consumer {}", consumer);
+ final ConsumerContextImpl session = newSessionFor(consumer);
+ consumer.onSessionInitiated(session);
+ sessions.add(session);
+ return session;
}
-
- private class ConsumerSessionImpl implements ConsumerSession {
-
- private final Consumer consumer;
- private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
- .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
- private boolean closed = false;
-
- private BundleContext context;
-
- public Consumer getConsumer() {
- return consumer;
- }
-
- public ConsumerSessionImpl(Consumer consumer, BundleContext ctx) {
- this.consumer = consumer;
- this.context = ctx;
- }
-
- @Override
- public Future<RpcResult<CompositeNode>> rpc(QName rpc,
- CompositeNode input) {
- return BrokerImpl.this.invokeRpc(rpc, input);
- }
-
- @Override
- public <T extends BrokerService> T getService(Class<T> service) {
- BrokerService potential = instantiatedServices.get(service);
- if (potential != null) {
- @SuppressWarnings("unchecked")
- T ret = (T) potential;
- return ret;
- }
- T ret = BrokerImpl.this.serviceFor(service, this);
- if (ret != null) {
- instantiatedServices.put(service, ret);
- }
- return ret;
- }
-
- @Override
- public void close() {
- Collection<BrokerService> toStop = instantiatedServices.values();
- this.closed = true;
- for (BrokerService brokerService : toStop) {
- brokerService.closeSession();
- }
- BrokerImpl.this.consumerSessionClosed(this);
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
+ @Override
+ public ProviderSession registerProvider(final Provider provider) {
+ checkPredicates(provider);
+ final ProviderContextImpl session = newSessionFor(provider);
+ provider.onSessionInitiated(session);
+ providerSessions.add(session);
+ return session;
}
- private class ProviderSessionImpl extends ConsumerSessionImpl implements
- ProviderSession {
-
- private Provider provider;
- private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
-
- public ProviderSessionImpl(Provider provider, BundleContext ctx) {
- super(null,ctx);
- this.provider = provider;
- }
-
- @Override
- public void addRpcImplementation(QName rpcType,
- RpcImplementation implementation)
- throws IllegalArgumentException {
- if (rpcType == null) {
- throw new IllegalArgumentException("rpcType must not be null");
- }
- if (implementation == null) {
- throw new IllegalArgumentException(
- "Implementation must not be null");
- }
- BrokerImpl.this.addRpcImplementation(rpcType, implementation);
- sessionRpcImpls.put(rpcType, implementation);
- }
-
- @Override
- public void removeRpcImplementation(QName rpcType,
- RpcImplementation implToRemove) throws IllegalArgumentException {
- RpcImplementation localImpl = rpcImpls.get(rpcType);
- if (localImpl != implToRemove) {
- throw new IllegalStateException(
- "Implementation was not registered in this session");
- }
-
- BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);
- sessionRpcImpls.remove(rpcType);
- }
-
- public Provider getProvider() {
- return this.provider;
- }
+ @Nonnull
+ @Override
+ public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(@Nonnull final T implementation, @Nonnull final Set<DOMRpcIdentifier> rpcs) {
+ return rpcProvider.registerRpcImplementation(implementation, rpcs);
+ }
+ @Nonnull
+ @Override
+ public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type, @Nullable final NormalizedNode<?, ?> input) {
+ return rpcService.invokeRpc(type, input);
}
- public void setBundleContext(BundleContext context) {
- this.bundleContext = context;
+ @Nonnull
+ @Override
+ public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(@Nonnull final T listener) {
+ return rpcService.registerRpcListener(listener);
}
}