package org.opendaylight.openflowplugin.impl.device;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import com.google.common.base.Optional;
import com.google.common.base.Verify;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.math.BigInteger;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
+import org.opendaylight.openflowplugin.api.ConnectionException;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
-import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
+import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
+import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
+import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
+import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
-import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
+import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemoved;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper{
+public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
// Timeout in seconds after what we will give up on propagating role
private static final int SET_ROLE_TIMEOUT = 10;
+ // Timeout in milliseconds after what we will give up on initializing device
+ private static final int DEVICE_INIT_TIMEOUT = 9000;
+
+ private static final int LOW_WATERMARK = 1000;
+ private static final int HIGH_WATERMARK = 2000;
+ private final MultipartWriterProvider writerProvider;
+
private boolean initialized;
private SalRoleService salRoleService = null;
private final HashedWheelTimer hashedWheelTimer;
- private ConnectionContext primaryConnectionContext;
+ private volatile ConnectionContext primaryConnectionContext;
private final DeviceState deviceState;
private final DataBroker dataBroker;
private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
private DeviceFlowRegistry deviceFlowRegistry;
private DeviceGroupRegistry deviceGroupRegistry;
private DeviceMeterRegistry deviceMeterRegistry;
- private final PacketInRateLimiter packetInLimiter;
+ private PacketInRateLimiter packetInLimiter;
private final MessageSpy messageSpy;
private final ItemLifeCycleKeeper flowLifeCycleKeeper;
private NotificationPublishService notificationPublishService;
private final TranslatorLibrary translatorLibrary;
private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
private ExtensionConverterProvider extensionConverterProvider;
- private final DeviceManager deviceManager;
private boolean skipTableFeatures;
private boolean switchFeaturesMandatory;
- private final DeviceInfo deviceInfo;
+ private DeviceInfo deviceInfo;
private final ConvertorExecutor convertorExecutor;
- private volatile CONTEXT_STATE state;
+ private volatile ContextState state;
private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
private final DeviceManager myManager;
+ private final DeviceInitializerProvider deviceInitializerProvider;
+ private final boolean useSingleLayerSerialization;
+ private boolean hasState;
+ private boolean isInitialTransactionSubmitted;
DeviceContextImpl(
@Nonnull final ConnectionContext primaryConnectionContext,
@Nonnull final DataBroker dataBroker,
@Nonnull final MessageSpy messageSpy,
@Nonnull final TranslatorLibrary translatorLibrary,
- @Nonnull final DeviceManager manager,
+ @Nonnull final DeviceManager contextManager,
final ConvertorExecutor convertorExecutor,
final boolean skipTableFeatures,
final HashedWheelTimer hashedWheelTimer,
- final DeviceManager myManager) {
+ final boolean useSingleLayerSerialization,
+ final DeviceInitializerProvider deviceInitializerProvider) {
+
this.primaryConnectionContext = primaryConnectionContext;
this.deviceInfo = primaryConnectionContext.getDeviceInfo();
this.hashedWheelTimer = hashedWheelTimer;
- this.myManager = myManager;
+ this.deviceInitializerProvider = deviceInitializerProvider;
+ this.myManager = contextManager;
this.deviceState = new DeviceStateImpl();
this.dataBroker = dataBroker;
this.auxiliaryConnectionContexts = new HashMap<>();
- this.messageSpy = Preconditions.checkNotNull(messageSpy);
- this.deviceManager = manager;
+ this.messageSpy = messageSpy;
this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
- /*initial*/ 1000, /*initial*/2000, this.messageSpy, REJECTED_DRAIN_FACTOR);
+ /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
this.translatorLibrary = translatorLibrary;
this.portStatusTranslator = translatorLibrary.lookupTranslator(
this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
- this.state = CONTEXT_STATE.INITIALIZATION;
+ this.state = ContextState.INITIALIZATION;
this.convertorExecutor = convertorExecutor;
this.skipTableFeatures = skipTableFeatures;
+ this.useSingleLayerSerialization = useSingleLayerSerialization;
this.initialized = false;
+ writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
}
@Override
- public void initialSubmitTransaction() {
- if (initialized) {
- transactionChainManager.initialSubmitWriteTransaction();
- }
+ public boolean initialSubmitTransaction() {
+ return (initialized &&(isInitialTransactionSubmitted =
+ transactionChainManager.initialSubmitWriteTransaction()));
}
@Override
return dataBroker.newReadOnlyTransaction();
}
+ @Override
+ public boolean isTransactionsEnabled() {
+ return isInitialTransactionSubmitted;
+ }
+
@Override
public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
final InstanceIdentifier<T> path,
@Override
public void processReply(final OfHeader ofHeader) {
- if (ofHeader instanceof Error) {
- messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
- } else {
- messageSpy.spyMessage(ofHeader.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
- }
+ messageSpy.spyMessage(
+ ofHeader.getImplementedInterface(),
+ (ofHeader instanceof Error)
+ ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
+ : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
}
@Override
- public void processReply(final Xid xid, final List<MultipartReply> ofHeaderList) {
- for (final MultipartReply multipartReply : ofHeaderList) {
- messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
- }
+ public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
+ ofHeaderList.forEach(header -> messageSpy.spyMessage(
+ header.getImplementedInterface(),
+ (header instanceof Error)
+ ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
+ : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
}
@Override
final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
- if(!deviceManager.getIsNotificationFlowRemovedOff()) {
+ if(myManager.isFlowRemovedNotificationOn()) {
// Trigger off a notification
notificationPublishService.offerNotification(flowRemovedNotification);
} else if(LOG.isDebugEnabled()) {
- LOG.debug("For nodeId={} isNotificationFlowRemovedOff={}", getDeviceInfo().getLOGValue(), deviceManager.getIsNotificationFlowRemovedOff());
+ LOG.debug("For nodeId={} isNotificationFlowRemovedOn={}", getDeviceInfo().getLOGValue(), myManager.isFlowRemovedNotificationOn());
}
final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
if (itemLifecycleListener != null) {
//2. create registry key
- final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(flowRemovedNotification);
+ final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
//3. lookup flowId
- final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey);
+ final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
//4. if flowId present:
if (flowDescriptor != null) {
// a) construct flow path
@Override
public void processPortStatusMessage(final PortStatusMessage portStatus) {
- messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
- final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, getDeviceInfo(), null);
+ messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
- final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
- try {
- if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
- // because of ADD status node connector has to be created
- final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
- nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
- nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
- writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
- } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
- addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
+ if (initialized) {
+ try {
+ writePortStatusMessage(portStatus);
+ submitTransaction();
+ } catch (final Exception e) {
+ LOG.warn("Error processing port status message for port {} on device {}",
+ portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e);
}
- submitTransaction();
- } catch (final Exception e) {
- LOG.warn("Error processing port status message for port {} on device {} : {}", portStatus.getPortNo(),
- getDeviceInfo().getNodeId().toString(), e);
+ } else if (!hasState) {
+ primaryConnectionContext.handlePortStatusMessage(portStatus);
}
}
- private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
- final InstanceIdentifier<Node> iiToNodes = getDeviceInfo().getNodeInstanceIdentifier();
- final BigInteger dataPathId = getDeviceInfo().getDatapathId();
- final NodeConnectorId nodeConnectorId = NodeStaticReplyTranslatorUtil.nodeConnectorId(dataPathId.toString(), portNo, version);
- return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
+ private void writePortStatusMessage(final PortStatus portStatusMessage) {
+ final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
+ .translate(portStatusMessage, getDeviceInfo(), null);
+
+ final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
+ .getNodeInstanceIdentifier()
+ .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
+ .nodeConnectorIdfromDatapathPortNo(
+ deviceInfo.getDatapathId(),
+ portStatusMessage.getPortNo(),
+ OpenflowVersion.get(deviceInfo.getVersion()))));
+
+ if (PortReason.OFPPRADD.equals(portStatusMessage.getReason()) || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
+ // because of ADD status node connector has to be created
+ writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
+ .setKey(iiToNodeConnector.getKey())
+ .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build())
+ .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
+ .build());
+ } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
+ addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
+ }
}
@Override
public void processPacketInMessage(final PacketInMessage packetInMessage) {
- messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH);
final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
if (packetReceived == null) {
LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
- messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
return;
} else {
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
}
if (!packetInLimiter.acquirePermit()) {
LOG.debug("Packet limited");
// TODO: save packet into emergency slot if possible
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
return;
}
final ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(packetReceived);
if (NotificationPublishService.REJECTED.equals(offerNotification)) {
LOG.debug("notification offer rejected");
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
packetInLimiter.drainLowWaterMark();
packetInLimiter.releasePermit();
return;
Futures.addCallback(offerNotification, new FutureCallback<Object>() {
@Override
public void onSuccess(final Object result) {
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
packetInLimiter.releasePermit();
}
@Override
public void onFailure(final Throwable t) {
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
LOG.debug("notification offer failed: {}", t.getMessage());
LOG.trace("notification offer failed..", t);
packetInLimiter.releasePermit();
try {
messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
- .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
+ .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
.setExperimenterMessageOfChoice(messageOfChoice);
// publish
notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
return translatorLibrary;
}
- @Override
- public synchronized void close() {
- LOG.debug("closing deviceContext: {}, nodeId:{}",
- getPrimaryConnectionContext().getConnectionAdapter().getRemoteAddress(),
- getDeviceInfo().getLOGValue());
- // NOOP
- throw new UnsupportedOperationException("Autocloseble.close will be removed soon");
- }
-
@Override
public void setCurrentBarrierTimeout(final Timeout timeout) {
barrierTaskTimeout = timeout;
@Override
public void onPublished() {
- Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
- setState(CONTEXT_STATE.WORKING);
+ Verify.verify(ContextState.INITIALIZATION.equals(getState()));
+ this.state = ContextState.WORKING;
primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
- for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
- switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
- }
}
@Override
- public MultiMsgCollector getMultiMsgCollector(final RequestContext<List<MultipartReply>> requestContext) {
- return new MultiMsgCollectorImpl(this, requestContext);
+ public <T extends OfHeader> MultiMsgCollector<T> getMultiMsgCollector(final RequestContext<List<T>> requestContext) {
+ return new MultiMsgCollectorImpl<>(this, requestContext);
}
@Override
if (LOG.isDebugEnabled()) {
LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
}
- if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+ if (ContextState.TERMINATION.equals(getState())) {
LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue());
return;
}
}
@Override
- public CONTEXT_STATE getState() {
+ public ContextState getState() {
return this.state;
}
@Override
- public void setState(CONTEXT_STATE state) {
- this.state = state;
- }
-
- @Override
- public ListenableFuture<Void> stopClusterServices(boolean deviceDisconnected) {
-
- ListenableFuture<Void> deactivateTxManagerFuture =
- initialized ? transactionChainManager.deactivateTransactionManager() : Futures.immediateFuture(null);
-
- if (!deviceDisconnected) {
- ListenableFuture<Void> makeSlaveFuture = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
- return null;
- }
- });
-
- Futures.addCallback(makeSlaveFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable Void aVoid) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
- }
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue());
- LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
- myManager.removeDeviceFromOperationalDS(deviceInfo);
- }
- });
-
- return Futures.transform(deactivateTxManagerFuture, new AsyncFunction<Void, Void>() {
- @Override
- public ListenableFuture<Void> apply(Void aVoid) throws Exception {
- return makeSlaveFuture;
- }
- });
- } else {
- return Futures.transform(deactivateTxManagerFuture, new AsyncFunction<Void, Void>() {
- @Override
- public ListenableFuture<Void> apply(Void aVoid) throws Exception {
- return myManager.removeDeviceFromOperationalDS(deviceInfo);
- }
- });
- }
+ public ListenableFuture<Void> stopClusterServices() {
+ return initialized
+ ? transactionChainManager.deactivateTransactionManager()
+ : Futures.immediateFuture(null);
}
@Override
}
@Override
- public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
- if (initialized) {
- this.transactionChainManager.setLifecycleService(lifecycleService);
- }
+ public void close() {
+ //NOOP
}
@Override
- public void replaceConnectionContext(final ConnectionContext connectionContext){
- // Act like we are initializing the context
- setState(CONTEXT_STATE.INITIALIZATION);
- this.primaryConnectionContext = connectionContext;
- this.onPublished();
+ public boolean canUseSingleLayerSerialization() {
+ return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
}
@Override
}
@Override
- public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
-
- if (getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
- LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
- return false;
- }
+ public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
-
lazyTransactionManagerInitialization();
- this.transactionChainManager.activateTransactionManager();
+ try {
+ final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
+ .retrieveAndClearPortStatusMessages();
+
+ portStatusMessages.forEach(this::writePortStatusMessage);
+ submitTransaction();
+ } catch (final Exception ex) {
+ LOG.warn("Error processing port status messages from device {}", getDeviceInfo().getLOGValue(), ex);
+ return false;
+ }
try {
- DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor);
- } catch (ExecutionException | InterruptedException e) {
- LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), e);
+ final java.util.Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
+ .lookup(deviceInfo.getVersion());
+
+ if (initializer.isPresent()) {
+ initializer
+ .get()
+ .initialize(this, switchFeaturesMandatory, writerProvider, convertorExecutor)
+ .get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
+ } else {
+ throw new ExecutionException(new ConnectionException("Unsupported version " + deviceInfo.getVersion()));
+ }
+ } catch (ExecutionException | InterruptedException | TimeoutException ex) {
+ LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), ex);
return false;
}
- Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback());
+ Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
+ new RpcResultFutureCallback(mastershipChangeListener));
+
+ final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
+ Futures.addCallback(deviceFlowRegistryFill,
+ new DeviceFlowRegistryCallback(deviceFlowRegistryFill, mastershipChangeListener));
- return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext());
+ return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
}
@VisibleForTesting
LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
}
this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
- this.deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier());
+ this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
+ this.transactionChainManager.activateTransactionManager();
this.initialized = true;
}
}
}
- ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
+ private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
}
+
final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
+
if (deviceInfo.getVersion() >= OFConstants.OFP_VERSION_1_3) {
final SetRoleInput setRoleInput = (new SetRoleInputBuilder()).setControllerRole(newRole)
.setNode(new NodeRef(deviceInfo.getNodeInstanceIdentifier())).build();
+
setRoleOutputFuture = this.salRoleService.setRole(setRoleInput);
+
final TimerTask timerTask = timeout -> {
if (!setRoleOutputFuture.isDone()) {
LOG.warn("New role {} was not propagated to device {} during {} sec", newRole, deviceInfo.getLOGValue(), SET_ROLE_TIMEOUT);
setRoleOutputFuture.cancel(true);
}
};
+
hashedWheelTimer.newTimeout(timerTask, SET_ROLE_TIMEOUT, TimeUnit.SECONDS);
} else {
LOG.info("Device: {} with version: {} does not support role", deviceInfo.getLOGValue(), deviceInfo.getVersion());
return Futures.immediateFuture(null);
}
+
return JdkFutureAdapters.listenInPoolThread(setRoleOutputFuture);
}
return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
}
+ @Override
+ public void onStateAcquired(final ContextChainState state) {
+ hasState = true;
+ }
+
private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+
+ private final MastershipChangeListener mastershipChangeListener;
+
+ RpcResultFutureCallback(final MastershipChangeListener mastershipChangeListener) {
+ this.mastershipChangeListener = mastershipChangeListener;
+ }
+
@Override
public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ this.mastershipChangeListener.onMasterRoleAcquired(
+ deviceInfo,
+ ContextChainMastershipState.MASTER_ON_DEVICE
+ );
if (LOG.isDebugEnabled()) {
LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
- shutdownConnection();
+ mastershipChangeListener.onNotAbleToStartMastershipMandatory(
+ deviceInfo,
+ "Was not able to set MASTER role on device");
}
}
+
+ private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
+ private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
+ private final MastershipChangeListener mastershipChangeListener;
+
+ DeviceFlowRegistryCallback(
+ ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
+ MastershipChangeListener mastershipChangeListener) {
+ this.deviceFlowRegistryFill = deviceFlowRegistryFill;
+ this.mastershipChangeListener = mastershipChangeListener;
+ }
+
+ @Override
+ public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+ if (LOG.isDebugEnabled()) {
+ // Count all flows we read from datastore for debugging purposes.
+ // This number do not always represent how many flows were actually added
+ // to DeviceFlowRegistry, because of possible duplicates.
+ long flowCount = Optional.fromNullable(result).asSet().stream()
+ .flatMap(Collection::stream)
+ .filter(Objects::nonNull)
+ .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+ .filter(Objects::nonNull)
+ .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
+ .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+ .filter(Objects::nonNull)
+ .filter(table -> Objects.nonNull(table.getFlow()))
+ .flatMap(table -> table.getFlow().stream())
+ .filter(Objects::nonNull)
+ .count();
+
+ LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getLOGValue());
+ }
+ this.mastershipChangeListener.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_FLOW_REGISTRY_FILL);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (deviceFlowRegistryFill.isCancelled()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
+ }
+ } else {
+ LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
+ }
+ mastershipChangeListener.onNotAbleToStartMastership(
+ deviceInfo,
+ "Was not able to fill flow registry on device",
+ false);
+ }
+ }
+
}