package org.opendaylight.openflowplugin.impl.device;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.FutureCallback;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
import org.opendaylight.openflowplugin.api.ConnectionException;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
-import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
private boolean switchFeaturesMandatory;
private DeviceInfo deviceInfo;
private final ConvertorExecutor convertorExecutor;
- private volatile CONTEXT_STATE state;
+ private volatile ContextState state;
private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
private final DeviceManager myManager;
private final DeviceInitializerProvider deviceInitializerProvider;
private final boolean useSingleLayerSerialization;
- private OutboundQueueProvider outboundQueueProvider;
+ private boolean hasState;
private boolean isInitialTransactionSubmitted;
DeviceContextImpl(
final DeviceInitializerProvider deviceInitializerProvider) {
this.primaryConnectionContext = primaryConnectionContext;
- this.outboundQueueProvider = (OutboundQueueProvider) primaryConnectionContext.getOutboundQueueProvider();
this.deviceInfo = primaryConnectionContext.getDeviceInfo();
this.hashedWheelTimer = hashedWheelTimer;
this.deviceInitializerProvider = deviceInitializerProvider;
this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
- this.state = CONTEXT_STATE.INITIALIZATION;
+ this.state = ContextState.INITIALIZATION;
this.convertorExecutor = convertorExecutor;
this.skipTableFeatures = skipTableFeatures;
this.useSingleLayerSerialization = useSingleLayerSerialization;
@Override
public boolean initialSubmitTransaction() {
- if (initialized) {
- isInitialTransactionSubmitted = true;
- return transactionChainManager.initialSubmitWriteTransaction();
- }
-
- return false;
+ return (initialized &&(isInitialTransactionSubmitted =
+ transactionChainManager.initialSubmitWriteTransaction()));
}
@Override
return dataBroker.newReadOnlyTransaction();
}
+ @Override
+ public boolean isTransactionsEnabled() {
+ return isInitialTransactionSubmitted;
+ }
+
@Override
public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
final InstanceIdentifier<T> path,
messageSpy.spyMessage(
ofHeader.getImplementedInterface(),
(ofHeader instanceof Error)
- ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE
- : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
+ : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
}
@Override
ofHeaderList.forEach(header -> messageSpy.spyMessage(
header.getImplementedInterface(),
(header instanceof Error)
- ? MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE
- : MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS));
+ ? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
+ : MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
}
@Override
@Override
public void processPortStatusMessage(final PortStatusMessage portStatus) {
- messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
- if (isInitialTransactionSubmitted) {
+ if (initialized) {
try {
writePortStatusMessage(portStatus);
submitTransaction();
LOG.warn("Error processing port status message for port {} on device {}",
portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e);
}
+ } else if (!hasState) {
+ primaryConnectionContext.handlePortStatusMessage(portStatus);
}
}
@Override
public void processPacketInMessage(final PacketInMessage packetInMessage) {
- messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH);
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH);
final ConnectionAdapter connectionAdapter = getPrimaryConnectionContext().getConnectionAdapter();
final PacketReceived packetReceived = packetInTranslator.translate(packetInMessage, getDeviceInfo(), null);
if (packetReceived == null) {
LOG.debug("Received a null packet from switch {}", connectionAdapter.getRemoteAddress());
- messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
+ messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
return;
} else {
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
}
if (!packetInLimiter.acquirePermit()) {
LOG.debug("Packet limited");
// TODO: save packet into emergency slot if possible
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PACKET_IN_LIMIT_REACHED_AND_DROPPED);
return;
}
final ListenableFuture<?> offerNotification = notificationPublishService.offerNotification(packetReceived);
if (NotificationPublishService.REJECTED.equals(offerNotification)) {
LOG.debug("notification offer rejected");
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
packetInLimiter.drainLowWaterMark();
packetInLimiter.releasePermit();
return;
Futures.addCallback(offerNotification, new FutureCallback<Object>() {
@Override
public void onSuccess(final Object result) {
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
packetInLimiter.releasePermit();
}
@Override
public void onFailure(final Throwable t) {
- messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_NOTIFICATION_REJECTED);
+ messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.StatisticsGroup.FROM_SWITCH_NOTIFICATION_REJECTED);
LOG.debug("notification offer failed: {}", t.getMessage());
LOG.trace("notification offer failed..", t);
packetInLimiter.releasePermit();
@Override
public void onPublished() {
- Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
- this.state = CONTEXT_STATE.WORKING;
+ Verify.verify(ContextState.INITIALIZATION.equals(getState()));
+ this.state = ContextState.WORKING;
primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Shutdown method for node {}", getDeviceInfo().getLOGValue());
}
- if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+ if (ContextState.TERMINATION.equals(getState())) {
LOG.debug("DeviceCtx for Node {} is in termination process.", getDeviceInfo().getLOGValue());
return;
}
}
@Override
- public CONTEXT_STATE getState() {
+ public ContextState getState() {
return this.state;
}
//NOOP
}
- @Override
- public void putLifecycleServiceIntoTxChainManager(final LifecycleService lifecycleService){
- if (initialized) {
- this.transactionChainManager.setLifecycleService(lifecycleService);
- }
- }
-
@Override
public boolean canUseSingleLayerSerialization() {
return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
lazyTransactionManagerInitialization();
- this.transactionChainManager.activateTransactionManager();
+
+ try {
+ final List<PortStatusMessage> portStatusMessages = primaryConnectionContext
+ .retrieveAndClearPortStatusMessages();
+
+ portStatusMessages.forEach(this::writePortStatusMessage);
+ submitTransaction();
+ } catch (final Exception ex) {
+ LOG.warn("Error processing port status messages from device {}", getDeviceInfo().getLOGValue(), ex);
+ return false;
+ }
try {
final java.util.Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo.getNodeInstanceIdentifier());
this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
+ this.transactionChainManager.activateTransactionManager();
this.initialized = true;
}
}
return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
}
+ @Override
+ public void onStateAcquired(final ContextChainState state) {
+ hasState = true;
+ }
+
private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
private final MastershipChangeListener mastershipChangeListener;