Mastership service provider API
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / statistics / StatisticsContextImpl.java
index b7f37dbd3fb48215824a4f8d2824014daff1e572..d3b2e72cd9cb2be0878272f8cdf06e888a5a4275 100644 (file)
@@ -12,7 +12,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -21,7 +20,6 @@ import io.netty.util.Timeout;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import javax.annotation.Nonnull;
@@ -35,9 +33,8 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceState;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
-import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
 import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
 import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager;
@@ -66,20 +63,17 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     private final Object collectionStatTypeLock = new Object();
     private final ConvertorExecutor convertorExecutor;
     private final MultipartWriterProvider statisticsWriterProvider;
+    private final DeviceInfo deviceInfo;
+    private final StatisticsManager myManager;
     @GuardedBy("collectionStatTypeLock")
     private List<MultipartType> collectingStatType;
-
     private StatisticsGatheringService<T> statisticsGatheringService;
     private StatisticsGatheringOnTheFlyService<T> statisticsGatheringOnTheFlyService;
     private Timeout pollTimeout;
-    private final DeviceInfo deviceInfo;
-    private final StatisticsManager myManager;
+    private ContextChainMastershipWatcher contextChainMastershipWatcher;
 
+    private volatile ContextState state = ContextState.INITIALIZATION;
     private volatile boolean schedulingEnabled;
-    private volatile CONTEXT_STATE state;
-    private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
-    private ClusterInitializationPhaseHandler initialSubmitHandler;
-
     private volatile ListenableFuture<Boolean> lastDataGathering;
 
     StatisticsContextImpl(final boolean isStatisticsPollingOn,
@@ -96,7 +90,6 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
             deviceContext, convertorExecutor, statisticsWriterProvider);
         itemLifeCycleListener = new ItemLifecycleListenerImpl(deviceContext);
         statListForCollectingInitialization();
-        this.state = CONTEXT_STATE.INITIALIZATION;
         this.deviceInfo = deviceContext.getDeviceInfo();
         this.myManager = myManager;
         this.lastDataGathering = null;
