X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Frpc%2FRpcContextImpl.java;h=f450b96160b137e3b426601d5af3ea5ae4917e53;hb=39633bcebf3b3eb1bd1c92d37ba3c8b510916027;hp=6d1954d30d6870fbb5c33f64d73ccfdfdb7d07dc;hpb=9e5560f92325c0612cb52cfbcf59248fe289d8d7;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java index 6d1954d30d..f450b96160 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/rpc/RpcContextImpl.java @@ -8,7 +8,7 @@ package org.opendaylight.openflowplugin.impl.rpc; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; +import com.google.common.base.Function; import com.google.common.collect.Iterators; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -16,8 +16,9 @@ import java.util.Iterator; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration; import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry; @@ -25,6 +26,8 @@ import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; @@ -48,33 +51,30 @@ class RpcContextImpl implements RpcContext { // TODO: add private Sal salBroker private final ConcurrentMap, RoutedRpcRegistration> rpcRegistrations = new ConcurrentHashMap<>(); private final KeyedInstanceIdentifier nodeInstanceIdentifier; - private CONTEXT_STATE state; + private volatile ContextState state = ContextState.INITIALIZATION; private final DeviceInfo deviceInfo; private final DeviceContext deviceContext; private final ExtensionConverterProvider extensionConverterProvider; private final ConvertorExecutor convertorExecutor; private final NotificationPublishService notificationPublishService; - RpcContextImpl(final DeviceInfo deviceInfo, - final RpcProviderRegistry rpcProviderRegistry, - final MessageSpy messageSpy, + RpcContextImpl(@Nonnull final RpcProviderRegistry rpcProviderRegistry, final int maxRequests, - final KeyedInstanceIdentifier nodeInstanceIdentifier, - final DeviceContext deviceContext, - final ExtensionConverterProvider extensionConverterProvider, - final ConvertorExecutor convertorExecutor, - final NotificationPublishService notificationPublishService) { - this.messageSpy = Preconditions.checkNotNull(messageSpy); - this.rpcProviderRegistry = Preconditions.checkNotNull(rpcProviderRegistry); - this.nodeInstanceIdentifier = nodeInstanceIdentifier; - - tracker = new Semaphore(maxRequests, true); + @Nonnull final DeviceContext deviceContext, + @Nonnull final ExtensionConverterProvider extensionConverterProvider, + @Nonnull final ConvertorExecutor convertorExecutor, + @Nonnull final NotificationPublishService notificationPublishService, + boolean statisticsRpcEnabled) { + this.deviceContext = deviceContext; + this.deviceInfo = deviceContext.getDeviceInfo(); + this.nodeInstanceIdentifier = deviceContext.getDeviceInfo().getNodeInstanceIdentifier(); + this.messageSpy = deviceContext.getMessageSpy(); + this.rpcProviderRegistry = rpcProviderRegistry; this.extensionConverterProvider = extensionConverterProvider; this.notificationPublishService = notificationPublishService; - setState(CONTEXT_STATE.WORKING); - this.deviceInfo = deviceInfo; - this.deviceContext = deviceContext; this.convertorExecutor = convertorExecutor; + this.isStatisticsRpcEnabled = statisticsRpcEnabled; + this.tracker = new Semaphore(maxRequests, true); } /** @@ -84,12 +84,15 @@ class RpcContextImpl implements RpcContext { @Override public void registerRpcServiceImplementation(final Class serviceClass, final S serviceInstance) { - LOG.trace("Try to register service {} for device {}.", serviceClass, nodeInstanceIdentifier); if (! rpcRegistrations.containsKey(serviceClass)) { final RoutedRpcRegistration routedRpcReg = rpcProviderRegistry.addRoutedRpcImplementation(serviceClass, serviceInstance); routedRpcReg.registerPath(NodeContext.class, nodeInstanceIdentifier); rpcRegistrations.put(serviceClass, routedRpcReg); - LOG.debug("Registration of service {} for device {}.", serviceClass.getSimpleName(), nodeInstanceIdentifier.getKey().getId().getValue()); + if (LOG.isDebugEnabled()) { + LOG.debug("Registration of service {} for device {}.", + serviceClass.getSimpleName(), + nodeInstanceIdentifier.getKey().getId().getValue()); + } } } @@ -107,21 +110,26 @@ class RpcContextImpl implements RpcContext { */ @Override public void close() { - if (CONTEXT_STATE.TERMINATION.equals(getState())){ + if (ContextState.TERMINATION.equals(state)) { if (LOG.isDebugEnabled()) { - LOG.debug("RpcContext is already in TERMINATION state."); + LOG.debug("RpcContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue()); } } else { - setState(CONTEXT_STATE.TERMINATION); - for (final Iterator, RoutedRpcRegistration>> iterator = Iterators - .consumingIterator(rpcRegistrations.entrySet().iterator()); iterator.hasNext(); ) { - final RoutedRpcRegistration rpcRegistration = iterator.next().getValue(); - rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier); - rpcRegistration.close(); - if (LOG.isDebugEnabled()) { - LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType().getSimpleName(), - nodeInstanceIdentifier.getKey().getId().getValue()); - } + this.state = ContextState.TERMINATION; + unregisterRPCs(); + } + } + + private void unregisterRPCs() { + for (final Iterator, RoutedRpcRegistration>> iterator = Iterators + .consumingIterator(rpcRegistrations.entrySet().iterator()); iterator.hasNext(); ) { + final RoutedRpcRegistration rpcRegistration = iterator.next().getValue(); + rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier); + rpcRegistration.close(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType().getSimpleName(), + nodeInstanceIdentifier.getKey().getId().getValue()); } } } @@ -148,7 +156,7 @@ class RpcContextImpl implements RpcContext { tracker.release(); final long xid = getXid().getValue(); LOG.trace("Removed request context with xid {}", xid); - messageSpy.spyMessage(RpcContextImpl.class, MessageSpy.STATISTIC_GROUP.REQUEST_STACK_FREED); + messageSpy.spyMessage(RpcContextImpl.class, MessageSpy.StatisticsGroup.REQUEST_STACK_FREED); } }; } @@ -169,26 +177,6 @@ class RpcContextImpl implements RpcContext { return this.rpcRegistrations.isEmpty(); } - @Override - public void setStatisticsRpcEnabled(boolean isStatisticsRpcEnabled) { - this.isStatisticsRpcEnabled = isStatisticsRpcEnabled; - } - - @Override - public boolean isStatisticsRpcEnabled() { - return isStatisticsRpcEnabled; - } - - @Override - public CONTEXT_STATE getState() { - return this.state; - } - - @Override - public void setState(CONTEXT_STATE state) { - this.state = state; - } - @Override public ServiceGroupIdentifier getServiceIdentifier() { return this.deviceInfo.getServiceIdentifier(); @@ -200,9 +188,27 @@ class RpcContextImpl implements RpcContext { } @Override - public void startupClusterServices() throws ExecutionException, InterruptedException { + public ListenableFuture stopClusterServices() { + if (ContextState.TERMINATION.equals(this.state)) { + return Futures.immediateCancelledFuture(); + } + + return Futures.transform(Futures.immediateFuture(null), new Function() { + @Nullable + @Override + public Void apply(@Nullable Object input) { + unregisterRPCs(); + return null; + } + }); + } + + @Override + public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) { + LOG.info("Starting rpc context cluster services for node {}", deviceInfo.getLOGValue()); MdSalRegistrationUtils.registerServices(this, deviceContext, extensionConverterProvider, convertorExecutor); - if (isStatisticsRpcEnabled) { + + if (isStatisticsRpcEnabled && !deviceContext.canUseSingleLayerSerialization()) { MdSalRegistrationUtils.registerStatCompatibilityServices( this, deviceContext, @@ -210,11 +216,7 @@ class RpcContextImpl implements RpcContext { convertorExecutor); } - } - - @Override - public ListenableFuture stopClusterServices(boolean deviceDisconnected) { - MdSalRegistrationUtils.unregisterServices(this); - return Futures.immediateFuture(null); + mastershipChangeListener.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.RPC_REGISTRATION); + return true; } }