X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fsal%2Fyang-prototype%2Fsal%2Fsal-broker-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fcore%2Fimpl%2FBrokerImpl.java;h=f0d1cc609d22a54230bed5c2136ae2f32ba701e7;hb=42183ad5bfefff7d6de9df467cdaa600a450af29;hp=84bc0569504272704d3ea68605d42f6c13de0afe;hpb=c8b79431119d6952b60a092e89727aa648a89bdd;p=controller.git diff --git a/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/BrokerImpl.java b/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/BrokerImpl.java index 84bc056950..f0d1cc609d 100644 --- a/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/BrokerImpl.java +++ b/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/BrokerImpl.java @@ -1,129 +1,280 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.sal.core.impl; -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.sal.core.impl; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Future; - -import org.opendaylight.controller.sal.core.api.Broker; -import org.opendaylight.controller.sal.core.api.BrokerService; -import org.opendaylight.controller.sal.core.api.Consumer; -import org.opendaylight.controller.sal.core.api.Provider; -import org.opendaylight.controller.sal.core.spi.BrokerModule; -import org.opendaylight.controller.yang.common.QName; -import org.opendaylight.controller.yang.common.RpcResult; -import org.opendaylight.controller.yang.data.api.CompositeNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class BrokerImpl implements Broker { - private static Logger log = LoggerFactory.getLogger(BrokerImpl.class); - - private Set sessions = new HashSet(); - private Set providerSessions = new HashSet(); - // private ExecutorService executor; - private Set modules = new HashSet(); - - private Map, BrokerModule> serviceProviders = new HashMap, BrokerModule>(); - - @Override - public ConsumerSession registerConsumer(Consumer consumer) { - checkPredicates(consumer); - log.info("Registering consumer " + consumer); - - ConsumerSessionImpl session = newSessionFor(consumer); - consumer.onSessionInitiated(session); - - sessions.add(session); - - return session; - - } - - @Override - public ProviderSession registerProvider(Provider provider) { - checkPredicates(provider); - - ProviderSessionImpl session = newSessionFor(provider); - provider.onSessionInitiated(session); - - providerSessions.add(session); - return session; - } - - public T serviceFor(Class service, - ConsumerSessionImpl session) { - BrokerModule prov = serviceProviders.get(service); - if (prov == null) { - log.warn("Service " + service.toString() + " is not supported"); - return null; - } - return prov.getServiceForSession(service, session); - } - - public Future> invokeRpc(QName rpc, - CompositeNode input) { - // TODO Implement this method - throw new UnsupportedOperationException("Not implemented"); - } - - private void checkPredicates(Provider prov) { - if (prov == null) - throw new IllegalArgumentException("Provider should not be null."); - for (ProviderSessionImpl session : providerSessions) { - if (prov.equals(session.getProvider())) - throw new IllegalStateException("Provider already registered"); - } - - } - - private void checkPredicates(Consumer cons) { - if (cons == null) - throw new IllegalArgumentException("Consumer should not be null."); - for (ConsumerSessionImpl session : sessions) { - if (cons.equals(session.getConsumer())) - throw new IllegalStateException("Consumer already registered"); - } - } - - private ConsumerSessionImpl newSessionFor(Consumer cons) { - return new ConsumerSessionImpl(this, cons); - } - - private ProviderSessionImpl newSessionFor(Provider provider) { - return new ProviderSessionImpl(this, provider); - } - - public void addModule(BrokerModule module) { - log.info("Registering broker module " + module); - if (modules.contains(module)) { - log.error("Module already registered"); - throw new IllegalArgumentException("Module already exists."); - } - - Set> provServices = module - .getProvidedServices(); - for (Class serviceType : provServices) { - log.info(" Registering session service implementation: " - + serviceType.getCanonicalName()); - serviceProviders.put(serviceType, module); - } - } - - public void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) { - sessions.remove(consumerSessionImpl); - providerSessions.remove(consumerSessionImpl); - } - -} +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import org.opendaylight.controller.sal.core.api.Broker; +import org.opendaylight.controller.sal.core.api.BrokerService; +import org.opendaylight.controller.sal.core.api.Consumer; +import org.opendaylight.controller.sal.core.api.Provider; +import org.opendaylight.controller.sal.core.api.RpcImplementation; +import org.opendaylight.controller.sal.core.spi.BrokerModule; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.common.RpcResult; +import org.opendaylight.yangtools.yang.data.api.CompositeNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +public class BrokerImpl implements Broker { + private static Logger log = LoggerFactory.getLogger(BrokerImpl.class); + + // Broker Generic Context + private Set sessions = Collections + .synchronizedSet(new HashSet()); + private Set providerSessions = Collections + .synchronizedSet(new HashSet()); + private Set modules = Collections + .synchronizedSet(new HashSet()); + private Map, BrokerModule> serviceProviders = Collections + .synchronizedMap(new HashMap, BrokerModule>()); + + // RPC Context + private Map rpcImpls = Collections + .synchronizedMap(new HashMap()); + + // Implementation specific + private ExecutorService executor; + + @Override + public ConsumerSession registerConsumer(Consumer consumer) { + checkPredicates(consumer); + log.info("Registering consumer " + consumer); + ConsumerSessionImpl session = newSessionFor(consumer); + consumer.onSessionInitiated(session); + sessions.add(session); + return session; + } + + @Override + public ProviderSession registerProvider(Provider provider) { + checkPredicates(provider); + + ProviderSessionImpl session = newSessionFor(provider); + provider.onSessionInitiated(session); + providerSessions.add(session); + return session; + } + + public void addModule(BrokerModule module) { + log.info("Registering broker module " + module); + if (modules.contains(module)) { + log.error("Module already registered"); + throw new IllegalArgumentException("Module already exists."); + } + + Set> provServices = module + .getProvidedServices(); + for (Class serviceType : provServices) { + log.info(" Registering session service implementation: " + + serviceType.getCanonicalName()); + serviceProviders.put(serviceType, module); + } + } + + public T serviceFor(Class service, + ConsumerSessionImpl session) { + BrokerModule prov = serviceProviders.get(service); + if (prov == null) { + log.warn("Service " + service.toString() + " is not supported"); + return null; + } + return prov.getServiceForSession(service, session); + } + + // RPC Functionality + + private void addRpcImplementation(QName rpcType, + RpcImplementation implementation) { + synchronized (rpcImpls) { + if (rpcImpls.get(rpcType) != null) { + throw new IllegalStateException("Implementation for rpc " + + rpcType + " is already registered."); + } + rpcImpls.put(rpcType, implementation); + } + // TODO Add notification for availability of Rpc Implementation + } + + private void removeRpcImplementation(QName rpcType, + RpcImplementation implToRemove) { + synchronized (rpcImpls) { + if (implToRemove == rpcImpls.get(rpcType)) { + rpcImpls.remove(rpcType); + } + } + // TODO Add notification for removal of Rpc Implementation + } + + private Future> invokeRpc(QName rpc, + CompositeNode input) { + RpcImplementation impl = rpcImpls.get(rpc); + // if() + + Callable> call = callableFor(impl, + rpc, input); + Future> result = executor.submit(call); + + return result; + } + + // Validation + + private void checkPredicates(Provider prov) { + if (prov == null) + throw new IllegalArgumentException("Provider should not be null."); + for (ProviderSessionImpl session : providerSessions) { + if (prov.equals(session.getProvider())) + throw new IllegalStateException("Provider already registered"); + } + + } + + private void checkPredicates(Consumer cons) { + if (cons == null) + throw new IllegalArgumentException("Consumer should not be null."); + for (ConsumerSessionImpl session : sessions) { + if (cons.equals(session.getConsumer())) + throw new IllegalStateException("Consumer already registered"); + } + } + + // Private Factory methods + + private ConsumerSessionImpl newSessionFor(Consumer cons) { + return new ConsumerSessionImpl(cons); + } + + private ProviderSessionImpl newSessionFor(Provider provider) { + return new ProviderSessionImpl(provider); + } + + private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) { + sessions.remove(consumerSessionImpl); + providerSessions.remove(consumerSessionImpl); + } + + private static Callable> callableFor( + final RpcImplementation implemenation, final QName rpc, + final CompositeNode input) { + + return new Callable>() { + + @Override + public RpcResult call() throws Exception { + return implemenation.invokeRpc(rpc, input); + } + }; + } + + private class ConsumerSessionImpl implements ConsumerSession { + + private final Consumer consumer; + + private Map, BrokerService> instantiatedServices = Collections + .synchronizedMap(new HashMap, BrokerService>()); + private boolean closed = false; + + public Consumer getConsumer() { + return consumer; + } + + public ConsumerSessionImpl(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public Future> rpc(QName rpc, + CompositeNode input) { + return BrokerImpl.this.invokeRpc(rpc, input); + } + + @Override + public T getService(Class service) { + BrokerService potential = instantiatedServices.get(service); + if (potential != null) { + @SuppressWarnings("unchecked") + T ret = (T) potential; + return ret; + } + T ret = BrokerImpl.this.serviceFor(service, this); + if (ret != null) { + instantiatedServices.put(service, ret); + } + return ret; + } + + @Override + public void close() { + Collection toStop = instantiatedServices.values(); + this.closed = true; + for (BrokerService brokerService : toStop) { + brokerService.closeSession(); + } + BrokerImpl.this.consumerSessionClosed(this); + } + + @Override + public boolean isClosed() { + return closed; + } + + } + + private class ProviderSessionImpl extends ConsumerSessionImpl implements + ProviderSession { + + private Provider provider; + private Map sessionRpcImpls = Collections.synchronizedMap(new HashMap()); + + public ProviderSessionImpl(Provider provider) { + super(null); + this.provider = provider; + } + + @Override + public void addRpcImplementation(QName rpcType, + RpcImplementation implementation) + throws IllegalArgumentException { + if (rpcType == null) { + throw new IllegalArgumentException("rpcType must not be null"); + } + if (implementation == null) { + throw new IllegalArgumentException( + "Implementation must not be null"); + } + BrokerImpl.this.addRpcImplementation(rpcType, implementation); + sessionRpcImpls.put(rpcType, implementation); + } + + @Override + public void removeRpcImplementation(QName rpcType, + RpcImplementation implToRemove) throws IllegalArgumentException { + RpcImplementation localImpl = rpcImpls.get(rpcType); + if (localImpl != implToRemove) { + throw new IllegalStateException( + "Implementation was not registered in this session"); + } + + BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove); + sessionRpcImpls.remove(rpcType); + } + + public Provider getProvider() { + return this.provider; + } + + } +}