X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=opendaylight%2Fsal%2Fyang-prototype%2Fsal%2Fsal-broker-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fsal%2Fcore%2Fimpl%2FBrokerImpl.java;h=b8a0b97eab9abb81b851eeda349ec6156709442a;hb=refs%2Fchanges%2F12%2F112%2F1;hp=84bc0569504272704d3ea68605d42f6c13de0afe;hpb=0cc147bcc963544380071b7d101ece8bbea55849;p=controller.git diff --git a/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/BrokerImpl.java b/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/BrokerImpl.java index 84bc056950..b8a0b97eab 100644 --- a/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/BrokerImpl.java +++ b/opendaylight/sal/yang-prototype/sal/sal-broker-impl/src/main/java/org/opendaylight/controller/sal/core/impl/BrokerImpl.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -8,46 +7,55 @@ */ package org.opendaylight.controller.sal.core.impl; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; - import org.opendaylight.controller.sal.core.api.Broker; import org.opendaylight.controller.sal.core.api.BrokerService; import org.opendaylight.controller.sal.core.api.Consumer; import org.opendaylight.controller.sal.core.api.Provider; +import org.opendaylight.controller.sal.core.api.RpcImplementation; import org.opendaylight.controller.sal.core.spi.BrokerModule; -import org.opendaylight.controller.yang.common.QName; -import org.opendaylight.controller.yang.common.RpcResult; -import org.opendaylight.controller.yang.data.api.CompositeNode; +import org.opendaylight.controller.yang.common.QName; +import org.opendaylight.controller.yang.common.RpcResult; +import org.opendaylight.controller.yang.data.api.CompositeNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class BrokerImpl implements Broker { private static Logger log = LoggerFactory.getLogger(BrokerImpl.class); - private Set sessions = new HashSet(); - private Set providerSessions = new HashSet(); - // private ExecutorService executor; - private Set modules = new HashSet(); + // Broker Generic Context + private Set sessions = Collections + .synchronizedSet(new HashSet()); + private Set providerSessions = Collections + .synchronizedSet(new HashSet()); + private Set modules = Collections + .synchronizedSet(new HashSet()); + private Map, BrokerModule> serviceProviders = Collections + .synchronizedMap(new HashMap, BrokerModule>()); + + // RPC Context + private Map rpcImpls = Collections + .synchronizedMap(new HashMap()); - private Map, BrokerModule> serviceProviders = new HashMap, BrokerModule>(); + // Implementation specific + private ExecutorService executor; @Override public ConsumerSession registerConsumer(Consumer consumer) { checkPredicates(consumer); log.info("Registering consumer " + consumer); - ConsumerSessionImpl session = newSessionFor(consumer); consumer.onSessionInitiated(session); - sessions.add(session); - return session; - } @Override @@ -56,11 +64,26 @@ public class BrokerImpl implements Broker { ProviderSessionImpl session = newSessionFor(provider); provider.onSessionInitiated(session); - providerSessions.add(session); return session; } + public void addModule(BrokerModule module) { + log.info("Registering broker module " + module); + if (modules.contains(module)) { + log.error("Module already registered"); + throw new IllegalArgumentException("Module already exists."); + } + + Set> provServices = module + .getProvidedServices(); + for (Class serviceType : provServices) { + log.info(" Registering session service implementation: " + + serviceType.getCanonicalName()); + serviceProviders.put(serviceType, module); + } + } + public T serviceFor(Class service, ConsumerSessionImpl session) { BrokerModule prov = serviceProviders.get(service); @@ -71,11 +94,43 @@ public class BrokerImpl implements Broker { return prov.getServiceForSession(service, session); } - public Future> invokeRpc(QName rpc, + // RPC Functionality + + private void addRpcImplementation(QName rpcType, + RpcImplementation implementation) { + synchronized (rpcImpls) { + if (rpcImpls.get(rpcType) != null) { + throw new IllegalStateException("Implementation for rpc " + + rpcType + " is already registered."); + } + rpcImpls.put(rpcType, implementation); + } + // TODO Add notification for availability of Rpc Implementation + } + + private void removeRpcImplementation(QName rpcType, + RpcImplementation implToRemove) { + synchronized (rpcImpls) { + if (implToRemove == rpcImpls.get(rpcType)) { + rpcImpls.remove(rpcType); + } + } + // TODO Add notification for removal of Rpc Implementation + } + + private Future> invokeRpc(QName rpc, CompositeNode input) { - // TODO Implement this method - throw new UnsupportedOperationException("Not implemented"); + RpcImplementation impl = rpcImpls.get(rpc); + // if() + + Callable> call = callableFor(impl, + rpc, input); + Future> result = executor.submit(call); + + return result; } + + // Validation private void checkPredicates(Provider prov) { if (prov == null) @@ -96,34 +151,130 @@ public class BrokerImpl implements Broker { } } + // Private Factory methods + private ConsumerSessionImpl newSessionFor(Consumer cons) { - return new ConsumerSessionImpl(this, cons); + return new ConsumerSessionImpl(cons); } private ProviderSessionImpl newSessionFor(Provider provider) { - return new ProviderSessionImpl(this, provider); + return new ProviderSessionImpl(provider); } - public void addModule(BrokerModule module) { - log.info("Registering broker module " + module); - if (modules.contains(module)) { - log.error("Module already registered"); - throw new IllegalArgumentException("Module already exists."); + private void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) { + sessions.remove(consumerSessionImpl); + providerSessions.remove(consumerSessionImpl); + } + + private static Callable> callableFor( + final RpcImplementation implemenation, final QName rpc, + final CompositeNode input) { + + return new Callable>() { + + @Override + public RpcResult call() throws Exception { + return implemenation.invokeRpc(rpc, input); + } + }; + } + + private class ConsumerSessionImpl implements ConsumerSession { + + private final Consumer consumer; + + private Map, BrokerService> instantiatedServices = Collections + .synchronizedMap(new HashMap, BrokerService>()); + private boolean closed = false; + + public Consumer getConsumer() { + return consumer; } - Set> provServices = module - .getProvidedServices(); - for (Class serviceType : provServices) { - log.info(" Registering session service implementation: " - + serviceType.getCanonicalName()); - serviceProviders.put(serviceType, module); + public ConsumerSessionImpl(Consumer consumer) { + this.consumer = consumer; + } + + @Override + public Future> rpc(QName rpc, + CompositeNode input) { + return BrokerImpl.this.invokeRpc(rpc, input); + } + + @Override + public T getService(Class service) { + BrokerService potential = instantiatedServices.get(service); + if (potential != null) { + @SuppressWarnings("unchecked") + T ret = (T) potential; + return ret; + } + T ret = BrokerImpl.this.serviceFor(service, this); + if (ret != null) { + instantiatedServices.put(service, ret); + } + return ret; + } + + @Override + public void close() { + Collection toStop = instantiatedServices.values(); + this.closed = true; + for (BrokerService brokerService : toStop) { + brokerService.closeSession(); + } + BrokerImpl.this.consumerSessionClosed(this); + } + + @Override + public boolean isClosed() { + return closed; } - } - public void consumerSessionClosed(ConsumerSessionImpl consumerSessionImpl) { - sessions.remove(consumerSessionImpl); - providerSessions.remove(consumerSessionImpl); } + private class ProviderSessionImpl extends ConsumerSessionImpl implements + ProviderSession { + + private Provider provider; + private Map sessionRpcImpls = Collections.synchronizedMap(new HashMap()); + + public ProviderSessionImpl(Provider provider) { + super(null); + this.provider = provider; + } + + @Override + public void addRpcImplementation(QName rpcType, + RpcImplementation implementation) + throws IllegalArgumentException { + if (rpcType == null) { + throw new IllegalArgumentException("rpcType must not be null"); + } + if (implementation == null) { + throw new IllegalArgumentException( + "Implementation must not be null"); + } + BrokerImpl.this.addRpcImplementation(rpcType, implementation); + sessionRpcImpls.put(rpcType, implementation); + } + + @Override + public void removeRpcImplementation(QName rpcType, + RpcImplementation implToRemove) throws IllegalArgumentException { + RpcImplementation localImpl = rpcImpls.get(rpcType); + if (localImpl != implToRemove) { + throw new IllegalStateException( + "Implementation was not registered in this session"); + } + + BrokerImpl.this.removeRpcImplementation(rpcType, implToRemove); + sessionRpcImpls.remove(rpcType); + } + + public Provider getProvider() { + return this.provider; + } + + } } -