package org.opendaylight.openflowplugin.impl.device;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
+import com.google.common.base.Optional;
import com.google.common.base.Verify;
-import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
+import java.math.BigInteger;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.Error;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketIn;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.experimenter.core.ExperimenterDataOfChoice;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.experimenter.types.rev151020.experimenter.core.message.ExperimenterMessageOfChoice;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
public class DeviceContextImpl implements DeviceContext, ExtensionConverterProviderKeeper {
private static final Logger LOG = LoggerFactory.getLogger(DeviceContextImpl.class);
// Timeout in seconds after what we will give up on propagating role
private static final int SET_ROLE_TIMEOUT = 10;
+ // Timeout in milliseconds after what we will give up on initializing device
+ private static final int DEVICE_INIT_TIMEOUT = 9000;
+
+ private static final int LOW_WATERMARK = 1000;
+ private static final int HIGH_WATERMARK = 2000;
+ private final MultipartWriterProvider writerProvider;
+
private boolean initialized;
private SalRoleService salRoleService = null;
private final HashedWheelTimer hashedWheelTimer;
- private ConnectionContext primaryConnectionContext;
+ private volatile ConnectionContext primaryConnectionContext;
private final DeviceState deviceState;
private final DataBroker dataBroker;
private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
private DeviceFlowRegistry deviceFlowRegistry;
private DeviceGroupRegistry deviceGroupRegistry;
private DeviceMeterRegistry deviceMeterRegistry;
- private final PacketInRateLimiter packetInLimiter;
+ private PacketInRateLimiter packetInLimiter;
private final MessageSpy messageSpy;
private final ItemLifeCycleKeeper flowLifeCycleKeeper;
private NotificationPublishService notificationPublishService;
private final TranslatorLibrary translatorLibrary;
private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
private ExtensionConverterProvider extensionConverterProvider;
- private final DeviceManager deviceManager;
private boolean skipTableFeatures;
private boolean switchFeaturesMandatory;
- private final DeviceInfo deviceInfo;
+ private DeviceInfo deviceInfo;
private final ConvertorExecutor convertorExecutor;
- private volatile CONTEXT_STATE state;
+ private volatile ContextState state;
private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
private final DeviceManager myManager;
private final DeviceInitializerProvider deviceInitializerProvider;
private final boolean useSingleLayerSerialization;
- private Boolean isAddNotificationSent = false;
+ private boolean hasState;
+ private boolean isInitialTransactionSubmitted;
DeviceContextImpl(
- @Nonnull final ConnectionContext primaryConnectionContext,
- @Nonnull final DataBroker dataBroker,
- @Nonnull final MessageSpy messageSpy,
- @Nonnull final TranslatorLibrary translatorLibrary,
- @Nonnull final DeviceManager manager,
- final ConvertorExecutor convertorExecutor,
- final boolean skipTableFeatures,
- final HashedWheelTimer hashedWheelTimer,
- final DeviceManager myManager,
- final boolean useSingleLayerSerialization,
- final DeviceInitializerProvider deviceInitializerProvider) {
+ @Nonnull final ConnectionContext primaryConnectionContext,
+ @Nonnull final DataBroker dataBroker,
+ @Nonnull final MessageSpy messageSpy,
+ @Nonnull final TranslatorLibrary translatorLibrary,
+ @Nonnull final DeviceManager contextManager,
+ final ConvertorExecutor convertorExecutor,
+ final boolean skipTableFeatures,
+ final HashedWheelTimer hashedWheelTimer,
+ final boolean useSingleLayerSerialization,
+ final DeviceInitializerProvider deviceInitializerProvider) {
this.primaryConnectionContext = primaryConnectionContext;
this.deviceInfo = primaryConnectionContext.getDeviceInfo();
this.hashedWheelTimer = hashedWheelTimer;
- this.myManager = myManager;
this.deviceInitializerProvider = deviceInitializerProvider;
+ this.myManager = contextManager;
this.deviceState = new DeviceStateImpl();
this.dataBroker = dataBroker;
this.auxiliaryConnectionContexts = new HashMap<>();
- this.messageSpy = Preconditions.checkNotNull(messageSpy);
- this.deviceManager = manager;
+ this.messageSpy = messageSpy;
this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
- /*initial*/ 1000, /*initial*/2000, this.messageSpy, REJECTED_DRAIN_FACTOR);
+ /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
this.translatorLibrary = translatorLibrary;
this.portStatusTranslator = translatorLibrary.lookupTranslator(
this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
- this.state = CONTEXT_STATE.INITIALIZATION;
+ this.state = ContextState.INITIALIZATION;
this.convertorExecutor = convertorExecutor;
this.skipTableFeatures = skipTableFeatures;
this.useSingleLayerSerialization = useSingleLayerSerialization;
this.initialized = false;
+ writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
}
@Override
- public void initialSubmitTransaction() {
- if (initialized) {
- transactionChainManager.initialSubmitWriteTransaction();
- }
+ public boolean initialSubmitTransaction() {
+ return (initialized &&(isInitialTransactionSubmitted =
+ transactionChainManager.initialSubmitWriteTransaction()));
}
@Override
return dataBroker.newReadOnlyTransaction();
}
+ @Override
+ public boolean isTransactionsEnabled() {
+ return isInitialTransactionSubmitted;
+ }
+
@Override
public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
final InstanceIdentifier<T> path,
@Override
public void processReply(final OfHeader ofHeader) {
messageSpy.spyMessage(
- ofHeader.getImplementedInterface(),
- (ofHeader instanceof Error)
- ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE
- : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ ofHeader.getImplementedInterface(),
+ (ofHeader instanceof Error)
+ ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
+ : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
}
@Override
public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
ofHeaderList.forEach(header -> messageSpy.spyMessage(
- header.getImplementedInterface(),
- (header instanceof Error)
- ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE
- : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS));
+ header.getImplementedInterface(),
+ (header instanceof Error)
+ ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
+ : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
}
@Override
final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
- if(deviceManager.isFlowRemovedNotificationOn()) {
+ if(myManager.isFlowRemovedNotificationOn()) {
// Trigger off a notification
notificationPublishService.offerNotification(flowRemovedNotification);
} else if(LOG.isDebugEnabled()) {
- LOG.debug("For nodeId={} isFlowRemovedNotificationOn={}", getDeviceInfo().getLOGValue(), deviceManager.isFlowRemovedNotificationOn());
+ LOG.debug("For nodeId={} isNotificationFlowRemovedOn={}", getDeviceInfo().getLOGValue(), myManager.isFlowRemovedNotificationOn());
}
final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
}
}
- @Override
- public void sendNodeAddedNotification() {
- if (!isAddNotificationSent) {
- isAddNotificationSent = true;
- NodeUpdatedBuilder builder = new NodeUpdatedBuilder();
- builder.setId(getDeviceInfo().getNodeId());
- builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()));
- LOG.debug("Publishing node added notification for {}", builder.build());
- notificationPublishService.offerNotification(builder.build());
- }
- }
-
- @Override
- public void sendNodeRemovedNotification() {
- NodeRemovedBuilder builder = new NodeRemovedBuilder();
- builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()));
- LOG.debug("Publishing node removed notification for {}", builder.build());
- notificationPublishService.offerNotification(builder.build());
- }
-
@Override
public void processPortStatusMessage(final PortStatusMessage portStatus) {
- messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
- final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator.translate(portStatus, getDeviceInfo(), null);
+ messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
- final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = provideIIToNodeConnector(portStatus.getPortNo(), portStatus.getVersion());
- try {
- if (portStatus.getReason().equals(PortReason.OFPPRADD) || portStatus.getReason().equals(PortReason.OFPPRMODIFY)) {
- // because of ADD status node connector has to be created
- final NodeConnectorBuilder nConnectorBuilder = new NodeConnectorBuilder().setKey(iiToNodeConnector.getKey());
- nConnectorBuilder.addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build());
- nConnectorBuilder.addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector);
- writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, nConnectorBuilder.build());
- } else if (portStatus.getReason().equals(PortReason.OFPPRDELETE)) {
- addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
+ if (initialized) {
+ try {
+ writePortStatusMessage(portStatus);
+ submitTransaction();
+ } catch (final Exception e) {
+ LOG.warn("Error processing port status message for port {} on device {}",
+ portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e);
}
- submitTransaction();
- } catch (final Exception e) {
- LOG.warn("Error processing port status message for port {} on device {} : {}", portStatus.getPortNo(),
- getDeviceInfo().getNodeId().toString(), e);
+ } else if (!hasState) {
+ primaryConnectionContext.handlePortStatusMessage(portStatus);
}
}
- private KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> provideIIToNodeConnector(final long portNo, final short version) {
- final InstanceIdentifier<Node> iiToNodes = getDeviceInfo().getNodeInstanceIdentifier();
- final BigInteger dataPathId = getDeviceInfo().getDatapathId();
- final NodeConnectorId nodeConnectorId = InventoryDataServiceUtil.nodeConnectorIdfromDatapathPortNo(dataPathId, portNo, OpenflowVersion.get(version));
- return iiToNodes.child(NodeConnector.class, new NodeConnectorKey(nodeConnectorId));
+ private void writePortStatusMessage(final PortStatus portStatusMessage) {
+ final FlowCapableNodeConnector flowCapableNodeConnector = portStatusTranslator
+ .translate(portStatusMessage, getDeviceInfo(), null);
+
+ final KeyedInstanceIdentifier<NodeConnector, NodeConnectorKey> iiToNodeConnector = getDeviceInfo()
+ .getNodeInstanceIdentifier()
+ .child(NodeConnector.class, new NodeConnectorKey(InventoryDataServiceUtil
+ .nodeConnectorIdfromDatapathPortNo(
+ deviceInfo.getDatapathId(),
+ portStatusMessage.getPortNo(),
+ OpenflowVersion.get(deviceInfo.getVersion()))));
+
+ if (PortReason.OFPPRADD.equals(portStatusMessage.getReason()) || PortReason.OFPPRMODIFY.equals(portStatusMessage.getReason())) {
+ // because of ADD status node connector has to be created
+ writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector, new NodeConnectorBuilder()
+ .setKey(iiToNodeConnector.getKey())
+ .addAugmentation(FlowCapableNodeConnectorStatisticsData.class, new FlowCapableNodeConnectorStatisticsDataBuilder().build())
+ .addAugmentation(FlowCapableNodeConnector.class, flowCapableNodeConnector)
+ .build());
+ } else if (PortReason.OFPPRDELETE.equals(portStatusMessage.getReason())) {
+ addDeleteToTxChain(LogicalDatastoreType.OPERATIONAL, iiToNodeConnector);
+ }
}
@Override
public void processPacketInMessage(final PacketInMessage packetInMessage) {
- messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH);
final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
if (packetReceived == null) {
LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
- messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
return;
} else {
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
}
if (!packetInLimiter.acquirePermit()) {
LOG.debug("Packet limited");
// TODO: save packet into emergency slot if possible
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
return;
}
final ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(packetReceived);
if (NotificationPublishService.REJECTED.equals(offerNotification)) {
LOG.debug("notification offer rejected");
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
packetInLimiter.drainLowWaterMark();
packetInLimiter.releasePermit();
return;
Futures.addCallback(offerNotification, new FutureCallback<Object>() {
@Override
public void onSuccess(final Object result) {
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
packetInLimiter.releasePermit();
}
@Override
public void onFailure(final Throwable t) {
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
LOG.debug("notification offer failed: {}", t.getMessage());
LOG.trace("notification offer failed..", t);
packetInLimiter.releasePermit();
try {
messageOfChoice = messageConverter.convert(vendorData, MessagePath.MESSAGE_NOTIFICATION);
final ExperimenterMessageFromDevBuilder experimenterMessageFromDevBld = new ExperimenterMessageFromDevBuilder()
- .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
+ .setNode(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()))
.setExperimenterMessageOfChoice(messageOfChoice);
// publish
notificationPublishService.offerNotification(experimenterMessageFromDevBld.build());
@Override
public void onPublished() {
- Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
- this.state = CONTEXT_STATE.WORKING;
+ Verify.verify(ContextState.INITIALIZATION.equals(getState()));
+ this.state = ContextState.WORKING;
primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
- for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
- switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
- }
}
@Override
if (LOG.isDebugEnabled()) {
LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
}
- if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+ if (ContextState.TERMINATION.equals(getState())) {
LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue());
return;
}
}
@Override
- public CONTEXT_STATE getState() {
+ public ContextState getState() {
return this.state;
}
@Override
- public ListenableFuture<Void> stopClusterServices(boolean connectionInterrupted) {
- final ListenableFuture<Void> deactivateTxManagerFuture = initialized
+ public ListenableFuture<Void> stopClusterServices() {
+ return initialized
? transactionChainManager.deactivateTransactionManager()
: Futures.immediateFuture(null);
-
- if (!connectionInterrupted) {
- final ListenableFuture<Void> makeSlaveFuture = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
- return null;
- }
- });
-
- Futures.addCallback(makeSlaveFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(@Nullable Void aVoid) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
- }
- sendNodeAddedNotification();
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getLOGValue());
- LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
- }
- });
-
- return Futures.transform(deactivateTxManagerFuture, new AsyncFunction<Void, Void>() {
- @Override
- public ListenableFuture<Void> apply(Void aVoid) throws Exception {
- // Add fallback to remove device from operational DS if setting slave fails
- return Futures.withFallback(makeSlaveFuture, t ->
- myManager.removeDeviceFromOperationalDS(deviceInfo));
- }
- });
- } else {
- return Futures.transform(deactivateTxManagerFuture, new AsyncFunction<Void, Void>() {
- @Override
- public ListenableFuture<Void> apply(Void aVoid) throws Exception {
- return myManager.removeDeviceFromOperationalDS(deviceInfo);
- }
- });
- }
}
@Override
@Override
public void close() {
- if (CONTEXT_STATE.TERMINATION.equals(getState())){
- if (LOG.isDebugEnabled()) {
- LOG.debug("DeviceContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
- }
- } else {
- this.state = CONTEXT_STATE.TERMINATION;
- }
- sendNodeRemovedNotification();
- }
-
- @Override
- public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
- if (initialized) {
- this.transactionChainManager.setLifecycleService(lifecycleService);
- }
- }
-
- @Override
- public void replaceConnectionContext(final ConnectionContext connectionContext){
- // Act like we are initializing the context
- this.state = CONTEXT_STATE.INITIALIZATION;
- this.primaryConnectionContext = connectionContext;
- this.onPublished();
+ //NOOP
}
@Override
}
@Override
- public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
-
- if (getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
- LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
- return false;
- }
+ public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
-
lazyTransactionManagerInitialization();
- this.transactionChainManager.activateTransactionManager();
+ try {
+ final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
+ .retrieveAndClearPortStatusMessages();
+
+ portStatusMessages.forEach(this::writePortStatusMessage);
+ submitTransaction();
+ } catch (final Exception ex) {
+ LOG.warn("Error processing port status messages from device {}", getDeviceInfo().getLOGValue(), ex);
+ return false;
+ }
try {
- final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
- .lookup(deviceInfo.getVersion());
+ final java.util.Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
+ .lookup(deviceInfo.getVersion());
if (initializer.isPresent()) {
- final MultipartWriterProvider writerProvider = MultipartWriterProviderFactory.createDefaultProvider(this);
- initializer.get().initialize(this, switchFeaturesMandatory, writerProvider, convertorExecutor);
+ initializer
+ .get()
+ .initialize(this, switchFeaturesMandatory, writerProvider, convertorExecutor)
+ .get(DEVICE_INIT_TIMEOUT, TimeUnit.MILLISECONDS);
} else {
throw new ExecutionException(new ConnectionException("Unsupported version " + deviceInfo.getVersion()));
}
- } catch (ExecutionException | InterruptedException e) {
- LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), e);
+ } catch (ExecutionException | InterruptedException | TimeoutException ex) {
+ LOG.warn("Device {} cannot be initialized: ", deviceInfo.getLOGValue(), ex);
return false;
}
- Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback());
- return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext());
+ Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
+ new RpcResultFutureCallback(mastershipChangeListener));
+
+ final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
+ Futures.addCallback(deviceFlowRegistryFill,
+ new DeviceFlowRegistryCallback(deviceFlowRegistryFill, mastershipChangeListener));
+
+ return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
}
@VisibleForTesting
this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
+ this.transactionChainManager.activateTransactionManager();
this.initialized = true;
}
}
}
- ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
+ private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
}
return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
}
+ @Override
+ public void onStateAcquired(final ContextChainState state) {
+ hasState = true;
+ }
+
private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+
+ private final MastershipChangeListener mastershipChangeListener;
+
+ RpcResultFutureCallback(final MastershipChangeListener mastershipChangeListener) {
+ this.mastershipChangeListener = mastershipChangeListener;
+ }
+
@Override
public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ this.mastershipChangeListener.onMasterRoleAcquired(
+ deviceInfo,
+ ContextChainMastershipState.MASTER_ON_DEVICE
+ );
if (LOG.isDebugEnabled()) {
LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
}
- sendNodeAddedNotification();
}
@Override
public void onFailure(final Throwable throwable) {
- LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
- shutdownConnection();
+ mastershipChangeListener.onNotAbleToStartMastershipMandatory(
+ deviceInfo,
+ "Was not able to set MASTER role on device");
}
}
+
+ private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
+ private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
+ private final MastershipChangeListener mastershipChangeListener;
+
+ DeviceFlowRegistryCallback(
+ ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill,
+ MastershipChangeListener mastershipChangeListener) {
+ this.deviceFlowRegistryFill = deviceFlowRegistryFill;
+ this.mastershipChangeListener = mastershipChangeListener;
+ }
+
+ @Override
+ public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+ if (LOG.isDebugEnabled()) {
+ // Count all flows we read from datastore for debugging purposes.
+ // This number do not always represent how many flows were actually added
+ // to DeviceFlowRegistry, because of possible duplicates.
+ long flowCount = Optional.fromNullable(result).asSet().stream()
+ .flatMap(Collection::stream)
+ .filter(Objects::nonNull)
+ .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+ .filter(Objects::nonNull)
+ .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
+ .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+ .filter(Objects::nonNull)
+ .filter(table -> Objects.nonNull(table.getFlow()))
+ .flatMap(table -> table.getFlow().stream())
+ .filter(Objects::nonNull)
+ .count();
+
+ LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getLOGValue());
+ }
+ this.mastershipChangeListener.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState.INITIAL_FLOW_REGISTRY_FILL);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (deviceFlowRegistryFill.isCancelled()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
+ }
+ } else {
+ LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
+ }
+ mastershipChangeListener.onNotAbleToStartMastership(
+ deviceInfo,
+ "Was not able to fill flow registry on device",
+ false);
+ }
+ }
+
}