Take advantage of default methods in DOMRpcProviderService
[controller.git] / opendaylight / md-sal / sal-dom-broker / src / main / java / org / opendaylight / controller / sal / dom / broker / BrokerImpl.java
index 24b2b4b6daa4e4d47dd40ce85d6b17f0a96be098..40a4efd6fea3d89c563cfb73cc842521ba4d3745 100644 (file)
  */
 package org.opendaylight.controller.sal.dom.broker;
 
-import java.util.Collection;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ClassToInstanceMap;
+import com.google.common.collect.ImmutableClassToInstanceMap;
+import com.google.common.util.concurrent.CheckedFuture;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcAvailabilityListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcException;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcIdentifier;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementation;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcImplementationRegistration;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcProviderService;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcResult;
+import org.opendaylight.controller.md.sal.dom.api.DOMRpcService;
+import org.opendaylight.controller.md.sal.dom.broker.impl.DOMRpcRouter;
 import org.opendaylight.controller.sal.core.api.Broker;
 import org.opendaylight.controller.sal.core.api.BrokerService;
 import org.opendaylight.controller.sal.core.api.Consumer;
 import org.opendaylight.controller.sal.core.api.Provider;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.spi.BrokerModule;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaPath;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BrokerImpl implements Broker {
-    private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
+public class BrokerImpl implements Broker, DOMRpcProviderService, DOMRpcService, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(BrokerImpl.class);
 
     // Broker Generic Context
-    private Set<ConsumerSessionImpl> sessions = Collections
-            .synchronizedSet(new HashSet<ConsumerSessionImpl>());
-    private Set<ProviderSessionImpl> providerSessions = Collections
-            .synchronizedSet(new HashSet<ProviderSessionImpl>());
-    private Set<BrokerModule> modules = Collections
-            .synchronizedSet(new HashSet<BrokerModule>());
-    private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections
-            .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
+    private final Set<ConsumerContextImpl> sessions = Collections
+            .synchronizedSet(new HashSet<ConsumerContextImpl>());
+    private final Set<ProviderContextImpl> providerSessions = Collections
+            .synchronizedSet(new HashSet<ProviderContextImpl>());
 
-    // RPC Context
-    private Map<QName, RpcImplementation> rpcImpls = Collections
-            .synchronizedMap(new HashMap<QName, RpcImplementation>());
+    private AutoCloseable deactivator;
 
-    // Implementation specific
-    private ExecutorService executor;
+    private final DOMRpcService rpcService;
+    private final DOMRpcProviderService rpcProvider;
 
-    private BundleContext bundleContext;
+    private final ClassToInstanceMap<BrokerService> services;
 
-    @Override
-    public ConsumerSession registerConsumer(Consumer consumer,BundleContext ctx) {
-        checkPredicates(consumer);
-        log.info("Registering consumer " + consumer);
-        ConsumerSessionImpl session = newSessionFor(consumer,ctx);
-        consumer.onSessionInitiated(session);
-        sessions.add(session);
-        return session;
+    public BrokerImpl(final DOMRpcRouter router,final ClassToInstanceMap<BrokerService> services) {
+        this(router, router, services);
     }
 
-    @Override
-    public ProviderSession registerProvider(Provider provider,BundleContext ctx) {
-        checkPredicates(provider);
-
-        ProviderSessionImpl session = newSessionFor(provider,ctx);
-        provider.onSessionInitiated(session);
-        providerSessions.add(session);
-        return session;
+    public BrokerImpl(final DOMRpcService rpcService, final DOMRpcProviderService rpcProvider,
+            final ClassToInstanceMap<BrokerService> services) {
+        this.rpcService = Preconditions.checkNotNull(rpcService, "DOMRpcService must not be null");
+        this.rpcProvider = Preconditions.checkNotNull(rpcProvider, "DOMRpcProviderService must not be null");
+        this.services = ImmutableClassToInstanceMap.copyOf(services);
     }
 
-    public void addModule(BrokerModule module) {
-        log.info("Registering broker module " + module);
-        if (modules.contains(module)) {
-            log.error("Module already registered");
-            throw new IllegalArgumentException("Module already exists.");
-        }
-    
-        Set<Class<? extends BrokerService>> provServices = module
-                .getProvidedServices();
-        for (Class<? extends BrokerService> serviceType : provServices) {
-            log.info("  Registering session service implementation: "
-                    + serviceType.getCanonicalName());
-            serviceProviders.put(serviceType, module);
-        }
+    @Override
+    public ConsumerSession registerConsumer(final Consumer consumer,
+            final BundleContext ctx) {
+        return registerConsumer(consumer);
     }
 
-    public <T extends BrokerService> T serviceFor(Class<T> service,
-            ConsumerSessionImpl session) {
-        BrokerModule prov = serviceProviders.get(service);
-        if (prov == null) {
-            log.warn("Service " + service.toString() + " is not supported");
-            return null;
-        }
-        return prov.getServiceForSession(service, session);
+    @Override
+    public ProviderSession registerProvider(final Provider provider,
+            final BundleContext ctx) {
+        return registerProvider(provider);
     }
 
-    // RPC Functionality
-    
-    private void addRpcImplementation(QName rpcType,
-            RpcImplementation implementation) {
-        synchronized (rpcImpls) {
-            if (rpcImpls.get(rpcType) != null) {
-                throw new IllegalStateException("Implementation for rpc "
-                        + rpcType + " is already registered.");
+    // Validation
+    private void checkPredicates(final Provider prov) {
+        Preconditions.checkNotNull(prov, "Provider should not be null.");
+        for (final ProviderContextImpl session : providerSessions) {
+            if (prov.equals(session.getProvider())) {
+                throw new IllegalStateException("Provider already registered");
             }
-            rpcImpls.put(rpcType, implementation);
         }
-        // TODO Add notification for availability of Rpc Implementation
+
     }
 
-    private void removeRpcImplementation(QName rpcType,
-            RpcImplementation implToRemove) {
-        synchronized (rpcImpls) {
-            if (implToRemove == rpcImpls.get(rpcType)) {
-                rpcImpls.remove(rpcType);
+    private void checkPredicates(final Consumer cons) {
+        Preconditions.checkNotNull(cons, "Consumer should not be null.");
+        for (final ConsumerContextImpl session : sessions) {
+            if (cons.equals(session.getConsumer())) {
+                throw new IllegalStateException("Consumer already registered");
             }
         }
-        // TODO Add notification for removal of Rpc Implementation
     }
 
-    private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,
-            CompositeNode input) {
-        RpcImplementation impl = rpcImpls.get(rpc);
-        // if()
-
-        Callable<RpcResult<CompositeNode>> call = callableFor(impl,
-                rpc, input);
-        Future<RpcResult<CompositeNode>> result = executor.submit(call);
-
-        return result;
+    // Private Factory methods
+    private ConsumerContextImpl newSessionFor(final Consumer provider) {
+        final ConsumerContextImpl ret = new ConsumerContextImpl(provider, this);
+        return ret;
     }
-    
-    // Validation
 
-    private void checkPredicates(Provider prov) {
-        if (prov == null)
-            throw new IllegalArgumentException("Provider should not be null.");
-        for (ProviderSessionImpl session : providerSessions) {
-            if (prov.equals(session.getProvider()))
-                throw new IllegalStateException("Provider already registered");
-        }
+    private ProviderContextImpl newSessionFor(final Provider provider) {
+        final ProviderContextImpl ret = new ProviderContextImpl(provider, this);
+        return ret;
+    }
 
+    protected void consumerSessionClosed(
+            final ConsumerContextImpl consumerContextImpl) {
+        sessions.remove(consumerContextImpl);
+        providerSessions.remove(consumerContextImpl);
     }
 
-    private void checkPredicates(Consumer cons) {
-        if (cons == null)
-            throw new IllegalArgumentException("Consumer should not be null.");
-        for (ConsumerSessionImpl session : sessions) {
-            if (cons.equals(session.getConsumer()))
-                throw new IllegalStateException("Consumer already registered");
+    @Override
+    public void close() throws Exception {
+        if (deactivator != null) {
+            deactivator.close();
+            deactivator = null;
         }
     }
 
-    // Private Factory methods
-    
-    private ConsumerSessionImpl newSessionFor(Consumer provider, BundleContext ctx) {
-        return new ConsumerSessionImpl(provider,ctx);
+    /**
+     * @return the deactivator
+     */
+    public AutoCloseable getDeactivator() {
+        return deactivator;
     }
 
-    private ProviderSessionImpl newSessionFor(Provider provider, BundleContext ctx) {
-        return new ProviderSessionImpl(provider,ctx);
+    /**
+     * @param deactivator
+     *            the deactivator to set
+     */
+    public void setDeactivator(final AutoCloseable deactivator) {
+        this.deactivator = deactivator;
     }
 
-    private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
-        sessions.remove(consumerSessionImpl);
-        providerSessions.remove(consumerSessionImpl);
+    protected <T extends BrokerService> Optional<T> getGlobalService(final Class<T> service) {
+        return Optional.fromNullable(services.getInstance(service));
     }
 
-    private static Callable<RpcResult<CompositeNode>> callableFor(
-            final RpcImplementation implemenation, final QName rpc,
-            final CompositeNode input) {
-
-        return new Callable<RpcResult<CompositeNode>>() {
 
-            @Override
-            public RpcResult<CompositeNode> call() throws Exception {
-                return implemenation.invokeRpc(rpc, input);
-            }
-        };
+    @Override
+    public ConsumerSession registerConsumer(final Consumer consumer) {
+        checkPredicates(consumer);
+        LOG.trace("Registering consumer {}", consumer);
+        final ConsumerContextImpl session = newSessionFor(consumer);
+        consumer.onSessionInitiated(session);
+        sessions.add(session);
+        return session;
     }
-    
-    private class ConsumerSessionImpl implements ConsumerSession {
-
-        private final Consumer consumer;
 
-        private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
-                .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
-        private boolean closed = false;
-
-        private BundleContext context;
-
-        public Consumer getConsumer() {
-            return consumer;
-        }
-
-        public ConsumerSessionImpl(Consumer consumer, BundleContext ctx) {
-            this.consumer = consumer;
-            this.context = ctx;
-        }
-
-        @Override
-        public Future<RpcResult<CompositeNode>> rpc(QName rpc,
-                CompositeNode input) {
-            return BrokerImpl.this.invokeRpc(rpc, input);
-        }
-
-        @Override
-        public <T extends BrokerService> T getService(Class<T> service) {
-            BrokerService potential = instantiatedServices.get(service);
-            if (potential != null) {
-                @SuppressWarnings("unchecked")
-                T ret = (T) potential;
-                return ret;
-            }
-            T ret = BrokerImpl.this.serviceFor(service, this);
-            if (ret != null) {
-                instantiatedServices.put(service, ret);
-            }
-            return ret;
-        }
-
-        @Override
-        public void close() {
-            Collection<BrokerService> toStop = instantiatedServices.values();
-            this.closed = true;
-            for (BrokerService brokerService : toStop) {
-                brokerService.closeSession();
-            }
-            BrokerImpl.this.consumerSessionClosed(this);
-        }
-
-        @Override
-        public boolean isClosed() {
-            return closed;
-        }
 
+    @Override
+    public ProviderSession registerProvider(final Provider provider) {
+        checkPredicates(provider);
+        final ProviderContextImpl session = newSessionFor(provider);
+        provider.onSessionInitiated(session);
+        providerSessions.add(session);
+        return session;
     }
 
-    private class ProviderSessionImpl extends ConsumerSessionImpl implements
-            ProviderSession {
-
-        private Provider provider;
-        private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
-
-        public ProviderSessionImpl(Provider provider, BundleContext ctx) {
-            super(null,ctx);
-            this.provider = provider;
-        }
-
-        @Override
-        public void addRpcImplementation(QName rpcType,
-                RpcImplementation implementation)
-                throws IllegalArgumentException {
-            if (rpcType == null) {
-                throw new IllegalArgumentException("rpcType must not be null");
-            }
-            if (implementation == null) {
-                throw new IllegalArgumentException(
-                        "Implementation must not be null");
-            }
-            BrokerImpl.this.addRpcImplementation(rpcType, implementation);
-            sessionRpcImpls.put(rpcType, implementation);
-        }
-
-        @Override
-        public void removeRpcImplementation(QName rpcType,
-                RpcImplementation implToRemove) throws IllegalArgumentException {
-            RpcImplementation localImpl = rpcImpls.get(rpcType);
-            if (localImpl != implToRemove) {
-                throw new IllegalStateException(
-                        "Implementation was not registered in this session");
-            }
-
-            BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);
-            sessionRpcImpls.remove(rpcType);
-        }
-
-        public Provider getProvider() {
-            return this.provider;
-        }
+    @Nonnull
+    @Override
+    public <T extends DOMRpcImplementation> DOMRpcImplementationRegistration<T> registerRpcImplementation(@Nonnull final T implementation, @Nonnull final Set<DOMRpcIdentifier> rpcs) {
+        return rpcProvider.registerRpcImplementation(implementation, rpcs);
+    }
 
+    @Nonnull
+    @Override
+    public CheckedFuture<DOMRpcResult, DOMRpcException> invokeRpc(@Nonnull final SchemaPath type, @Nullable final NormalizedNode<?, ?> input) {
+        return rpcService.invokeRpc(type, input);
     }
 
-    public void setBundleContext(BundleContext context) {
-        this.bundleContext = context;
+    @Nonnull
+    @Override
+    public <T extends DOMRpcAvailabilityListener> ListenerRegistration<T> registerRpcListener(@Nonnull final T listener) {
+        return rpcService.registerRpcListener(listener);
     }
 }