Clean up of binding broker implementation
[controller.git] / opendaylight / sal / yang-prototype / sal / sal-broker-impl / src / main / java / org / opendaylight / controller / sal / core / impl / BrokerImpl.java
index b8a0b97eab9abb81b851eeda349ec6156709442a..f0d1cc609d22a54230bed5c2136ae2f32ba701e7 100644 (file)
-/*\r
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.\r
- *\r
- * This program and the accompanying materials are made available under the\r
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,\r
- * and is available at http://www.eclipse.org/legal/epl-v10.html\r
- */\r
-package org.opendaylight.controller.sal.core.impl;\r
-\r
-import java.util.Collection;\r
-import java.util.Collections;\r
-import java.util.HashMap;\r
-import java.util.HashSet;\r
-import java.util.Map;\r
-import java.util.Set;\r
-import java.util.concurrent.Callable;\r
-import java.util.concurrent.ExecutorService;\r
-import java.util.concurrent.Future;\r
-import org.opendaylight.controller.sal.core.api.Broker;\r
-import org.opendaylight.controller.sal.core.api.BrokerService;\r
-import org.opendaylight.controller.sal.core.api.Consumer;\r
-import org.opendaylight.controller.sal.core.api.Provider;\r
-import org.opendaylight.controller.sal.core.api.RpcImplementation;\r
-import org.opendaylight.controller.sal.core.spi.BrokerModule;\r
-import org.opendaylight.controller.yang.common.QName;\r
-import org.opendaylight.controller.yang.common.RpcResult;\r
-import org.opendaylight.controller.yang.data.api.CompositeNode;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-\r
-public class BrokerImpl implements Broker {\r
-    private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);\r
-\r
-    // Broker Generic Context\r
-    private Set<ConsumerSessionImpl> sessions = Collections\r
-            .synchronizedSet(new HashSet<ConsumerSessionImpl>());\r
-    private Set<ProviderSessionImpl> providerSessions = Collections\r
-            .synchronizedSet(new HashSet<ProviderSessionImpl>());\r
-    private Set<BrokerModule> modules = Collections\r
-            .synchronizedSet(new HashSet<BrokerModule>());\r
-    private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections\r
-            .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());\r
-\r
-    // RPC Context\r
-    private Map<QName, RpcImplementation> rpcImpls = Collections\r
-            .synchronizedMap(new HashMap<QName, RpcImplementation>());\r
-\r
-    // Implementation specific\r
-    private ExecutorService executor;\r
-\r
-    @Override\r
-    public ConsumerSession registerConsumer(Consumer consumer) {\r
-        checkPredicates(consumer);\r
-        log.info("Registering consumer " + consumer);\r
-        ConsumerSessionImpl session = newSessionFor(consumer);\r
-        consumer.onSessionInitiated(session);\r
-        sessions.add(session);\r
-        return session;\r
-    }\r
-\r
-    @Override\r
-    public ProviderSession registerProvider(Provider provider) {\r
-        checkPredicates(provider);\r
-\r
-        ProviderSessionImpl session = newSessionFor(provider);\r
-        provider.onSessionInitiated(session);\r
-        providerSessions.add(session);\r
-        return session;\r
-    }\r
-\r
-    public void addModule(BrokerModule module) {\r
-        log.info("Registering broker module " + module);\r
-        if (modules.contains(module)) {\r
-            log.error("Module already registered");\r
-            throw new IllegalArgumentException("Module already exists.");\r
-        }\r
-    \r
-        Set<Class<? extends BrokerService>> provServices = module\r
-                .getProvidedServices();\r
-        for (Class<? extends BrokerService> serviceType : provServices) {\r
-            log.info("  Registering session service implementation: "\r
-                    + serviceType.getCanonicalName());\r
-            serviceProviders.put(serviceType, module);\r
-        }\r
-    }\r
-\r
-    public <T extends BrokerService> T serviceFor(Class<T> service,\r
-            ConsumerSessionImpl session) {\r
-        BrokerModule prov = serviceProviders.get(service);\r
-        if (prov == null) {\r
-            log.warn("Service " + service.toString() + " is not supported");\r
-            return null;\r
-        }\r
-        return prov.getServiceForSession(service, session);\r
-    }\r
-\r
-    // RPC Functionality\r
-    \r
-    private void addRpcImplementation(QName rpcType,\r
-            RpcImplementation implementation) {\r
-        synchronized (rpcImpls) {\r
-            if (rpcImpls.get(rpcType) != null) {\r
-                throw new IllegalStateException("Implementation for rpc "\r
-                        + rpcType + " is already registered.");\r
-            }\r
-            rpcImpls.put(rpcType, implementation);\r
-        }\r
-        // TODO Add notification for availability of Rpc Implementation\r
-    }\r
-\r
-    private void removeRpcImplementation(QName rpcType,\r
-            RpcImplementation implToRemove) {\r
-        synchronized (rpcImpls) {\r
-            if (implToRemove == rpcImpls.get(rpcType)) {\r
-                rpcImpls.remove(rpcType);\r
-            }\r
-        }\r
-        // TODO Add notification for removal of Rpc Implementation\r
-    }\r
-\r
-    private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,\r
-            CompositeNode input) {\r
-        RpcImplementation impl = rpcImpls.get(rpc);\r
-        // if()\r
-\r
-        Callable<RpcResult<CompositeNode>> call = callableFor(impl,\r
-                rpc, input);\r
-        Future<RpcResult<CompositeNode>> result = executor.submit(call);\r
-\r
-        return result;\r
-    }\r
-    \r
-    // Validation\r
-\r
-    private void checkPredicates(Provider prov) {\r
-        if (prov == null)\r
-            throw new IllegalArgumentException("Provider should not be null.");\r
-        for (ProviderSessionImpl session : providerSessions) {\r
-            if (prov.equals(session.getProvider()))\r
-                throw new IllegalStateException("Provider already registered");\r
-        }\r
-\r
-    }\r
-\r
-    private void checkPredicates(Consumer cons) {\r
-        if (cons == null)\r
-            throw new IllegalArgumentException("Consumer should not be null.");\r
-        for (ConsumerSessionImpl session : sessions) {\r
-            if (cons.equals(session.getConsumer()))\r
-                throw new IllegalStateException("Consumer already registered");\r
-        }\r
-    }\r
-\r
-    // Private Factory methods\r
-    \r
-    private ConsumerSessionImpl newSessionFor(Consumer cons) {\r
-        return new ConsumerSessionImpl(cons);\r
-    }\r
-\r
-    private ProviderSessionImpl newSessionFor(Provider provider) {\r
-        return new ProviderSessionImpl(provider);\r
-    }\r
-\r
-    private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {\r
-        sessions.remove(consumerSessionImpl);\r
-        providerSessions.remove(consumerSessionImpl);\r
-    }\r
-\r
-    private static Callable<RpcResult<CompositeNode>> callableFor(\r
-            final RpcImplementation implemenation, final QName rpc,\r
-            final CompositeNode input) {\r
-\r
-        return new Callable<RpcResult<CompositeNode>>() {\r
-\r
-            @Override\r
-            public RpcResult<CompositeNode> call() throws Exception {\r
-                return implemenation.invokeRpc(rpc, input);\r
-            }\r
-        };\r
-    }\r
-    \r
-    private class ConsumerSessionImpl implements ConsumerSession {\r
-\r
-        private final Consumer consumer;\r
-\r
-        private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections\r
-                .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());\r
-        private boolean closed = false;\r
-\r
-        public Consumer getConsumer() {\r
-            return consumer;\r
-        }\r
-\r
-        public ConsumerSessionImpl(Consumer consumer) {\r
-            this.consumer = consumer;\r
-        }\r
-\r
-        @Override\r
-        public Future<RpcResult<CompositeNode>> rpc(QName rpc,\r
-                CompositeNode input) {\r
-            return BrokerImpl.this.invokeRpc(rpc, input);\r
-        }\r
-\r
-        @Override\r
-        public <T extends BrokerService> T getService(Class<T> service) {\r
-            BrokerService potential = instantiatedServices.get(service);\r
-            if (potential != null) {\r
-                @SuppressWarnings("unchecked")\r
-                T ret = (T) potential;\r
-                return ret;\r
-            }\r
-            T ret = BrokerImpl.this.serviceFor(service, this);\r
-            if (ret != null) {\r
-                instantiatedServices.put(service, ret);\r
-            }\r
-            return ret;\r
-        }\r
-\r
-        @Override\r
-        public void close() {\r
-            Collection<BrokerService> toStop = instantiatedServices.values();\r
-            this.closed = true;\r
-            for (BrokerService brokerService : toStop) {\r
-                brokerService.closeSession();\r
-            }\r
-            BrokerImpl.this.consumerSessionClosed(this);\r
-        }\r
-\r
-        @Override\r
-        public boolean isClosed() {\r
-            return closed;\r
-        }\r
-\r
-    }\r
-\r
-    private class ProviderSessionImpl extends ConsumerSessionImpl implements\r
-            ProviderSession {\r
-\r
-        private Provider provider;\r
-        private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());\r
-\r
-        public ProviderSessionImpl(Provider provider) {\r
-            super(null);\r
-            this.provider = provider;\r
-        }\r
-\r
-        @Override\r
-        public void addRpcImplementation(QName rpcType,\r
-                RpcImplementation implementation)\r
-                throws IllegalArgumentException {\r
-            if (rpcType == null) {\r
-                throw new IllegalArgumentException("rpcType must not be null");\r
-            }\r
-            if (implementation == null) {\r
-                throw new IllegalArgumentException(\r
-                        "Implementation must not be null");\r
-            }\r
-            BrokerImpl.this.addRpcImplementation(rpcType, implementation);\r
-            sessionRpcImpls.put(rpcType, implementation);\r
-        }\r
-\r
-        @Override\r
-        public void removeRpcImplementation(QName rpcType,\r
-                RpcImplementation implToRemove) throws IllegalArgumentException {\r
-            RpcImplementation localImpl = rpcImpls.get(rpcType);\r
-            if (localImpl != implToRemove) {\r
-                throw new IllegalStateException(\r
-                        "Implementation was not registered in this session");\r
-            }\r
-\r
-            BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);\r
-            sessionRpcImpls.remove(rpcType);\r
-        }\r
-\r
-        public Provider getProvider() {\r
-            return this.provider;\r
-        }\r
-\r
-    }\r
-}\r
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.sal.core.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.BrokerService;
+import org.opendaylight.controller.sal.core.api.Consumer;
+import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.spi.BrokerModule;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BrokerImpl implements Broker {
+    private static Logger log = LoggerFactory.getLogger(BrokerImpl.class);
+
+    // Broker Generic Context
+    private Set<ConsumerSessionImpl> sessions = Collections
+            .synchronizedSet(new HashSet<ConsumerSessionImpl>());
+    private Set<ProviderSessionImpl> providerSessions = Collections
+            .synchronizedSet(new HashSet<ProviderSessionImpl>());
+    private Set<BrokerModule> modules = Collections
+            .synchronizedSet(new HashSet<BrokerModule>());
+    private Map<Class<? extends BrokerService>, BrokerModule> serviceProviders = Collections
+            .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerModule>());
+
+    // RPC Context
+    private Map<QName, RpcImplementation> rpcImpls = Collections
+            .synchronizedMap(new HashMap<QName, RpcImplementation>());
+
+    // Implementation specific
+    private ExecutorService executor;
+
+    @Override
+    public ConsumerSession registerConsumer(Consumer consumer) {
+        checkPredicates(consumer);
+        log.info("Registering consumer " + consumer);
+        ConsumerSessionImpl session = newSessionFor(consumer);
+        consumer.onSessionInitiated(session);
+        sessions.add(session);
+        return session;
+    }
+
+    @Override
+    public ProviderSession registerProvider(Provider provider) {
+        checkPredicates(provider);
+
+        ProviderSessionImpl session = newSessionFor(provider);
+        provider.onSessionInitiated(session);
+        providerSessions.add(session);
+        return session;
+    }
+
+    public void addModule(BrokerModule module) {
+        log.info("Registering broker module " + module);
+        if (modules.contains(module)) {
+            log.error("Module already registered");
+            throw new IllegalArgumentException("Module already exists.");
+        }
+    
+        Set<Class<? extends BrokerService>> provServices = module
+                .getProvidedServices();
+        for (Class<? extends BrokerService> serviceType : provServices) {
+            log.info("  Registering session service implementation: "
+                    + serviceType.getCanonicalName());
+            serviceProviders.put(serviceType, module);
+        }
+    }
+
+    public <T extends BrokerService> T serviceFor(Class<T> service,
+            ConsumerSessionImpl session) {
+        BrokerModule prov = serviceProviders.get(service);
+        if (prov == null) {
+            log.warn("Service " + service.toString() + " is not supported");
+            return null;
+        }
+        return prov.getServiceForSession(service, session);
+    }
+
+    // RPC Functionality
+    
+    private void addRpcImplementation(QName rpcType,
+            RpcImplementation implementation) {
+        synchronized (rpcImpls) {
+            if (rpcImpls.get(rpcType) != null) {
+                throw new IllegalStateException("Implementation for rpc "
+                        + rpcType + " is already registered.");
+            }
+            rpcImpls.put(rpcType, implementation);
+        }
+        // TODO Add notification for availability of Rpc Implementation
+    }
+
+    private void removeRpcImplementation(QName rpcType,
+            RpcImplementation implToRemove) {
+        synchronized (rpcImpls) {
+            if (implToRemove == rpcImpls.get(rpcType)) {
+                rpcImpls.remove(rpcType);
+            }
+        }
+        // TODO Add notification for removal of Rpc Implementation
+    }
+
+    private Future<RpcResult<CompositeNode>> invokeRpc(QName rpc,
+            CompositeNode input) {
+        RpcImplementation impl = rpcImpls.get(rpc);
+        // if()
+
+        Callable<RpcResult<CompositeNode>> call = callableFor(impl,
+                rpc, input);
+        Future<RpcResult<CompositeNode>> result = executor.submit(call);
+
+        return result;
+    }
+    
+    // Validation
+
+    private void checkPredicates(Provider prov) {
+        if (prov == null)
+            throw new IllegalArgumentException("Provider should not be null.");
+        for (ProviderSessionImpl session : providerSessions) {
+            if (prov.equals(session.getProvider()))
+                throw new IllegalStateException("Provider already registered");
+        }
+
+    }
+
+    private void checkPredicates(Consumer cons) {
+        if (cons == null)
+            throw new IllegalArgumentException("Consumer should not be null.");
+        for (ConsumerSessionImpl session : sessions) {
+            if (cons.equals(session.getConsumer()))
+                throw new IllegalStateException("Consumer already registered");
+        }
+    }
+
+    // Private Factory methods
+    
+    private ConsumerSessionImpl newSessionFor(Consumer cons) {
+        return new ConsumerSessionImpl(cons);
+    }
+
+    private ProviderSessionImpl newSessionFor(Provider provider) {
+        return new ProviderSessionImpl(provider);
+    }
+
+    private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) {
+        sessions.remove(consumerSessionImpl);
+        providerSessions.remove(consumerSessionImpl);
+    }
+
+    private static Callable<RpcResult<CompositeNode>> callableFor(
+            final RpcImplementation implemenation, final QName rpc,
+            final CompositeNode input) {
+
+        return new Callable<RpcResult<CompositeNode>>() {
+
+            @Override
+            public RpcResult<CompositeNode> call() throws Exception {
+                return implemenation.invokeRpc(rpc, input);
+            }
+        };
+    }
+    
+    private class ConsumerSessionImpl implements ConsumerSession {
+
+        private final Consumer consumer;
+
+        private Map<Class<? extends BrokerService>, BrokerService> instantiatedServices = Collections
+                .synchronizedMap(new HashMap<Class<? extends BrokerService>, BrokerService>());
+        private boolean closed = false;
+
+        public Consumer getConsumer() {
+            return consumer;
+        }
+
+        public ConsumerSessionImpl(Consumer consumer) {
+            this.consumer = consumer;
+        }
+
+        @Override
+        public Future<RpcResult<CompositeNode>> rpc(QName rpc,
+                CompositeNode input) {
+            return BrokerImpl.this.invokeRpc(rpc, input);
+        }
+
+        @Override
+        public <T extends BrokerService> T getService(Class<T> service) {
+            BrokerService potential = instantiatedServices.get(service);
+            if (potential != null) {
+                @SuppressWarnings("unchecked")
+                T ret = (T) potential;
+                return ret;
+            }
+            T ret = BrokerImpl.this.serviceFor(service, this);
+            if (ret != null) {
+                instantiatedServices.put(service, ret);
+            }
+            return ret;
+        }
+
+        @Override
+        public void close() {
+            Collection<BrokerService> toStop = instantiatedServices.values();
+            this.closed = true;
+            for (BrokerService brokerService : toStop) {
+                brokerService.closeSession();
+            }
+            BrokerImpl.this.consumerSessionClosed(this);
+        }
+
+        @Override
+        public boolean isClosed() {
+            return closed;
+        }
+
+    }
+
+    private class ProviderSessionImpl extends ConsumerSessionImpl implements
+            ProviderSession {
+
+        private Provider provider;
+        private Map<QName, RpcImplementation> sessionRpcImpls = Collections.synchronizedMap(new HashMap<QName, RpcImplementation>());
+
+        public ProviderSessionImpl(Provider provider) {
+            super(null);
+            this.provider = provider;
+        }
+
+        @Override
+        public void addRpcImplementation(QName rpcType,
+                RpcImplementation implementation)
+                throws IllegalArgumentException {
+            if (rpcType == null) {
+                throw new IllegalArgumentException("rpcType must not be null");
+            }
+            if (implementation == null) {
+                throw new IllegalArgumentException(
+                        "Implementation must not be null");
+            }
+            BrokerImpl.this.addRpcImplementation(rpcType, implementation);
+            sessionRpcImpls.put(rpcType, implementation);
+        }
+
+        @Override
+        public void removeRpcImplementation(QName rpcType,
+                RpcImplementation implToRemove) throws IllegalArgumentException {
+            RpcImplementation localImpl = rpcImpls.get(rpcType);
+            if (localImpl != implToRemove) {
+                throw new IllegalStateException(
+                        "Implementation was not registered in this session");
+            }
+
+            BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove);
+            sessionRpcImpls.remove(rpcType);
+        }
+
+        public Provider getProvider() {
+            return this.provider;
+        }
+
+    }
+}