import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.AsyncFunction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowjava.protocol.api.keys.MessageTypeKey;
import org.opendaylight.openflowplugin.api.ConnectionException;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.MultiMsgCollector;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
+import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
+import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
+import org.opendaylight.openflowplugin.impl.services.sal.SalRoleServiceImpl;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
import org.opendaylight.openflowplugin.openflow.md.util.InventoryDataServiceUtil;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRemovedBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnectorKey;
// Timeout in seconds after what we will give up on propagating role
private static final int SET_ROLE_TIMEOUT = 10;
+ private static final int LOW_WATERMARK = 1000;
+ private static final int HIGH_WATERMARK = 2000;
+
private boolean initialized;
private SalRoleService salRoleService = null;
private final HashedWheelTimer hashedWheelTimer;
- private ConnectionContext primaryConnectionContext;
+ private volatile ConnectionContext primaryConnectionContext;
private final DeviceState deviceState;
private final DataBroker dataBroker;
private final Map<SwitchConnectionDistinguisher, ConnectionContext> auxiliaryConnectionContexts;
private DeviceFlowRegistry deviceFlowRegistry;
private DeviceGroupRegistry deviceGroupRegistry;
private DeviceMeterRegistry deviceMeterRegistry;
- private final PacketInRateLimiter packetInLimiter;
+ private PacketInRateLimiter packetInLimiter;
private final MessageSpy messageSpy;
private final ItemLifeCycleKeeper flowLifeCycleKeeper;
private NotificationPublishService notificationPublishService;
private final TranslatorLibrary translatorLibrary;
private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
private ExtensionConverterProvider extensionConverterProvider;
- private final DeviceManager deviceManager;
private boolean skipTableFeatures;
private boolean switchFeaturesMandatory;
- private final DeviceInfo deviceInfo;
+ private DeviceInfo deviceInfo;
private final ConvertorExecutor convertorExecutor;
private volatile CONTEXT_STATE state;
private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
private final DeviceManager myManager;
private final DeviceInitializerProvider deviceInitializerProvider;
private final boolean useSingleLayerSerialization;
+ private Boolean isAddNotificationSent = false;
+ private OutboundQueueProvider outboundQueueProvider;
+ private boolean wasOnceMaster;
DeviceContextImpl(
@Nonnull final ConnectionContext primaryConnectionContext,
@Nonnull final DataBroker dataBroker,
@Nonnull final MessageSpy messageSpy,
@Nonnull final TranslatorLibrary translatorLibrary,
- @Nonnull final DeviceManager manager,
+ @Nonnull final DeviceManager contextManager,
final ConvertorExecutor convertorExecutor,
final boolean skipTableFeatures,
final HashedWheelTimer hashedWheelTimer,
- final DeviceManager myManager,
final boolean useSingleLayerSerialization,
final DeviceInitializerProvider deviceInitializerProvider) {
this.primaryConnectionContext = primaryConnectionContext;
+ this.outboundQueueProvider = (OutboundQueueProvider) primaryConnectionContext.getOutboundQueueProvider();
this.deviceInfo = primaryConnectionContext.getDeviceInfo();
this.hashedWheelTimer = hashedWheelTimer;
- this.myManager = myManager;
this.deviceInitializerProvider = deviceInitializerProvider;
+ this.myManager = contextManager;
this.deviceState = new DeviceStateImpl();
this.dataBroker = dataBroker;
this.auxiliaryConnectionContexts = new HashMap<>();
this.messageSpy = Preconditions.checkNotNull(messageSpy);
- this.deviceManager = manager;
this.packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
- /*initial*/ 1000, /*initial*/2000, this.messageSpy, REJECTED_DRAIN_FACTOR);
+ /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, this.messageSpy, REJECTED_DRAIN_FACTOR);
this.translatorLibrary = translatorLibrary;
this.portStatusTranslator = translatorLibrary.lookupTranslator(
this.skipTableFeatures = skipTableFeatures;
this.useSingleLayerSerialization = useSingleLayerSerialization;
this.initialized = false;
+ this.wasOnceMaster = false;
}
@Override
final org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowRemoved flowRemovedNotification =
flowRemovedTranslator.translate(flowRemoved, deviceInfo, null);
- if(deviceManager.isFlowRemovedNotificationOn()) {
+ if(!myManager.isFlowRemovedNotificationOn()) {
// Trigger off a notification
notificationPublishService.offerNotification(flowRemovedNotification);
} else if(LOG.isDebugEnabled()) {
- LOG.debug("For nodeId={} isFlowRemovedNotificationOn={}", getDeviceInfo().getLOGValue(), deviceManager.isFlowRemovedNotificationOn());
+ LOG.debug("For nodeId={} isNotificationFlowRemovedOn={}", getDeviceInfo().getLOGValue(), myManager.isFlowRemovedNotificationOn());
}
final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
//2. create registry key
final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(), flowRemovedNotification);
//3. lookup flowId
- final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveIdForFlow(flowRegKey);
+ final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
//4. if flowId present:
if (flowDescriptor != null) {
// a) construct flow path
}
}
+ @Override
+ public void sendNodeAddedNotification() {
+ if (!isAddNotificationSent) {
+ isAddNotificationSent = true;
+ NodeUpdatedBuilder builder = new NodeUpdatedBuilder();
+ builder.setId(getDeviceInfo().getNodeId());
+ builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()));
+ LOG.debug("Publishing node added notification for {}", builder.build());
+ notificationPublishService.offerNotification(builder.build());
+ }
+ }
+
+ @Override
+ public void sendNodeRemovedNotification() {
+ NodeRemovedBuilder builder = new NodeRemovedBuilder();
+ builder.setNodeRef(new NodeRef(getDeviceInfo().getNodeInstanceIdentifier()));
+ LOG.debug("Publishing node removed notification for {}", builder.build());
+ notificationPublishService.offerNotification(builder.build());
+ }
+
@Override
public void processPortStatusMessage(final PortStatusMessage portStatus) {
messageSpy.spyMessage(portStatus.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
public void onPublished() {
Verify.verify(CONTEXT_STATE.INITIALIZATION.equals(getState()));
this.state = CONTEXT_STATE.WORKING;
- primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
+ synchronized (primaryConnectionContext) {
+ primaryConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
+ }
for (final ConnectionContext switchAuxConnectionContext : auxiliaryConnectionContexts.values()) {
switchAuxConnectionContext.getConnectionAdapter().setPacketInFiltering(false);
}
this.switchFeaturesMandatory = switchFeaturesMandatory;
}
+ @Override
+ public synchronized void replaceConnection(final ConnectionContext connectionContext) {
+
+ primaryConnectionContext = null;
+ deviceInfo = null;
+ packetInLimiter = null;
+
+ primaryConnectionContext = connectionContext;
+ deviceInfo = primaryConnectionContext.getDeviceInfo();
+
+ packetInLimiter = new PacketInRateLimiter(primaryConnectionContext.getConnectionAdapter(),
+ /*initial*/ LOW_WATERMARK, /*initial*/HIGH_WATERMARK, messageSpy, REJECTED_DRAIN_FACTOR);
+
+ primaryConnectionContext.setOutboundQueueProvider(outboundQueueProvider);
+
+ final OutboundQueueHandlerRegistration<OutboundQueueProvider> outboundQueueHandlerRegistration =
+ primaryConnectionContext.getConnectionAdapter().registerOutboundQueueHandler(
+ outboundQueueProvider,
+ myManager.getBarrierCountLimit(),
+ myManager.getBarrierIntervalNanos());
+
+ primaryConnectionContext.setOutboundQueueHandleRegistration(outboundQueueHandlerRegistration);
+
+ final OpenflowProtocolListenerFullImpl messageListener = new OpenflowProtocolListenerFullImpl(
+ primaryConnectionContext.getConnectionAdapter(), this);
+
+ primaryConnectionContext.getConnectionAdapter().setMessageListener(messageListener);
+
+ LOG.info("ConnectionEvent: Connection on device:{}, NodeId:{} switched.",
+ primaryConnectionContext.getConnectionAdapter().getRemoteAddress(),
+ primaryConnectionContext.getDeviceInfo().getNodeId());
+
+ }
+
@Override
public CONTEXT_STATE getState() {
return this.state;
: Futures.immediateFuture(null);
if (!connectionInterrupted) {
- final ListenableFuture<Void> makeSlaveFuture = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
+ final ListenableFuture<Void> makeSlaveFuture
+ = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
@Nullable
@Override
public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
if (LOG.isDebugEnabled()) {
LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getLOGValue());
}
+ sendNodeAddedNotification();
}
@Override
}
});
- return Futures.transform(deactivateTxManagerFuture, new AsyncFunction<Void, Void>() {
- @Override
- public ListenableFuture<Void> apply(Void aVoid) throws Exception {
- // Add fallback to remove device from operational DS if setting slave fails
- return Futures.withFallback(makeSlaveFuture, t ->
- myManager.removeDeviceFromOperationalDS(deviceInfo));
- }
- });
- } else {
- return Futures.transform(deactivateTxManagerFuture, new AsyncFunction<Void, Void>() {
- @Override
- public ListenableFuture<Void> apply(Void aVoid) throws Exception {
- return myManager.removeDeviceFromOperationalDS(deviceInfo);
- }
- });
}
+
+ return deactivateTxManagerFuture;
}
@Override
} else {
this.state = CONTEXT_STATE.TERMINATION;
}
+ sendNodeRemovedNotification();
}
@Override
}
}
- @Override
- public void replaceConnectionContext(final ConnectionContext connectionContext){
- // Act like we are initializing the context
- this.state = CONTEXT_STATE.INITIALIZATION;
- this.primaryConnectionContext = connectionContext;
- this.onPublished();
- }
-
@Override
public boolean canUseSingleLayerSerialization() {
return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
}
@Override
- public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
+ public void masterSuccessful(){
+ this.wasOnceMaster = true;
+ }
- if (getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
- LOG.warn("Connection on device {} was interrupted, will stop starting master services.", deviceInfo.getLOGValue());
- return false;
- }
+ @Override
+ public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
LOG.info("Starting device context cluster services for node {}", deviceInfo.getLOGValue());
this.transactionChainManager.activateTransactionManager();
try {
- final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
+ final java.util.Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
.lookup(deviceInfo.getVersion());
if (initializer.isPresent()) {
return false;
}
- Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new RpcResultFutureCallback());
- return this.clusterInitializationPhaseHandler.onContextInstantiateService(getPrimaryConnectionContext());
+ Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER),
+ new RpcResultFutureCallback(mastershipChangeListener));
+
+ final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = getDeviceFlowRegistry().fill();
+ Futures.addCallback(deviceFlowRegistryFill, new DeviceFlowRegistryCallback(deviceFlowRegistryFill));
+
+ return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
}
@VisibleForTesting
}
private class RpcResultFutureCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
+
+ private final MastershipChangeListener mastershipChangeListener;
+
+ RpcResultFutureCallback(final MastershipChangeListener mastershipChangeListener) {
+ this.mastershipChangeListener = mastershipChangeListener;
+ }
+
@Override
public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
if (LOG.isDebugEnabled()) {
LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getLOGValue());
}
+ sendNodeAddedNotification();
}
@Override
public void onFailure(final Throwable throwable) {
LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getLOGValue());
- shutdownConnection();
+ mastershipChangeListener.onNotAbleToStartMastership(deviceInfo);
+ }
+ }
+
+ private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
+ private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
+
+ DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
+ this.deviceFlowRegistryFill = deviceFlowRegistryFill;
+ }
+
+ @Override
+ public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Finished filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ if (deviceFlowRegistryFill.isCancelled()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
+ }
+ } else {
+ LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getLOGValue(), t);
+ }
}
}
+
}