import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.opendaylight.controller.md.sal.binding.api.NotificationService;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
+import org.opendaylight.openflowplugin.impl.util.DeviceInitializationUtils;
import org.opendaylight.openflowplugin.impl.util.MdSalRegistratorUtils;
import org.opendaylight.openflowplugin.openflow.md.core.session.SwitchConnectionCookieOFImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
private RpcContext rpcContext;
private ExtensionConverterProvider extensionConverterProvider;
+ private final boolean switchFeaturesMandatory;
+ private StatisticsContext statCtx;
+
@VisibleForTesting
DeviceContextImpl(@Nonnull final ConnectionContext primaryConnectionContext,
@Nonnull final HashedWheelTimer hashedWheelTimer,
@Nonnull final MessageSpy _messageSpy,
@Nonnull final OutboundQueueProvider outboundQueueProvider,
- @Nonnull final TranslatorLibrary translatorLibrary) {
+ @Nonnull final TranslatorLibrary translatorLibrary,
+ final boolean switchFeaturesMandatory) {
+ this.switchFeaturesMandatory = switchFeaturesMandatory;
this.primaryConnectionContext = Preconditions.checkNotNull(primaryConnectionContext);
this.deviceState = Preconditions.checkNotNull(deviceState);
this.dataBroker = Preconditions.checkNotNull(dataBroker);
}
@Override
- public Long getReservedXid() {
+ public Long reservedXidForDeviceMessage() {
return outboundQueueProvider.reserveEntry();
}
}
@Override
- public ListenableFuture<Void> onClusterRoleChange(@CheckForNull final OfpRole role) {
- LOG.debug("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
+ public ListenableFuture<Void> onClusterRoleChange(final OfpRole oldRole, @CheckForNull final OfpRole role) {
+ LOG.trace("onClusterRoleChange {} for node:", role, deviceState.getNodeId());
Preconditions.checkArgument(role != null);
- if (rpcContext != null) {
- // TODO : implement interface for onClusterRoleChange method
- MdSalRegistratorUtils.registerServices(rpcContext, this, role);
+ if (role.equals(oldRole)) {
+ LOG.debug("Demanded role change for device {} is not change OldRole: {}, NewRole {}", deviceState.getNodeId(), oldRole, role);
+ return Futures.immediateFuture(null);
}
-
- final ListenableFuture<Void> nextStepFuture;
if (OfpRole.BECOMEMASTER.equals(role)) {
- transactionChainManager.activateTransactionManager();
- nextStepFuture = Futures.immediateCheckedFuture(null);
- getDeviceState().setRole(role);
+ if (!deviceState.deviceSynchronized()) {
+ LOG.debug("Setup Device Ctx {} for Master Role", getDeviceState().getNodeId());
+ transactionChainManager.activateTransactionManager();
+ return Futures.immediateCheckedFuture(null);
+ }
+ /* Relevant for no initial Slave-to-Master scenario in cluster */
+ return asyncClusterRoleChange(role);
} else if (OfpRole.BECOMESLAVE.equals(role)) {
- nextStepFuture = transactionChainManager.deactivateTransactionManager();
- Futures.transform(nextStepFuture, new Function<Void, Void>() {
- @Nullable
- @Override
- public Void apply(@Nullable final Void aVoid) {
- getDeviceState().setRole(role);
- return null;
- }
- });
+ if (rpcContext != null) {
+ MdSalRegistratorUtils.registerSlaveServices(rpcContext, role);
+ }
+ return transactionChainManager.deactivateTransactionManager();
} else {
- LOG.warn("Unknow OFCluster Role {} for Node {}", role, deviceState.getNodeId());
- nextStepFuture = Futures.immediateCheckedFuture(null);
+ LOG.warn("Unknown OFCluster Role {} for Node {}", role, deviceState.getNodeId());
+ if (rpcContext != null) {
+ MdSalRegistratorUtils.unregisterServices(rpcContext);
+ }
+ return transactionChainManager.deactivateTransactionManager();
+ }
+ }
+
+ /*
+ * we don't have active TxManager so anything will not be stored to DS yet, but we have
+ * check all NodeInformation for statistics otherwise statistics will not contains
+ * all possible MultipartTypes for polling in StatTypeList
+ */
+ private ListenableFuture<Void> asyncClusterRoleChange(final OfpRole role) {
+ if (statCtx == null) {
+ final String errMsg = String.format("DeviceCtx {} is up but we are missing StatisticsContext", deviceState.getNodeId());
+ LOG.warn(errMsg);
+ return Futures.immediateFailedFuture(new IllegalStateException(errMsg));
}
- return nextStepFuture;
+ final InstanceIdentifier<FlowCapableNode> ofNodeII = deviceState.getNodeInstanceIdentifier()
+ .augmentation(FlowCapableNode.class);
+ final ReadOnlyTransaction readTx = getReadTransaction();
+ final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> readOfNodeFuture = readTx.read(
+ LogicalDatastoreType.OPERATIONAL, ofNodeII);
+
+ final ListenableFuture<Void> nodeInitInfoFuture = Futures.transform(readOfNodeFuture,
+ new AsyncFunction<Optional<FlowCapableNode>, Void>() {
+ @Override
+ public ListenableFuture<Void> apply(final Optional<FlowCapableNode> input) throws Exception {
+ if (!input.isPresent() || input.get().getTable().isEmpty()) {
+ /* Last master close fail scenario so we would like to activate TxManager */
+ LOG.debug("Operational DS for Device {} has to be replaced", deviceState.getNodeId());
+ getDeviceState().setDeviceSynchronized(false);
+ transactionChainManager.activateTransactionManager();
+ }
+ return DeviceInitializationUtils.initializeNodeInformation(DeviceContextImpl.this, switchFeaturesMandatory);
+ }
+ });
+
+ final ListenableFuture<Boolean> statPollFuture = Futures.transform(nodeInitInfoFuture,
+ new AsyncFunction<Void, Boolean>() {
+
+ @Override
+ public ListenableFuture<Boolean> apply(final Void input) throws Exception {
+ getStatisticsContext().statListForCollectingInitialization();
+ if (getDeviceState().deviceSynchronized()) {
+ return Futures.immediateFuture(Boolean.TRUE);
+ }
+ return getStatisticsContext().gatherDynamicData();
+ }
+ });
+
+ return Futures.transform(statPollFuture, new Function<Boolean, Void>() {
+
+ @Override
+ public Void apply(final Boolean input) {
+ if (!input.booleanValue()) {
+ LOG.warn("Get Initial Device {} information fails", getDeviceState().getNodeId());
+ DeviceContextImpl.this.close();
+ return null;
+ }
+ LOG.debug("Get Initial Device {} information is successful", getDeviceState().getNodeId());
+ if (null != getRpcContext()) {
+ MdSalRegistratorUtils.registerMasterServices(getRpcContext(), DeviceContextImpl.this, role);
+ }
+ getDeviceState().setDeviceSynchronized(true);
+ getDeviceState().setStatisticsPollingEnabledProp(true);
+ transactionChainManager.activateTransactionManager();
+ return null;
+ }
+ });
}
@Override
@Override
public void onSuccess(final Void result) {
- LOG.info("Delete Node {} was successfull.", deviceState.getNodeId());
+ LOG.info("TxChain {} was shutdown successfull.", deviceState.getNodeId());
tearDownClean();
}
@Override
public void onFailure(final Throwable t) {
- LOG.warn("Delete Node {} fail.", deviceState.getNodeId(), t);
+ LOG.warn("Shutdown TxChain {} fail.", deviceState.getNodeId(), t);
tearDownClean();
}
});
public ExtensionConverterProvider getExtensionConverterProvider() {
return extensionConverterProvider;
}
+
+ @Override
+ public void setStatisticsContext(final StatisticsContext statisticsContext) {
+ this.statCtx = statisticsContext;
+ }
+
+ @Override
+ public StatisticsContext getStatisticsContext() {
+ return statCtx;
+ }
}