import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
private final Object collectionStatTypeLock = new Object();
private final ConvertorExecutor convertorExecutor;
private final MultipartWriterProvider statisticsWriterProvider;
+ private final DeviceInfo deviceInfo;
+ private final StatisticsManager myManager;
@GuardedBy("collectionStatTypeLock")
private List<MultipartType> collectingStatType;
-
private StatisticsGatheringService<T> statisticsGatheringService;
private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
private Timeout pollTimeout;
- private final DeviceInfo deviceInfo;
- private final StatisticsManager myManager;
+ private ContextChainMastershipWatcher contextChainMastershipWatcher;
+ private volatile ContextState state = ContextState.INITIALIZATION;
private volatile boolean schedulingEnabled;
- private volatile CONTEXT_STATE state;
- private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
- private ClusterInitializationPhaseHandler initialSubmitHandler;
-
private volatile ListenableFuture<Boolean> lastDataGathering;
StatisticsContextImpl(final boolean isStatisticsPollingOn,
deviceContext, convertorExecutor, statisticsWriterProvider);
itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
statListForCollectingInitialization();
- this.state = CONTEXT_STATE.INITIALIZATION;
this.deviceInfo = deviceContext.getDeviceInfo();
this.myManager = myManager;
this.lastDataGathering = null;
lastDataGathering = collectingStatType.stream().reduce(
lastDataGathering,
this::statChainFuture,
- (a, b) -> Futures.transform(a, (AsyncFunction<Boolean, Boolean>) result -> b));
+ (a, b) -> Futures.transformAsync(a, (AsyncFunction<Boolean, Boolean>) result -> b));
// write end timestamp to state snapshot container
Futures.addCallback(lastDataGathering, new FutureCallback<Boolean>() {
@Override
public void close() {
- if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+ if (ContextState.TERMINATION.equals(state)) {
if (LOG.isDebugEnabled()) {
LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
}
} else {
- this.state = CONTEXT_STATE.TERMINATION;
+ this.state = ContextState.TERMINATION;
stopGatheringData();
-
- for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
- iterator.hasNext(); ) {
- RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
- }
+ requestContexts.forEach(requestContext -> RequestContextUtil
+ .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
+ requestContexts.clear();
}
}
}
private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture, final MultipartType multipartType) {
- return Futures.transform(deviceConnectionCheck(), (AsyncFunction<Boolean, Boolean>) connectionResult -> Futures
- .transform(prevFuture, (AsyncFunction<Boolean, Boolean>) result -> {
+ return Futures.transformAsync(deviceConnectionCheck(), (AsyncFunction<Boolean, Boolean>) connectionResult -> Futures
+ .transformAsync(prevFuture, (AsyncFunction<Boolean, Boolean>) result -> {
LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo.getLOGValue(), result);
LOG.debug("Stats iterating to next type for node {} of type {}",
deviceInfo.getLOGValue(),
}
@Override
- public CONTEXT_STATE getState() {
- return this.state;
- }
-
- @Override
- public ServiceGroupIdentifier getServiceIdentifier() {
- return this.deviceInfo.getServiceIdentifier();
+ public DeviceInfo getDeviceInfo() {
+ return this.deviceInfo;
}
@Override
- public DeviceInfo getDeviceInfo() {
- return this.deviceInfo;
+ public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
+ this.contextChainMastershipWatcher = contextChainMastershipWatcher;
}
@Override
- public ListenableFuture<Void> stopClusterServices() {
- if (CONTEXT_STATE.TERMINATION.equals(this.state)) {
- return Futures.immediateCancelledFuture();
- }
+ public ListenableFuture<Void> closeServiceInstance() {
+ LOG.info("Stopping statistics context cluster services for node {}", deviceInfo.getLOGValue());
- return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
+ return Futures.transform(Futures.immediateFuture(null), new Function<Void, Void>() {
@Nullable
@Override
- public Void apply(@Nullable Object input) {
+ public Void apply(@Nullable final Void input) {
schedulingEnabled = false;
stopGatheringData();
return null;
}
@Override
- public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
- this.clusterInitializationPhaseHandler = handler;
- }
-
- @Override
- public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
+ public void instantiateServiceInstance() {
LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
this.statListForCollectingInitialization();
Futures.addCallback(this.gatherDynamicData(), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {
- mastershipChangeListener.onMasterRoleAcquired(
+ contextChainMastershipWatcher.onMasterRoleAcquired(
deviceInfo,
ContextChainMastershipState.INITIAL_GATHERING
);
- if (initialSubmitHandler.initialSubmitTransaction()) {
- mastershipChangeListener.onMasterRoleAcquired(
+ if (deviceContext.initialSubmitTransaction()) {
+ contextChainMastershipWatcher.onMasterRoleAcquired(
deviceInfo,
ContextChainMastershipState.INITIAL_SUBMIT
);
myManager.startScheduling(deviceInfo);
}
} else {
- mastershipChangeListener.onNotAbleToStartMastershipMandatory(
+ contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
deviceInfo,
"Initial transaction cannot be submitted."
);
@Override
public void onFailure(@Nonnull Throwable throwable) {
- mastershipChangeListener.onNotAbleToStartMastershipMandatory(
+ contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
deviceInfo,
"Initial gathering statistics unsuccessful."
);
}
});
-
- return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
}
+ @Nonnull
@Override
- public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
- this.initialSubmitHandler = initialSubmitHandler;
+ public ServiceGroupIdentifier getIdentifier() {
+ return deviceInfo.getServiceIdentifier();
}
}