X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Frpc%2FRpcContextImpl.java;h=1a8cb845f1452b8760ae60369c48f0bf7b4e0fe8;hb=dc442e042a48d75da43720866f17631fe4c5f524;hp=1168d08a254fd0c282ec841cea7b96023c48f025;hpb=06823e93f30fc73f9482a13da3f798e7757564d3;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java index 1168d08a25..1a8cb845f1 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java @@ -7,39 +7,58 @@ */ package org.opendaylight.openflowplugin.impl.rpc; -import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Semaphore; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; -import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.device.XidSequencer; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; +import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.binding.RpcService; -import org.opendaylight.yangtools.yang.common.RpcError; -import org.opendaylight.yangtools.yang.common.RpcResult; -import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class RpcContextImpl implements RpcContext { - - private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(RpcContextImpl.class); - final RpcProviderRegistry rpcProviderRegistry; +class RpcContextImpl implements RpcContext { + private static final Logger LOG = LoggerFactory.getLogger(RpcContextImpl.class); + private final RpcProviderRegistry rpcProviderRegistry; + private final MessageSpy messageSpy; + private final Semaphore tracker; + private final XidSequencer xidSequencer; + private boolean isStatisticsRpcEnabled; // TODO: add private Sal salBroker - private final DeviceContext deviceContext; - private final List rpcRegistrations = new ArrayList<>(); - private final List> synchronizedRequestsList = Collections - .>synchronizedList(new ArrayList>()); - - private int maxRequestsPerDevice; - - public RpcContextImpl(final RpcProviderRegistry rpcProviderRegistry, final DeviceContext deviceContext) { - this.rpcProviderRegistry = rpcProviderRegistry; - this.deviceContext = deviceContext; + private final ConcurrentMap, RoutedRpcRegistration> rpcRegistrations = new ConcurrentHashMap<>(); + private final KeyedInstanceIdentifier nodeInstanceIdentifier; + private CONTEXT_STATE state; + private final DeviceInfo deviceInfo; + + RpcContextImpl(final DeviceInfo deviceInfo, + final RpcProviderRegistry rpcProviderRegistry, + final XidSequencer xidSequencer, + final MessageSpy messageSpy, + final int maxRequests, + final KeyedInstanceIdentifier nodeInstanceIdentifier) { + this.xidSequencer = Preconditions.checkNotNull(xidSequencer); + this.messageSpy = Preconditions.checkNotNull(messageSpy); + this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry); + this.nodeInstanceIdentifier = nodeInstanceIdentifier; + + tracker = new Semaphore(maxRequests, true); + setState(CONTEXT_STATE.WORKING); + this.deviceInfo = deviceInfo; } /** @@ -49,24 +68,20 @@ public class RpcContextImpl implements RpcContext { @Override public void registerRpcServiceImplementation(final Class serviceClass, final S serviceInstance) { - final RoutedRpcRegistration routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance); - routedRpcReg.registerPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier()); - rpcRegistrations.add(routedRpcReg); - LOG.debug("Registration of service {} for device {}.",serviceClass, deviceContext.getDeviceState().getNodeInstanceIdentifier()); + LOG.trace("Try to register service {} for device {}.", serviceClass, nodeInstanceIdentifier); + if (! rpcRegistrations.containsKey(serviceClass)) { + final RoutedRpcRegistration routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance); + routedRpcReg.registerPath(NodeContext.class, nodeInstanceIdentifier); + rpcRegistrations.put(serviceClass, routedRpcReg); + LOG.debug("Registration of service {} for device {}.", serviceClass, nodeInstanceIdentifier); + } } @Override - public SettableFuture> storeOrFail(final RequestContext requestContext) { - final SettableFuture> rpcResultFuture = requestContext.getFuture(); - - if (synchronizedRequestsList.size() < maxRequestsPerDevice) { - synchronizedRequestsList.add(requestContext); - } else { - final RpcResult rpcResult = RpcResultBuilder.failed() - .withError(RpcError.ErrorType.APPLICATION, "", "Device's request queue is full.").build(); - rpcResultFuture.set(rpcResult); - } - return rpcResultFuture; + public S lookupRpcService(final Class serviceClass) { + RoutedRpcRegistration registration = rpcRegistrations.get(serviceClass); + final RpcService rpcService = registration.getInstance(); + return (S) rpcService; } /** @@ -75,44 +90,94 @@ public class RpcContextImpl implements RpcContext { * @see java.lang.AutoCloseable#close() */ @Override - public void close() throws Exception { - for (final RoutedRpcRegistration rpcRegistration : rpcRegistrations) { - rpcRegistration.unregisterPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier()); - rpcRegistration.close(); + public void close() { + if (CONTEXT_STATE.TERMINATION.equals(getState())){ + if (LOG.isDebugEnabled()) { + LOG.debug("RpcContext is already in TERMINATION state."); + } + } else { + setState(CONTEXT_STATE.TERMINATION); + for (final Iterator, RoutedRpcRegistration>> iterator = Iterators + .consumingIterator(rpcRegistrations.entrySet().iterator()); iterator.hasNext(); ) { + final RoutedRpcRegistration rpcRegistration = iterator.next().getValue(); + rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier); + rpcRegistration.close(); + LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType(), + nodeInstanceIdentifier); + } } } - /** - * @see org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext#setRequestContextQuota(int) - */ @Override - public void setRequestContextQuota(final int maxRequestsPerDevice) { - this.maxRequestsPerDevice = maxRequestsPerDevice; + public RequestContext createRequestContext() { + if (!tracker.tryAcquire()) { + LOG.trace("Device queue {} at capacity", this); + return null; + } else { + LOG.trace("Acquired semaphore for {}, available permits:{} ", nodeInstanceIdentifier.getKey().getId(), tracker.availablePermits()); + } + + final Long xid = xidSequencer.reserveXidForDeviceMessage(); + if (xid == null) { + LOG.warn("Xid cannot be reserved for new RequestContext, node:{}", nodeInstanceIdentifier.getKey().getId()); + tracker.release(); + return null; + } + + return new AbstractRequestContext(xid) { + @Override + public void close() { + tracker.release(); + final long xid = getXid().getValue(); + LOG.trace("Removed request context with xid {}", xid); + messageSpy.spyMessage(RpcContextImpl.class, MessageSpy.STATISTIC_GROUP.REQUEST_STACK_FREED); + } + }; } @Override - public void forgetRequestContext(final RequestContext requestContext) { - synchronizedRequestsList.remove(requestContext); - LOG.trace("Removed request context with xid {}. Context request in list {}.", - requestContext.getXid().getValue(), synchronizedRequestsList.size()); + public void unregisterRpcServiceImplementation(final Class serviceClass) { + LOG.trace("Try to unregister serviceClass {} for Node {}", serviceClass, nodeInstanceIdentifier.getKey().getId()); + final RoutedRpcRegistration rpcRegistration = rpcRegistrations.remove(serviceClass); + if (rpcRegistration != null) { + rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier); + rpcRegistration.close(); + LOG.debug("Unregistration serviceClass {} for Node {}", serviceClass, nodeInstanceIdentifier.getKey().getId()); + } } + @VisibleForTesting + boolean isEmptyRpcRegistrations() { + return this.rpcRegistrations.isEmpty(); + } @Override - public RequestContext createRequestContext() { - return new RequestContextImpl(this); + public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) { + this.isStatisticsRpcEnabled = isStatisticsRpcEnabled; } - public boolean isRequestContextCapacityEmpty() { - return synchronizedRequestsList.size() <= maxRequestsPerDevice; + @Override + public boolean isStatisticsRpcEnabled() { + return isStatisticsRpcEnabled; } @Override - public void onDeviceDisconnected(final ConnectionContext connectionContext) { - for (RoutedRpcRegistration registration : rpcRegistrations) { - registration.close(); - } + public CONTEXT_STATE getState() { + return this.state; + } - synchronizedRequestsList.clear(); + @Override + public void setState(CONTEXT_STATE state) { + this.state = state; + } + + @Override + public ServiceGroupIdentifier getServiceIdentifier() { + return this.deviceInfo.getServiceIdentifier(); + } + + @Override + public DeviceInfo getDeviceInfo() { + return this.deviceInfo; } }