X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=df49b1443769a6145fa754673ab673114b22c2b1;hb=617a0726d931230945a8a8d28a6e34be39f56b16;hp=362dbe171418d6dec88ea25dd2fee6e1c96c355f;hpb=e88094e6616318a43caa89c0bb32df7ef6ee9506;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index 362dbe1714..df49b14437 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -8,19 +8,18 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import com.google.common.collect.Iterators; -import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.netty.util.HashedWheelTimer; import io.netty.util.TimerTask; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; -import java.util.Set; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -28,35 +27,41 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.binding.api.DataBroker; +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; +import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; +import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; +import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceState; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceSynchronizeListener; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceValidListener; import org.opendaylight.openflowplugin.api.openflow.device.TranslatorLibrary; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceInitializationPhaseHandler; import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceTerminationPhaseHandler; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor; -import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; +import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy; import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; +import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; +import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl; +import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,37 +74,51 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private final long globalNotificationQuota; private final boolean switchFeaturesMandatory; - private boolean isNotificationFlowRemovedOff; - - private final int spyRate = 10; + private boolean isFlowRemovedNotificationOn; + private boolean skipTableFeatures; + private static final int SPY_RATE = 10; private final DataBroker dataBroker; + private final DeviceInitializerProvider deviceInitializerProvider; + private final ConvertorExecutor convertorExecutor; private TranslatorLibrary translatorLibrary; private DeviceInitializationPhaseHandler deviceInitPhaseHandler; private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); + private final ConcurrentMap lifecycleServices = new ConcurrentHashMap<>(); + + private long barrierIntervalNanos; + private int barrierCountLimit; - private final long barrierIntervalNanos; - private final int barrierCountLimit; private ExtensionConverterProvider extensionConverterProvider; private ScheduledThreadPoolExecutor spyPool; - private Set deviceSynchronizedListeners; - private Set deviceValidListeners; - - private final LifecycleConductor conductor; + private final ClusterSingletonServiceProvider singletonServiceProvider; + private final NotificationPublishService notificationPublishService; + private final MessageSpy messageSpy; + private final HashedWheelTimer hashedWheelTimer; + private final boolean useSingleLayerSerialization; public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, - final long globalNotificationQuota, final boolean switchFeaturesMandatory, - final long barrierInterval, final int barrierCountLimit, - final LifecycleConductor lifecycleConductor, boolean isNotificationFlowRemovedOff) { - this.switchFeaturesMandatory = switchFeaturesMandatory; - this.globalNotificationQuota = globalNotificationQuota; - this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; - this.dataBroker = Preconditions.checkNotNull(dataBroker); + final long globalNotificationQuota, + final boolean switchFeaturesMandatory, + final long barrierInterval, + final int barrierCountLimit, + final MessageSpy messageSpy, + final boolean isFlowRemovedNotificationOn, + final ClusterSingletonServiceProvider singletonServiceProvider, + final NotificationPublishService notificationPublishService, + final HashedWheelTimer hashedWheelTimer, + final ConvertorExecutor convertorExecutor, + final boolean skipTableFeatures, + final boolean useSingleLayerSerialization, + final DeviceInitializerProvider deviceInitializerProvider) { + + this.dataBroker = dataBroker; + this.deviceInitializerProvider = deviceInitializerProvider; + /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); - final NodesBuilder nodesBuilder = new NodesBuilder(); nodesBuilder.setNode(Collections.emptyList()); tx.merge(LogicalDatastoreType.OPERATIONAL, InstanceIdentifier.create(Nodes.class), nodesBuilder.build()); @@ -110,13 +129,19 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi throw new IllegalStateException(e); } + this.switchFeaturesMandatory = switchFeaturesMandatory; + this.globalNotificationQuota = globalNotificationQuota; + this.isFlowRemovedNotificationOn = isFlowRemovedNotificationOn; + this.skipTableFeatures = skipTableFeatures; + this.convertorExecutor = convertorExecutor; + this.hashedWheelTimer = hashedWheelTimer; this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval); this.barrierCountLimit = barrierCountLimit; - - this.conductor = lifecycleConductor; - spyPool = new ScheduledThreadPoolExecutor(1); - this.deviceSynchronizedListeners = new HashSet<>(); - this.deviceValidListeners = new HashSet<>(); + this.spyPool = new ScheduledThreadPoolExecutor(1); + this.singletonServiceProvider = singletonServiceProvider; + this.notificationPublishService = notificationPublishService; + this.messageSpy = messageSpy; + this.useSingleLayerSerialization = useSingleLayerSerialization; } @@ -126,38 +151,47 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo) throws Exception { + public void onDeviceContextLevelUp(@CheckForNull DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception { // final phase - we have to add new Device to MD-SAL DataStore LOG.debug("Final phase of DeviceContextLevelUp for Node: {} ", deviceInfo.getNodeId()); DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo)); - ((DeviceContextImpl) deviceContext).initialSubmitTransaction(); deviceContext.onPublished(); + lifecycleService.registerDeviceRemovedHandler(this); + lifecycleService.registerService(this.singletonServiceProvider); } @Override - public boolean deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { + public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { Preconditions.checkArgument(connectionContext != null); + final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - /** + /* * This part prevent destroy another device context. Throwing here an exception result to propagate close connection * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl} * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close */ if (deviceContexts.containsKey(deviceInfo)) { - LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId()); - return false; + DeviceContext deviceContext = deviceContexts.get(deviceInfo); + LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue()); + if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) { + LOG.warn("Node {} context state not in TERMINATION state.", + connectionContext.getDeviceInfo().getLOGValue()); + return ConnectionStatus.ALREADY_CONNECTED; + } else { + return ConnectionStatus.CLOSING; + } } LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId()); // Add Disconnect handler - connectionContext.setDeviceDisconnectedHandler(DeviceManagerImpl.this); + connectionContext.setDeviceDisconnectedHandler(this); + // Cache this for clarity final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter(); - //FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published + // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published connectionAdapter.setPacketInFiltering(true); final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion()); @@ -167,51 +201,43 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos); connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); - final DeviceState deviceState = new DeviceStateImpl(deviceInfo); - this.addDeviceSynchronizeListener(deviceState); - this.addDeviceValidListener(deviceState); - - final DeviceContext deviceContext = new DeviceContextImpl(connectionContext, - deviceState, + final LifecycleService lifecycleService = new LifecycleServiceImpl(); + final DeviceContext deviceContext = new DeviceContextImpl( + connectionContext, dataBroker, - conductor, - outboundQueueProvider, + messageSpy, translatorLibrary, - this); + this, + convertorExecutor, + skipTableFeatures, + hashedWheelTimer, + this, + useSingleLayerSerialization, + deviceInitializerProvider); + + deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext)); + deviceContexts.put(deviceInfo, deviceContext); + + lifecycleService.setDeviceContext(deviceContext); + deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService); + + lifecycleServices.put(deviceInfo, lifecycleService); - Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed."); + addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService); + + deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory); ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); - deviceContext.setNotificationPublishService(conductor.getNotificationPublishService()); + deviceContext.setNotificationPublishService(notificationPublishService); updatePacketInRateLimiters(); final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( connectionAdapter, deviceContext); - connectionAdapter.setMessageListener(messageListener); - notifyDeviceValidListeners(deviceInfo, true); - - deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo()); - notifyDeviceSynchronizeListeners(deviceInfo, true); - - return true; - } - - private void updatePacketInRateLimiters() { - synchronized (deviceContexts) { - final int deviceContextsSize = deviceContexts.size(); - if (deviceContextsSize > 0) { - long freshNotificationLimit = globalNotificationQuota / deviceContextsSize; - if (freshNotificationLimit < 100) { - freshNotificationLimit = 100; - } - LOG.debug("fresh notification limit = {}", freshNotificationLimit); - for (final DeviceContext deviceContext : deviceContexts.values()) { - deviceContext.updatePacketInRateLimit(freshNotificationLimit); - } - } - } + connectionAdapter.setMessageListener(messageListener); + deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService); + return ConnectionStatus.MAY_CONTINUE; } @Override @@ -229,27 +255,24 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi for (final Iterator iterator = Iterators.consumingIterator(deviceContexts.values().iterator()); iterator.hasNext();) { final DeviceContext deviceCtx = iterator.next(); - notifyDeviceValidListeners(deviceCtx.getDeviceInfo(), false); deviceCtx.shutdownConnection(); deviceCtx.shuttingDownDataStoreTransactions(); } - if (spyPool != null) { - spyPool.shutdownNow(); - spyPool = null; - } + Optional.ofNullable(spyPool).ifPresent(ScheduledThreadPoolExecutor::shutdownNow); + spyPool = null; + } @Override public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { - LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId()); - deviceContexts.remove(deviceInfo); updatePacketInRateLimiters(); + Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close); } @Override public void initialize() { - spyPool.scheduleAtFixedRate(conductor.getMessageIntelligenceAgency(), spyRate, spyRate, TimeUnit.SECONDS); + spyPool.scheduleAtFixedRate(messageSpy, SPY_RATE, SPY_RATE, TimeUnit.SECONDS); } @Override @@ -273,41 +296,51 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); final DeviceContext deviceCtx = this.deviceContexts.get(deviceInfo); - if (null == deviceCtx) { - LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getNodeId()); + if (Objects.isNull(deviceCtx)) { + LOG.info("DeviceContext for Node {} was not found. Connection is terminated without OFP context suite.", deviceInfo.getLOGValue()); + return; + } + + if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) { + LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue()); return; } + deviceCtx.close(); + if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) { - /* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */ + LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue()); + // Connection is not PrimaryConnection so try to remove from Auxiliary Connections deviceCtx.removeAuxiliaryConnectionContext(connectionContext); - } else { - notifyDeviceValidListeners(deviceInfo, false); - /* Device is disconnected and so we need to close TxManager */ - final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); - Futures.addCallback(future, new FutureCallback() { - - @Override - public void onSuccess(final Void result) { - LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId()); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); - } - - @Override - public void onFailure(final Throwable t) { - LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t); - deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); - } - }); - /* Add timer for Close TxManager because it could fain ind cluster without notification */ - final TimerTask timerTask = timeout -> { - if (!future.isDone()) { - LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId()); - future.cancel(false); - } - }; - conductor.newTimeout(timerTask, 10, TimeUnit.SECONDS); } + + // TODO: Auxiliary connections supported ? + // Device is disconnected and so we need to close TxManager + final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getLOGValue()); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); + } + + @Override + public void onFailure(final Throwable t) { + LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getLOGValue()); + LOG.trace("TxChainManager failed by closing. ", t); + deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo); + } + }); + + // Add timer for Close TxManager because it could fail in cluster without notification + final TimerTask timerTask = timeout -> { + if (!future.isDone()) { + LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getLOGValue()); + future.cancel(false); + } + }; + + hashedWheelTimer.newTimeout(timerTask, 10, TimeUnit.SECONDS); } @VisibleForTesting @@ -316,104 +349,95 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } @Override - public T gainContext(final DeviceInfo deviceInfo) { - return (T) deviceContexts.get(deviceInfo); + public void setFlowRemovedNotificationOn(boolean isNotificationFlowRemovedOff) { + this.isFlowRemovedNotificationOn = isNotificationFlowRemovedOff; } @Override - public ListenableFuture onClusterRoleChange(final DeviceInfo deviceInfo, final OfpRole role) { - DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); - LOG.trace("onClusterRoleChange {} for node:", role, deviceInfo.getNodeId()); - if (OfpRole.BECOMEMASTER.equals(role)) { - return onDeviceTakeClusterLeadership(deviceInfo); - } - return ((DeviceContextImpl)deviceContext).getTransactionChainManager().deactivateTransactionManager(); + public boolean isFlowRemovedNotificationOn() { + return this.isFlowRemovedNotificationOn; } - @Override - public void addDeviceSynchronizeListener(final DeviceSynchronizeListener deviceSynchronizeListener) { - this.deviceSynchronizedListeners.add(deviceSynchronizeListener); - } @Override - public void notifyDeviceSynchronizeListeners(final DeviceInfo deviceInfo, final boolean deviceSynchronized) { - for (DeviceSynchronizeListener listener : deviceSynchronizedListeners) { - listener.deviceIsSynchronized(deviceInfo, deviceSynchronized); - } + public void setSkipTableFeatures(boolean skipTableFeaturesValue) { + skipTableFeatures = skipTableFeaturesValue; } @Override - public void addDeviceValidListener(final DeviceValidListener deviceValidListener) { - this.deviceValidListeners.add(deviceValidListener); + public void setBarrierCountLimit(final int barrierCountLimit) { + this.barrierCountLimit = barrierCountLimit; } @Override - public void notifyDeviceValidListeners(final DeviceInfo deviceInfo, final boolean deviceValid) { - for (DeviceValidListener listener : deviceValidListeners) { - listener.deviceIsValid(deviceInfo, deviceValid); - } + public void setBarrierInterval(final long barrierTimeoutLimit) { + this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierTimeoutLimit); } @Override - public void setIsNotificationFlowRemovedOff(boolean isNotificationFlowRemovedOff) { - this.isNotificationFlowRemovedOff = isNotificationFlowRemovedOff; - } + public CheckedFuture removeDeviceFromOperationalDS(final DeviceInfo deviceInfo) { + final WriteTransaction delWtx = dataBroker.newWriteOnlyTransaction(); + delWtx.delete(LogicalDatastoreType.OPERATIONAL, deviceInfo.getNodeInstanceIdentifier()); + final CheckedFuture delFuture = delWtx.submit(); - @Override - public boolean getIsNotificationFlowRemovedOff() { - return this.isNotificationFlowRemovedOff; + Futures.addCallback(delFuture, new FutureCallback() { + @Override + public void onSuccess(final Void result) { + if (LOG.isDebugEnabled()) { + LOG.debug("Delete Node {} was successful", deviceInfo.getLOGValue()); + } + } + + @Override + public void onFailure(@Nonnull final Throwable t) { + LOG.warn("Delete node {} failed with exception {}", deviceInfo.getLOGValue(), t); + } + }); + + return delFuture; } - private ListenableFuture onDeviceTakeClusterLeadership(final DeviceInfo deviceInfo) { - LOG.trace("onDeviceTakeClusterLeadership for node: {}", deviceInfo.getNodeId()); - /* validation */ - StatisticsContext statisticsContext = conductor.getStatisticsContext(deviceInfo); - if (statisticsContext == null) { - final String errMsg = String.format("DeviceCtx %s is up but we are missing StatisticsContext", deviceInfo.getDatapathId()); - LOG.warn(errMsg); - return Futures.immediateFailedFuture(new IllegalStateException(errMsg)); - } - DeviceContext deviceContext = conductor.getDeviceContext(deviceInfo); - /* Prepare init info collecting */ - notifyDeviceSynchronizeListeners(deviceInfo, false); - ((DeviceContextImpl)deviceContext).getTransactionChainManager().activateTransactionManager(); - /* Init Collecting NodeInfo */ - final ListenableFuture initCollectingDeviceInfo = DeviceInitializationUtils.initializeNodeInformation( - deviceContext, switchFeaturesMandatory); - /* Init Collecting StatInfo */ - final ListenableFuture statPollFuture = Futures.transform(initCollectingDeviceInfo, - new AsyncFunction() { - - @Override - public ListenableFuture apply(@Nonnull final Void input) throws Exception { - statisticsContext.statListForCollectingInitialization(); - return statisticsContext.initialGatherDynamicData(); - } - }); - - return Futures.transform(statPollFuture, new Function() { + private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) { + Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback>() { @Override - public Void apply(final Boolean input) { - if (ConnectionContext.CONNECTION_STATE.RIP.equals(conductor.gainConnectionStateSafely(deviceInfo))) { - final String errMsg = String.format("We lost connection for Device %s, context has to be closed.", - deviceInfo.getNodeId()); - LOG.warn(errMsg); - throw new IllegalStateException(errMsg); - } - if (!input) { - final String errMsg = String.format("Get Initial Device %s information fails", - deviceInfo.getNodeId()); - LOG.warn(errMsg); - throw new IllegalStateException(errMsg); + public void onSuccess(@Nullable RpcResult setRoleOutputRpcResult) { + if (LOG.isDebugEnabled()) { + LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue()); } - LOG.debug("Get Initial Device {} information is successful", deviceInfo.getNodeId()); - notifyDeviceSynchronizeListeners(deviceInfo, true); - ((DeviceContextImpl)deviceContext).getTransactionChainManager().initialSubmitWriteTransaction(); - deviceContext.getDeviceState().setStatisticsPollingEnabledProp(true); - return null; + } + + @Override + public void onFailure(Throwable throwable) { + LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue()); + lifecycleService.closeConnection(); } }); } + private void updatePacketInRateLimiters() { + synchronized (deviceContexts) { + final int deviceContextsSize = deviceContexts.size(); + if (deviceContextsSize > 0) { + long freshNotificationLimit = globalNotificationQuota / deviceContextsSize; + if (freshNotificationLimit < 100) { + freshNotificationLimit = 100; + } + if (LOG.isDebugEnabled()) { + LOG.debug("fresh notification limit = {}", freshNotificationLimit); + } + for (final DeviceContext deviceContext : deviceContexts.values()) { + deviceContext.updatePacketInRateLimit(freshNotificationLimit); + } + } + } + } + + public void onDeviceRemoved(DeviceInfo deviceInfo) { + deviceContexts.remove(deviceInfo); + LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue()); + + lifecycleServices.remove(deviceInfo); + LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue()); + } }