X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;ds=sidebyside;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Frpc%2FRpcContextImpl.java;h=f794d850511c98509dc4e8a5f36c8305038b79bb;hb=d1af0fd5a4053a10917f631bae42970c1960fd20;hp=453932b9352ded88585d3ee27add0d7e89d9cb47;hpb=bae31dffdd89b34775467e22e756690df6ce2668;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java index 453932b935..f794d85051 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java @@ -7,50 +7,76 @@ */ package org.opendaylight.openflowplugin.impl.rpc; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicLong; -import com.google.common.base.Preconditions; +import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; +import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; -import org.opendaylight.openflowplugin.impl.util.MdSalRegistratorUtils; +import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; +import org.opendaylight.openflowplugin.impl.util.MdSalRegistrationUtils; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeContext; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.binding.RpcService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RpcContextImpl implements RpcContext { +class RpcContextImpl implements RpcContext { private static final Logger LOG = LoggerFactory.getLogger(RpcContextImpl.class); private final RpcProviderRegistry rpcProviderRegistry; - private final DeviceContext deviceContext; private final MessageSpy messageSpy; private final Semaphore tracker; + private boolean isStatisticsRpcEnabled; // TODO: add private Sal salBroker private final ConcurrentMap, RoutedRpcRegistration> rpcRegistrations = new ConcurrentHashMap<>(); - private final boolean isStatisticsRpcEnabled; + private final KeyedInstanceIdentifier nodeInstanceIdentifier; + private volatile CONTEXT_STATE state = CONTEXT_STATE.INITIALIZATION; + private final DeviceInfo deviceInfo; + private final DeviceContext deviceContext; + private final ExtensionConverterProvider extensionConverterProvider; + private final ConvertorExecutor convertorExecutor; private final NotificationPublishService notificationPublishService; + private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler; - public RpcContextImpl(final MessageSpy messageSpy, final RpcProviderRegistry rpcProviderRegistry, final DeviceContext deviceContext, - final int maxRequests, final boolean isStatisticsRpcEnabled, - final NotificationPublishService notificationPublishService) { - this.deviceContext = Preconditions.checkNotNull(deviceContext); + RpcContextImpl(final DeviceInfo deviceInfo, + final RpcProviderRegistry rpcProviderRegistry, + final MessageSpy messageSpy, + final int maxRequests, + final KeyedInstanceIdentifier nodeInstanceIdentifier, + final DeviceContext deviceContext, + final ExtensionConverterProvider extensionConverterProvider, + final ConvertorExecutor convertorExecutor, + final NotificationPublishService notificationPublishService) { this.messageSpy = Preconditions.checkNotNull(messageSpy); this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry); - this.isStatisticsRpcEnabled = isStatisticsRpcEnabled; + this.nodeInstanceIdentifier = nodeInstanceIdentifier; + this.tracker = new Semaphore(maxRequests, true); + this.extensionConverterProvider = extensionConverterProvider; this.notificationPublishService = notificationPublishService; - tracker = new Semaphore(maxRequests, true); - deviceContext.setRpcContext(RpcContextImpl.this); + this.deviceInfo = deviceInfo; + this.deviceContext = deviceContext; + this.convertorExecutor = convertorExecutor; } /** @@ -60,28 +86,22 @@ public class RpcContextImpl implements RpcContext { @Override public void registerRpcServiceImplementation(final Class serviceClass, final S serviceInstance) { - LOG.trace("Try to register service {} for device {}.", serviceClass, deviceContext.getDeviceState().getNodeInstanceIdentifier()); + LOG.trace("Try to register service {} for device {}.", serviceClass, nodeInstanceIdentifier); if (! rpcRegistrations.containsKey(serviceClass)) { final RoutedRpcRegistration routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance); - routedRpcReg.registerPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier()); + routedRpcReg.registerPath(NodeContext.class, nodeInstanceIdentifier); rpcRegistrations.put(serviceClass, routedRpcReg); - LOG.debug("Registration of service {} for device {}.", serviceClass, deviceContext.getDeviceState().getNodeInstanceIdentifier()); - } - } - - @Override - public void registerStatCompatibilityServices() { - if (isStatisticsRpcEnabled) { - MdSalRegistratorUtils.registerStatCompatibilityServices(RpcContextImpl.this, deviceContext, - notificationPublishService, new AtomicLong()); + LOG.debug("Registration of service {} for device {}.", serviceClass.getSimpleName(), nodeInstanceIdentifier.getKey().getId().getValue()); } } @Override public S lookupRpcService(final Class serviceClass) { - final RpcService rpcService = rpcRegistrations.get(serviceClass).getInstance(); + RoutedRpcRegistration registration = rpcRegistrations.get(serviceClass); + final RpcService rpcService = registration.getInstance(); return (S) rpcService; } + /** * Unregisters all services. * @@ -89,13 +109,18 @@ public class RpcContextImpl implements RpcContext { */ @Override public void close() { - for (final Iterator, RoutedRpcRegistration>> iterator = Iterators - .consumingIterator(rpcRegistrations.entrySet().iterator()); iterator.hasNext();) { - final RoutedRpcRegistration rpcRegistration = iterator.next().getValue(); - rpcRegistration.unregisterPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier()); - rpcRegistration.close(); - LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType(), - deviceContext.getDeviceState().getNodeInstanceIdentifier()); + if (CONTEXT_STATE.TERMINATION.equals(getState())){ + if (LOG.isDebugEnabled()) { + LOG.debug("RpcContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue()); + } + } else { + try { + stopClusterServices().get(); + } catch (Exception e) { + LOG.debug("Failed to close RpcContext for node {} with exception: ", getDeviceInfo().getLOGValue(), e); + } + + this.state = CONTEXT_STATE.TERMINATION; } } @@ -105,12 +130,12 @@ public class RpcContextImpl implements RpcContext { LOG.trace("Device queue {} at capacity", this); return null; } else { - LOG.trace("Acquired semaphore for {}, available permits:{} ", deviceContext.getDeviceState().getNodeId(), tracker.availablePermits()); + LOG.trace("Acquired semaphore for {}, available permits:{} ", nodeInstanceIdentifier.getKey().getId().getValue(), tracker.availablePermits()); } - final Long xid = deviceContext.reservedXidForDeviceMessage(); + final Long xid = deviceInfo.reserveXidForDeviceMessage(); if (xid == null) { - LOG.warn("Xid cannot be reserved for new RequestContext, node:{}", deviceContext.getDeviceState().getNodeId()); + LOG.warn("Xid cannot be reserved for new RequestContext, node:{}", nodeInstanceIdentifier.getKey().getId().getValue()); tracker.release(); return null; } @@ -128,12 +153,89 @@ public class RpcContextImpl implements RpcContext { @Override public void unregisterRpcServiceImplementation(final Class serviceClass) { - LOG.trace("Try to unregister serviceClass {} for Node {}", serviceClass, deviceContext.getDeviceState().getNodeId()); + LOG.trace("Try to unregister serviceClass {} for Node {}", serviceClass, nodeInstanceIdentifier.getKey().getId()); final RoutedRpcRegistration rpcRegistration = rpcRegistrations.remove(serviceClass); if (rpcRegistration != null) { - rpcRegistration.unregisterPath(NodeContext.class, deviceContext.getDeviceState().getNodeInstanceIdentifier()); + rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier); rpcRegistration.close(); - LOG.debug("Unregistration serviceClass {} for Node {}", serviceClass, deviceContext.getDeviceState().getNodeId()); + LOG.debug("Un-registration serviceClass {} for Node {}", serviceClass.getSimpleName(), nodeInstanceIdentifier.getKey().getId().getValue()); } } + + @VisibleForTesting + boolean isEmptyRpcRegistrations() { + return this.rpcRegistrations.isEmpty(); + } + + @Override + public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) { + this.isStatisticsRpcEnabled = isStatisticsRpcEnabled; + } + + @Override + public CONTEXT_STATE getState() { + return this.state; + } + + @Override + public ServiceGroupIdentifier getServiceIdentifier() { + return this.deviceInfo.getServiceIdentifier(); + } + + @Override + public DeviceInfo getDeviceInfo() { + return this.deviceInfo; + } + + @Override + public ListenableFuture stopClusterServices() { + if (CONTEXT_STATE.TERMINATION.equals(getState())) { + return Futures.immediateCancelledFuture(); + } + + return Futures.transform(Futures.immediateFuture(null), new Function() { + @Nullable + @Override + public Void apply(@Nullable Object input) { + for (final Iterator, RoutedRpcRegistration>> iterator = Iterators + .consumingIterator(rpcRegistrations.entrySet().iterator()); iterator.hasNext(); ) { + final RoutedRpcRegistration rpcRegistration = iterator.next().getValue(); + rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier); + rpcRegistration.close(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType().getSimpleName(), + nodeInstanceIdentifier.getKey().getId().getValue()); + } + } + + return null; + } + }); + } + + @Override + public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) { + this.clusterInitializationPhaseHandler = handler; + } + + @Override + public boolean onContextInstantiateService(final ConnectionContext connectionContext) { + if (connectionContext.getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) { + LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue()); + return false; + } + + MdSalRegistrationUtils.registerServices(this, deviceContext, extensionConverterProvider, convertorExecutor); + + if (isStatisticsRpcEnabled) { + MdSalRegistrationUtils.registerStatCompatibilityServices( + this, + deviceContext, + notificationPublishService, + convertorExecutor); + } + + return this.clusterInitializationPhaseHandler.onContextInstantiateService(connectionContext); + } }