X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceManagerImpl.java;h=c8207372d27a9acdb58274894448a81f92e7d352;hb=68d9283e3e9bca8912e4f7057b63ba0214feb5c5;hp=d88cb1c359489f4ceb7f40a977c29bd5411f1c0f;hpb=9335077084ca58812fe45d2042e553314b2550fe;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java index d88cb1c359..c8207372d2 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java @@ -27,18 +27,15 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.WriteTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; -import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration; import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionStatus; import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; @@ -51,16 +48,14 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.Messa import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper; import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider; import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl; +import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider; import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl; -import org.opendaylight.openflowplugin.impl.lifecycle.LifecycleServiceImpl; -import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl; +import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodesBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; -import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; -import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,9 +73,9 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private static final int SPY_RATE = 10; private final DataBroker dataBroker; + private final DeviceInitializerProvider deviceInitializerProvider; private final ConvertorExecutor convertorExecutor; private TranslatorLibrary translatorLibrary; - private DeviceInitializationPhaseHandler deviceInitPhaseHandler; private DeviceTerminationPhaseHandler deviceTerminPhaseHandler; private final ConcurrentMap deviceContexts = new ConcurrentHashMap<>(); @@ -91,10 +86,10 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi private ExtensionConverterProvider extensionConverterProvider; private ScheduledThreadPoolExecutor spyPool; - private final ClusterSingletonServiceProvider singletonServiceProvider; private final NotificationPublishService notificationPublishService; private final MessageSpy messageSpy; private final HashedWheelTimer hashedWheelTimer; + private boolean useSingleLayerSerialization; public DeviceManagerImpl(@Nonnull final DataBroker dataBroker, final long globalNotificationQuota, @@ -107,9 +102,12 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi final NotificationPublishService notificationPublishService, final HashedWheelTimer hashedWheelTimer, final ConvertorExecutor convertorExecutor, - final boolean skipTableFeatures) { + final boolean skipTableFeatures, + final boolean useSingleLayerSerialization, + final DeviceInitializerProvider deviceInitializerProvider) { this.dataBroker = dataBroker; + this.deviceInitializerProvider = deviceInitializerProvider; /* merge empty nodes to oper DS to predict any problems with missing parent for Node */ final WriteTransaction tx = dataBroker.newWriteOnlyTransaction(); @@ -132,15 +130,14 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi this.barrierIntervalNanos = TimeUnit.MILLISECONDS.toNanos(barrierInterval); this.barrierCountLimit = barrierCountLimit; this.spyPool = new ScheduledThreadPoolExecutor(1); - this.singletonServiceProvider = singletonServiceProvider; this.notificationPublishService = notificationPublishService; this.messageSpy = messageSpy; + this.useSingleLayerSerialization = useSingleLayerSerialization; } @Override public void setDeviceInitializationPhaseHandler(final DeviceInitializationPhaseHandler handler) { - this.deviceInitPhaseHandler = handler; } @Override @@ -150,86 +147,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi DeviceContext deviceContext = Preconditions.checkNotNull(deviceContexts.get(deviceInfo)); deviceContext.onPublished(); lifecycleService.registerDeviceRemovedHandler(this); - lifecycleService.registerService(this.singletonServiceProvider); - } - - @Override - public ConnectionStatus deviceConnected(@CheckForNull final ConnectionContext connectionContext) throws Exception { - Preconditions.checkArgument(connectionContext != null); - final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - - /* - * This part prevent destroy another device context. Throwing here an exception result to propagate close connection - * in {@link org.opendaylight.openflowplugin.impl.connection.org.opendaylight.openflowplugin.impl.connection.HandshakeContextImpl} - * If context already exist we are in state closing process (connection flapping) and we should not propagate connection close - */ - if (deviceContexts.containsKey(deviceInfo)) { - DeviceContext deviceContext = deviceContexts.get(deviceInfo); - LOG.warn("Node {} already connected disconnecting device. Rejecting connection", deviceInfo.getLOGValue()); - if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) { - LOG.warn("Node {} context state not in TERMINATION state.", - connectionContext.getDeviceInfo().getLOGValue()); - return ConnectionStatus.ALREADY_CONNECTED; - } else { - return ConnectionStatus.CLOSING; - } - } - - LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", - connectionContext.getConnectionAdapter().getRemoteAddress(), deviceInfo.getNodeId()); - - // Add Disconnect handler - connectionContext.setDeviceDisconnectedHandler(this); - - // Cache this for clarity - final ConnectionAdapter connectionAdapter = connectionContext.getConnectionAdapter(); - - // FIXME: as soon as auxiliary connection are fully supported then this is needed only before device context published - connectionAdapter.setPacketInFiltering(true); - - final OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(deviceInfo.getVersion()); - - connectionContext.setOutboundQueueProvider(outboundQueueProvider); - final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = - connectionAdapter.registerOutboundQueueHandler(outboundQueueProvider, barrierCountLimit, barrierIntervalNanos); - connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); - - final LifecycleService lifecycleService = new LifecycleServiceImpl(); - - final DeviceContext deviceContext = new DeviceContextImpl( - connectionContext, - dataBroker, - messageSpy, - translatorLibrary, - this, - convertorExecutor, - skipTableFeatures, - hashedWheelTimer, - this); - - deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext)); - deviceContexts.put(deviceInfo, deviceContext); - - lifecycleService.setDeviceContext(deviceContext); - deviceContext.putLifecycleServiceIntoTxChainManager(lifecycleService); - - lifecycleServices.put(deviceInfo, lifecycleService); - - addCallbackToDeviceInitializeToSlave(deviceInfo, deviceContext, lifecycleService); - - deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory); - - ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); - deviceContext.setNotificationPublishService(notificationPublishService); - - updatePacketInRateLimiters(); - - final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( - connectionAdapter, deviceContext); - - connectionAdapter.setMessageListener(messageListener); - deviceInitPhaseHandler.onDeviceContextLevelUp(connectionContext.getDeviceInfo(), lifecycleService); - return ConnectionStatus.MAY_CONTINUE; } @Override @@ -259,7 +176,7 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi @Override public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) { updatePacketInRateLimiters(); - Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(OFPContext::close); + Optional.ofNullable(lifecycleServices.get(deviceInfo)).ifPresent(LifecycleService::close); } @Override @@ -293,6 +210,14 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi return; } + if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) { + LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue()); + // Connection is not PrimaryConnection so try to remove from Auxiliary Connections + deviceCtx.removeAuxiliaryConnectionContext(connectionContext); + // If this is not primary connection, we should not continue disabling everything + return; + } + if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) { LOG.info("Device context for node {} is already is termination state, waiting for close all context", deviceInfo.getLOGValue()); return; @@ -300,12 +225,6 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi deviceCtx.close(); - if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) { - LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getLOGValue()); - // Connection is not PrimaryConnection so try to remove from Auxiliary Connections - deviceCtx.removeAuxiliaryConnectionContext(connectionContext); - } - // TODO: Auxiliary connections supported ? // Device is disconnected and so we need to close TxManager final ListenableFuture future = deviceCtx.shuttingDownDataStoreTransactions(); @@ -389,22 +308,57 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi return delFuture; } + @Override + public void setUseSingleLayerSerialization(final Boolean useSingleLayerSerialization) { + this.useSingleLayerSerialization = useSingleLayerSerialization; + } - private void addCallbackToDeviceInitializeToSlave(final DeviceInfo deviceInfo, final DeviceContext deviceContext, final LifecycleService lifecycleService) { - Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback>() { - @Override - public void onSuccess(@Nullable RpcResult setRoleOutputRpcResult) { - if (LOG.isDebugEnabled()) { - LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue()); - } - } + public DeviceContext createContext(@CheckForNull final ConnectionContext connectionContext) { - @Override - public void onFailure(Throwable throwable) { - LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getLOGValue()); - lifecycleService.closeConnection(); - } - }); + LOG.info("ConnectionEvent: Device connected to controller, Device:{}, NodeId:{}", + connectionContext.getConnectionAdapter().getRemoteAddress(), + connectionContext.getDeviceInfo().getNodeId()); + + connectionContext.getConnectionAdapter().setPacketInFiltering(true); + + final OutboundQueueProvider outboundQueueProvider + = new OutboundQueueProviderImpl(connectionContext.getDeviceInfo().getVersion()); + + connectionContext.setOutboundQueueProvider(outboundQueueProvider); + final OutboundQueueHandlerRegistration outboundQueueHandlerRegistration = + connectionContext.getConnectionAdapter().registerOutboundQueueHandler( + outboundQueueProvider, + barrierCountLimit, + barrierIntervalNanos); + connectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration); + + + final DeviceContext deviceContext = new DeviceContextImpl( + connectionContext, + dataBroker, + messageSpy, + translatorLibrary, + this, + convertorExecutor, + skipTableFeatures, + hashedWheelTimer, + useSingleLayerSerialization, + deviceInitializerProvider); + + deviceContext.setSalRoleService(new SalRoleServiceImpl(deviceContext, deviceContext)); + deviceContext.setSwitchFeaturesMandatory(switchFeaturesMandatory); + ((ExtensionConverterProviderKeeper) deviceContext).setExtensionConverterProvider(extensionConverterProvider); + deviceContext.setNotificationPublishService(notificationPublishService); + + deviceContexts.put(connectionContext.getDeviceInfo(), deviceContext); + updatePacketInRateLimiters(); + + final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl( + connectionContext.getConnectionAdapter(), deviceContext); + + connectionContext.getConnectionAdapter().setMessageListener(messageListener); + + return deviceContext; } private void updatePacketInRateLimiters() { @@ -425,11 +379,22 @@ public class DeviceManagerImpl implements DeviceManager, ExtensionConverterProvi } } - public void onDeviceRemoved(DeviceInfo deviceInfo) { + @Override + public void onDeviceRemoved(final DeviceInfo deviceInfo) { deviceContexts.remove(deviceInfo); - LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue()); + if (LOG.isDebugEnabled()) { + LOG.debug("Device context removed for node {}", deviceInfo.getLOGValue()); + } + this.updatePacketInRateLimiters(); + } + + @Override + public long getBarrierIntervalNanos() { + return barrierIntervalNanos; + } - lifecycleServices.remove(deviceInfo); - LOG.debug("Lifecycle service removed for node {}", deviceInfo.getLOGValue()); + @Override + public int getBarrierCountLimit() { + return barrierCountLimit; } }