@@ -151,7 +144,7 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
             lastDataGathering = collectingStatType.stream().reduce(
                     lastDataGathering,
                     this::statChainFuture,
-                    (a, b) -> Futures.transform(a, (AsyncFunction<Boolean, Boolean>) result -> b));
+                    (a, b) -> Futures.transformAsync(a, (AsyncFunction<Boolean, Boolean>) result -> b));
 
             // write end timestamp to state snapshot container
             Futures.addCallback(lastDataGathering, new FutureCallback<Boolean>() {
@@ -222,18 +215,16 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
 
     @Override
     public void close() {
-        if (CONTEXT_STATE.TERMINATION.equals(getState())) {
+        if (ContextState.TERMINATION.equals(state)) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("StatisticsContext for node {} is already in TERMINATION state.", getDeviceInfo().getLOGValue());
             }
         } else {
-            this.state = CONTEXT_STATE.TERMINATION;
+            this.state = ContextState.TERMINATION;
             stopGatheringData();
-
-            for (final Iterator<RequestContext<?>> iterator = Iterators.consumingIterator(requestContexts.iterator());
-                 iterator.hasNext(); ) {
-                RequestContextUtil.closeRequestContextWithRpcError(iterator.next(), CONNECTION_CLOSED);
-            }
+            requestContexts.forEach(requestContext -> RequestContextUtil
+                    .closeRequestContextWithRpcError(requestContext, CONNECTION_CLOSED));
+            requestContexts.clear();
         }
     }
 
@@ -253,8 +244,8 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     }
 
     private ListenableFuture<Boolean> statChainFuture(final ListenableFuture<Boolean> prevFuture, final MultipartType multipartType) {
-        return Futures.transform(deviceConnectionCheck(), (AsyncFunction<Boolean, Boolean>) connectionResult -> Futures
-                .transform(prevFuture, (AsyncFunction<Boolean, Boolean>) result -> {
+        return Futures.transformAsync(deviceConnectionCheck(), (AsyncFunction<Boolean, Boolean>) connectionResult -> Futures
+                .transformAsync(prevFuture, (AsyncFunction<Boolean, Boolean>) result -> {
                     LOG.debug("Status of previous stat iteration for node {}: {}", deviceInfo.getLOGValue(), result);
                     LOG.debug("Stats iterating to next type for node {} of type {}",
                             deviceInfo.getLOGValue(),
@@ -308,30 +299,23 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     }
 
     @Override
-    public CONTEXT_STATE getState() {
-        return this.state;
-    }
-
-    @Override
-    public ServiceGroupIdentifier getServiceIdentifier() {
-        return this.deviceInfo.getServiceIdentifier();
+    public DeviceInfo getDeviceInfo() {
+        return this.deviceInfo;
     }
 
     @Override
-    public DeviceInfo getDeviceInfo() {
-        return this.deviceInfo;
+    public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
+        this.contextChainMastershipWatcher = contextChainMastershipWatcher;
     }
 
     @Override
-    public ListenableFuture<Void> stopClusterServices() {
-        if (CONTEXT_STATE.TERMINATION.equals(this.state)) {
-            return Futures.immediateCancelledFuture();
-        }
+    public ListenableFuture<Void> closeServiceInstance() {
+        LOG.info("Stopping statistics context cluster services for node {}", deviceInfo.getLOGValue());
 
-        return Futures.transform(Futures.immediateFuture(null), new Function<Object, Void>() {
+        return Futures.transform(Futures.immediateFuture(null), new Function<Void, Void>() {
             @Nullable
             @Override
-            public Void apply(@Nullable Object input) {
+            public Void apply(@Nullable final Void input) {
                 schedulingEnabled = false;
                 stopGatheringData();
                 return null;
@@ -363,25 +347,20 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
     }
 
     @Override
-    public void setLifecycleInitializationPhaseHandler(final ClusterInitializationPhaseHandler handler) {
-        this.clusterInitializationPhaseHandler = handler;
-    }
-
-    @Override
-    public boolean onContextInstantiateService(final MastershipChangeListener mastershipChangeListener) {
+    public void instantiateServiceInstance() {
         LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
         this.statListForCollectingInitialization();
 
         Futures.addCallback(this.gatherDynamicData(), new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(@Nullable Boolean aBoolean) {
-                mastershipChangeListener.onMasterRoleAcquired(
+                contextChainMastershipWatcher.onMasterRoleAcquired(
                         deviceInfo,
                         ContextChainMastershipState.INITIAL_GATHERING
                 );
 
-                if (initialSubmitHandler.initialSubmitTransaction()) {
-                    mastershipChangeListener.onMasterRoleAcquired(
+                if (deviceContext.initialSubmitTransaction()) {
+                    contextChainMastershipWatcher.onMasterRoleAcquired(
                             deviceInfo,
                             ContextChainMastershipState.INITIAL_SUBMIT
                     );
@@ -390,7 +369,7 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
                         myManager.startScheduling(deviceInfo);
                     }
                 } else {
-                    mastershipChangeListener.onNotAbleToStartMastershipMandatory(
+                    contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
                             deviceInfo,
                             "Initial transaction cannot be submitted."
                     );
@@ -399,18 +378,17 @@ class StatisticsContextImpl<T extends OfHeader> implements StatisticsContext {
 
             @Override
             public void onFailure(@Nonnull Throwable throwable) {
-                mastershipChangeListener.onNotAbleToStartMastershipMandatory(
+                contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(
                         deviceInfo,
                         "Initial gathering statistics unsuccessful."
                 );
             }
         });
-
-        return this.clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener);
     }
 
+    @Nonnull
     @Override
-    public void setInitialSubmitHandler(final ClusterInitializationPhaseHandler initialSubmitHandler) {
-        this.initialSubmitHandler = initialSubmitHandler;
+    public ServiceGroupIdentifier getIdentifier() {
+        return deviceInfo.getServiceIdentifier();
     }
 }