X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fdevice%2FDeviceContextImpl.java;h=9b12f6a019a51bc29e8a9f6adcfbae440951f94b;hb=8d0b3819bc847095e0fc971673c819a717a12305;hp=65c199ce52458cd5789ba5c12f7f231212b697ad;hpb=aa35db69cb180a157184eadc5cdf8c6ad020937c;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java index 65c199ce52..9b12f6a019 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java @@ -8,27 +8,26 @@ package org.opendaylight.openflowplugin.impl.device; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Verify; +import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; import com.google.common.util.concurrent.ListenableFuture; +import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; -import java.math.BigInteger; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ExecutionException; -import javax.annotation.Nonnull; +import io.netty.util.TimerTask; import org.opendaylight.controller.md.sal.binding.api.DataBroker; import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction; import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType; -import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException; import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter; import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey; +import org.opendaylight.openflowplugin.api.ConnectionException; +import org.opendaylight.openflowplugin.api.OFConstants; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; @@ -43,6 +42,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgColl import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey; +import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion; import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry; import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry; import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor; @@ -58,15 +58,19 @@ import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionCon import org.opendaylight.openflowplugin.extension.api.exception.ConversionException; import org.opendaylight.openflowplugin.extension.api.path.MessagePath; import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl; -import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil; +import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider; +import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory; +import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer; +import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider; import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl; import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory; import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl; import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl; -import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils; +import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext; import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl; +import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector; @@ -75,6 +79,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.ta import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey; @@ -83,7 +89,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage; @@ -94,16 +99,30 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketReceived; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsData; import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.FlowCapableNodeConnectorStatisticsDataBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.OfpRole; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInput; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleInputBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.common.RpcResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * - */ -public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper{ +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper { private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class); @@ -113,8 +132,14 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private static final float LOW_WATERMARK_FACTOR = 0.75f; // TODO: high water mark factor should be parametrized private static final float HIGH_WATERMARK_FACTOR = 0.95f; + + // Timeout in seconds after what we will give up on propagating role + private static final int SET_ROLE_TIMEOUT = 10; + private boolean initialized; + private SalRoleService salRoleService = null; + private final HashedWheelTimer hashedWheelTimer; private ConnectionContext primaryConnectionContext; private final DeviceState deviceState; private final DataBroker dataBroker; @@ -141,17 +166,29 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private final ConvertorExecutor convertorExecutor; private volatile CONTEXT_STATE state; private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler; + private final DeviceManager myManager; + private final DeviceInitializerProvider deviceInitializerProvider; + private final boolean useSingleLayerSerialization; + private Boolean isAddNotificationSent = false; DeviceContextImpl( - @Nonnull final ConnectionContext primaryConnectionContext, - @Nonnull final DataBroker dataBroker, - @Nonnull final MessageSpy messageSpy, - @Nonnull final TranslatorLibrary translatorLibrary, - @Nonnull final DeviceManager manager, - final ConvertorExecutor convertorExecutor, - final boolean skipTableFeatures) { + @Nonnull final ConnectionContext primaryConnectionContext, + @Nonnull final DataBroker dataBroker, + @Nonnull final MessageSpy messageSpy, + @Nonnull final TranslatorLibrary translatorLibrary, + @Nonnull final DeviceManager manager, + final ConvertorExecutor convertorExecutor, + final boolean skipTableFeatures, + final HashedWheelTimer hashedWheelTimer, + final DeviceManager myManager, + final boolean useSingleLayerSerialization, + final DeviceInitializerProvider deviceInitializerProvider) { + this.primaryConnectionContext = primaryConnectionContext; this.deviceInfo = primaryConnectionContext.getDeviceInfo(); + this.hashedWheelTimer = hashedWheelTimer; + this.myManager = myManager; + this.deviceInitializerProvider = deviceInitializerProvider; this.deviceState = new DeviceStateImpl(); this.dataBroker = dataBroker; this.auxiliaryConnectionContexts = new HashMap<>(); @@ -175,12 +212,15 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi this.state = CONTEXT_STATE.INITIALIZATION; this.convertorExecutor = convertorExecutor; this.skipTableFeatures = skipTableFeatures; + this.useSingleLayerSerialization = useSingleLayerSerialization; this.initialized = false; } @Override public void initialSubmitTransaction() { - transactionChainManager.initialSubmitWriteTransaction(); + if (initialized) { + transactionChainManager.initialSubmitWriteTransaction(); + } } @Override @@ -215,7 +255,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi public void writeToTransaction(final LogicalDatastoreType store, final InstanceIdentifier path, final T data){ - if (Objects.nonNull(transactionChainManager)) { + if (initialized) { transactionChainManager.writeToTransaction(store, path, data, false); } } @@ -224,21 +264,21 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi public void writeToTransactionWithParentsSlow(final LogicalDatastoreType store, final InstanceIdentifier path, final T data){ - if (Objects.nonNull(transactionChainManager)) { + if (initialized) { transactionChainManager.writeToTransaction(store, path, data, true); } } @Override - public void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier path) throws TransactionChainClosedException { - if (Objects.nonNull(transactionChainManager)) { + public void addDeleteToTxChain(final LogicalDatastoreType store, final InstanceIdentifier path) { + if (initialized) { transactionChainManager.addDeleteOperationTotTxChain(store, path); } } @Override public boolean submitTransaction() { - return Objects.nonNull(transactionChainManager) && transactionChainManager.submitWriteTransaction(); + return initialized && transactionChainManager.submitWriteTransaction(); } @Override @@ -268,18 +308,20 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void processReply(final OfHeader ofHeader) { - if (ofHeader instanceof Error) { - messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); - } else { - messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); - } + messageSpy.spyMessage( + ofHeader.getImplementedInterface(), + (ofHeader instanceof Error) + ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE + : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); } @Override - public void processReply(final Xid xid, final List ofHeaderList) { - for (final MultipartReply multipartReply : ofHeaderList) { - messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); - } + public void processReply(final Xid xid, final List ofHeaderList) { + ofHeaderList.forEach(header -> messageSpy.spyMessage( + header.getImplementedInterface(), + (header instanceof Error) + ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE + : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS)); } @Override @@ -288,19 +330,19 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification = flowRemovedTranslator.translate(flowRemoved, deviceInfo, null); - if(!deviceManager.getIsNotificationFlowRemovedOff()) { + if(deviceManager.isFlowRemovedNotificationOn()) { // Trigger off a notification notificationPublishService.offerNotification(flowRemovedNotification); } else if(LOG.isDebugEnabled()) { - LOG.debug("For nodeId={} isNotificationFlowRemovedOff={}", getDeviceInfo().getLOGValue(), deviceManager.getIsNotificationFlowRemovedOff()); + LOG.debug("For nodeId={} isFlowRemovedNotificationOn={}", getDeviceInfo().getLOGValue(), deviceManager.isFlowRemovedNotificationOn()); } final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener(); if (itemLifecycleListener != null) { //2. create registry key - final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(flowRemovedNotification); + final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification); //3. lookup flowId - final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey); + final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey); //4. if flowId present: if (flowDescriptor != null) { // a) construct flow path @@ -317,6 +359,26 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } } + @Override + public void sendNodeAddedNotification() { + if (!isAddNotificationSent) { + isAddNotificationSent = true; + NodeUpdatedBuilder builder = new NodeUpdatedBuilder(); + builder.setId(getDeviceInfo().getNodeId()); + builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier())); + LOG.debug("Publishing node added notification for {}", builder.build()); + notificationPublishService.offerNotification(builder.build()); + } + } + + @Override + public void sendNodeRemovedNotification() { + NodeRemovedBuilder builder = new NodeRemovedBuilder(); + builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier())); + LOG.debug("Publishing node removed notification for {}", builder.build()); + notificationPublishService.offerNotification(builder.build()); + } + @Override public void processPortStatusMessage(final PortStatusMessage portStatus) { messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); @@ -343,7 +405,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi private KeyedInstanceIdentifier provideIIToNodeConnector(final long portNo, final short version) { final InstanceIdentifier iiToNodes = getDeviceInfo().getNodeInstanceIdentifier(); final BigInteger dataPathId = getDeviceInfo().getDatapathId(); - final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version); + final NodeConnectorId nodeConnectorId = InventoryDataServiceUtil.nodeConnectorIdfromDatapathPortNo(dataPathId, portNo, OpenflowVersion.get(version)); return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId)); } @@ -427,15 +489,6 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return translatorLibrary; } - @Override - public synchronized void close() { - LOG.debug("closing deviceContext: {}, nodeId:{}", - getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(), - getDeviceInfo().getLOGValue()); - // NOOP - throw new UnsupportedOperationException("Autocloseble.close will be removed soon"); - } - @Override public void setCurrentBarrierTimeout(final Timeout timeout) { barrierTaskTimeout = timeout; @@ -459,7 +512,7 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void onPublished() { Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState())); - setState(CONTEXT_STATE.WORKING); + this.state = CONTEXT_STATE.WORKING; primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false); for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) { switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false); @@ -467,8 +520,8 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } @Override - public MultiMsgCollector getMultiMsgCollector(final RequestContext> requestContext) { - return new MultiMsgCollectorImpl(this, requestContext); + public MultiMsgCollector getMultiMsgCollector(final RequestContext> requestContext) { + return new MultiMsgCollectorImpl<>(this, requestContext); } @Override @@ -505,26 +558,29 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi LOG.debug("ConnectionCtx for Node {} is in RIP state.", getDeviceInfo().getLOGValue()); return; } - /* Terminate Auxiliary Connection */ + + // Terminate Auxiliary Connection for (final ConnectionContext connectionContext : auxiliaryConnectionContexts.values()) { LOG.debug("Closing auxiliary connection {}", connectionContext.getNodeId()); connectionContext.closeConnection(false); } - /* Terminate Primary Connection */ + + // Terminate Primary Connection getPrimaryConnectionContext().closeConnection(true); - /* Close all Group Registry */ - deviceGroupRegistry.close(); - deviceFlowRegistry.close(); - deviceMeterRegistry.close(); + + // Close all datastore registries + if (initialized) { + deviceGroupRegistry.close(); + deviceFlowRegistry.close(); + deviceMeterRegistry.close(); + } } @Override public ListenableFuture shuttingDownDataStoreTransactions() { - ListenableFuture future = Futures.immediateFuture(null); - if (Objects.nonNull(this.transactionChainManager)) { - future = this.transactionChainManager.shuttingDown(); - } - return future; + return initialized + ? this.transactionChainManager.shuttingDown() + : Futures.immediateFuture(null); } @VisibleForTesting @@ -543,17 +599,52 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi } @Override - public void setState(CONTEXT_STATE state) { - this.state = state; - } - - @Override - public ListenableFuture stopClusterServices(boolean deviceDisconnected) { - ListenableFuture future = Futures.immediateFuture(null); - if (Objects.nonNull(this.transactionChainManager)) { - future = this.transactionChainManager.deactivateTransactionManager(); + public ListenableFuture stopClusterServices(boolean connectionInterrupted) { + final ListenableFuture deactivateTxManagerFuture = initialized + ? transactionChainManager.deactivateTransactionManager() + : Futures.immediateFuture(null); + + if (!connectionInterrupted) { + final ListenableFuture makeSlaveFuture = Futures.transform(makeDeviceSlave(), new Function, Void>() { + @Nullable + @Override + public Void apply(@Nullable RpcResult setRoleOutputRpcResult) { + return null; + } + }); + + Futures.addCallback(makeSlaveFuture, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void aVoid) { + if (LOG.isDebugEnabled()) { + LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue()); + } + sendNodeAddedNotification(); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue()); + LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable); + } + }); + + return Futures.transform(deactivateTxManagerFuture, new AsyncFunction() { + @Override + public ListenableFuture apply(Void aVoid) throws Exception { + // Add fallback to remove device from operational DS if setting slave fails + return Futures.withFallback(makeSlaveFuture, t -> + myManager.removeDeviceFromOperationalDS(deviceInfo)); + } + }); + } else { + return Futures.transform(deactivateTxManagerFuture, new AsyncFunction() { + @Override + public ListenableFuture apply(Void aVoid) throws Exception { + return myManager.removeDeviceFromOperationalDS(deviceInfo); + } + }); } - return future; } @Override @@ -566,9 +657,21 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi return this.deviceInfo; } + @Override + public void close() { + if (CONTEXT_STATE.TERMINATION.equals(getState())){ + if (LOG.isDebugEnabled()) { + LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue()); + } + } else { + this.state = CONTEXT_STATE.TERMINATION; + } + sendNodeRemovedNotification(); + } + @Override public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){ - if (Objects.nonNull(this.transactionChainManager)) { + if (initialized) { this.transactionChainManager.setLifecycleService(lifecycleService); } } @@ -576,16 +679,26 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi @Override public void replaceConnectionContext(final ConnectionContext connectionContext){ // Act like we are initializing the context - setState(CONTEXT_STATE.INITIALIZATION); + this.state = CONTEXT_STATE.INITIALIZATION; this.primaryConnectionContext = connectionContext; this.onPublished(); } + @Override + public boolean canUseSingleLayerSerialization() { + return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3; + } + @Override public boolean isSkipTableFeatures() { return this.skipTableFeatures; } + @Override + public void setSalRoleService(@Nonnull SalRoleService salRoleService) { + this.salRoleService = salRoleService; + } + @Override public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) { this.clusterInitializationPhaseHandler = handler; @@ -601,31 +714,101 @@ public class DeviceContextImpl implements DeviceContext, ExtensionConverterProvi LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue()); - lazyTransactionManagerInitialiaztion(); + lazyTransactionManagerInitialization(); this.transactionChainManager.activateTransactionManager(); try { - DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor); + final Optional initializer = deviceInitializerProvider + .lookup(deviceInfo.getVersion()); + + if (initializer.isPresent()) { + final MultipartWriterProvider writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this); + initializer.get().initialize(this, switchFeaturesMandatory, writerProvider, convertorExecutor); + } else { + throw new ExecutionException(new ConnectionException("Unsupported version " + deviceInfo.getVersion())); + } } catch (ExecutionException | InterruptedException e) { LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), e); return false; } + Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback()); return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext()); } @VisibleForTesting - void lazyTransactionManagerInitialiaztion() { + void lazyTransactionManagerInitialization() { if (!this.initialized) { if (LOG.isDebugEnabled()) { LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue()); } this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo); - this.deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier()); + this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier()); this.deviceGroupRegistry = new DeviceGroupRegistryImpl(); this.deviceMeterRegistry = new DeviceMeterRegistryImpl(); this.initialized = true; } } + + @Nullable + @Override + public RequestContext createRequestContext() { + return new AbstractRequestContext(deviceInfo.reserveXidForDeviceMessage()) { + @Override + public void close() { + } + }; + + } + + ListenableFuture> sendRoleChangeToDevice(final OfpRole newRole) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId()); + } + + final Future> setRoleOutputFuture; + + if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) { + final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole) + .setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build(); + + setRoleOutputFuture = this.salRoleService.setRole(setRoleInput); + + final TimerTask timerTask = timeout -> { + if (!setRoleOutputFuture.isDone()) { + LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT); + setRoleOutputFuture.cancel(true); + } + }; + + hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS); + } else { + LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion()); + return Futures.immediateFuture(null); + } + + return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture); + } + + @Override + public ListenableFuture> makeDeviceSlave() { + return sendRoleChangeToDevice(OfpRole.BECOMESLAVE); + } + + private class RpcResultFutureCallback implements FutureCallback> { + @Override + public void onSuccess(@Nullable RpcResult setRoleOutputRpcResult) { + if (LOG.isDebugEnabled()) { + LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue()); + } + sendNodeAddedNotification(); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue()); + shutdownConnection(); + } + } }