Fix context chain closing
[openflowplugin.git] / openflowplugin-impl / src / main / java / org / opendaylight / openflowplugin / impl / lifecycle / LifecycleServiceImpl.java
index 729a3d8718defb60a2a01985ed0e8c3da2766b9a..d7824e9a0953bb9ba294c63dfcd69809d208ea6e 100644 (file)
@@ -1,5 +1,5 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+/**
+ * Copyright (c) 2016 Pantheon Technologies s.r.o. and others. All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -7,25 +7,29 @@
  */
 package org.opendaylight.openflowplugin.impl.lifecycle;
 
-import com.google.common.base.Optional;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Verify;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
 import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
-import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext.ContextState;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
 import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
-import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
-import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
-import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.MastershipChangeListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,189 +37,111 @@ public class LifecycleServiceImpl implements LifecycleService {
 
     private static final Logger LOG = LoggerFactory.getLogger(LifecycleServiceImpl.class);
 
-    private DeviceContext deviceContext;
-    private RpcContext rpcContext;
-    private RoleContext roleContext;
-    private StatisticsContext statContext;
+    private final List<DeviceRemovedHandler> deviceRemovedHandlers = new ArrayList<>();
+    private final MastershipChangeListener mastershipChangeListener;
+    private final ExecutorService executorService;
     private ClusterSingletonServiceRegistration registration;
-
+    private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
+    private ServiceGroupIdentifier serviceGroupIdentifier;
+    private DeviceInfo deviceInfo;
+    private volatile ContextState state = ContextState.INITIALIZATION;
+
+    public LifecycleServiceImpl(@Nonnull final MastershipChangeListener mastershipChangeListener,
+                                @Nonnull final ExecutorService executorService) {
+        this.mastershipChangeListener = mastershipChangeListener;
+        this.executorService = executorService;
+    }
 
     @Override
-    public void instantiateServiceInstance() {
-        try {
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("========== Starting clustering MASTER services for node {} ==========", this.deviceContext.getDeviceInfo().getLOGValue());
-            }
-
-            if (connectionInterrupted()) {
-                return;
-            }
-
-            LOG.info("Starting device context cluster services for node {}", getIdentifier());
-            this.deviceContext.startupClusterServices();
+    public void makeDeviceSlave(final DeviceContext deviceContext) {
+        deviceInfo = MoreObjects.firstNonNull(deviceInfo, deviceContext.getDeviceInfo());
 
-            if (connectionInterrupted()) {
-                return;
-            }
-
-            LOG.info("Starting statistics context cluster services for node {}", getIdentifier());
-            this.statContext.startupClusterServices();
-
-            if (connectionInterrupted()) {
-                return;
-            }
-
-            LOG.info("Statistics initial gathering OK, submitting data for node {}", getIdentifier());
-            this.deviceContext.initialSubmitTransaction();
-
-            if (connectionInterrupted()) {
-                return;
+        Futures.addCallback(deviceContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
+            @Override
+            public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Role SLAVE was successfully propagated on device, node {}",
+                            deviceContext.getDeviceInfo().getLOGValue());
+                }
+                mastershipChangeListener.onSlaveRoleAcquired(deviceInfo);
             }
 
-            LOG.info("Starting rpc context cluster services for node {}", getIdentifier());
-            this.rpcContext.startupClusterServices();
-
-            if (connectionInterrupted()) {
-                return;
+            @Override
+            public void onFailure(@Nonnull Throwable throwable) {
+                LOG.warn("Was not able to set role SLAVE to device on node {} ",
+                        deviceContext.getDeviceInfo().getLOGValue());
+                mastershipChangeListener.onSlaveRoleNotAcquired(deviceInfo);
             }
+        });
+    }
 
-            LOG.info("Starting role context cluster services for node {}", getIdentifier());
-            this.roleContext.startupClusterServices();
+    @Override
+    public void instantiateServiceInstance() {
+        executorService.submit(() -> {
+            LOG.info("Starting clustering services for node {}", deviceInfo.getLOGValue());
 
-            if (connectionInterrupted()) {
-                return;
+            if (!clusterInitializationPhaseHandler.onContextInstantiateService(mastershipChangeListener)) {
+                mastershipChangeListener.onNotAbleToStartMastershipMandatory(deviceInfo, "Cannot initialize device.");
             }
-
-            LOG.info("Caching flows IDs ...");
-            fillDeviceFlowRegistry();
-
-        } catch (ExecutionException | InterruptedException e) {
-            LOG.warn("Cluster service {} was unable to start.", this.getIdentifier());
-            this.deviceContext.shutdownConnection();
-        }
-    }
-
-    private boolean connectionInterrupted() {
-        if (this.deviceContext.getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
-            LOG.warn("Node {} was disconnected, will stop starting MASTER services.", this.deviceContext.getDeviceInfo().getLOGValue());
-            return true;
-        }
-        return false;
+        });
     }
 
     @Override
     public ListenableFuture<Void> closeServiceInstance() {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("========== Stopping clustering MASTER services for node {} ==========", this.deviceContext.getDeviceInfo().getLOGValue());
-        }
-
-        final boolean connectionInterrupted =
-                this.deviceContext
-                        .getPrimaryConnectionContext()
-                        .getConnectionState()
-                        .equals(ConnectionContext.CONNECTION_STATE.RIP);
-
-        LOG.info("Stopping role context cluster services for node {}", getIdentifier());
-        roleContext.stopClusterServices(connectionInterrupted);
-
-        LOG.info("Stopping statistics context cluster services for node {}", getIdentifier());
-        statContext.stopClusterServices(connectionInterrupted);
-
-        LOG.info("Stopping rpc context cluster services for node {}", getIdentifier());
-        rpcContext.stopClusterServices(connectionInterrupted);
-
-        LOG.info("Stopping device context cluster services for node {}", getIdentifier());
-        return deviceContext.stopClusterServices(connectionInterrupted);
+        LOG.info("Closing clustering services for node {}", deviceInfo.getLOGValue());
+        return Futures.immediateFuture(null);
     }
 
     @Override
+    @Nonnull
     public ServiceGroupIdentifier getIdentifier() {
-        return deviceContext.getServiceIdentifier();
+        return this.serviceGroupIdentifier;
     }
 
-
     @Override
-    public void close() throws Exception {
-        if (registration != null) {
-            registration.close();
-            registration = null;
+    public void close() {
+        if (ContextState.TERMINATION.equals(state)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("LifecycleService for node {} is already in TERMINATION state.", deviceInfo.getLOGValue());
+            }
+        } else {
+            state = ContextState.TERMINATION;
+
+            // We are closing, so cleanup all managers now
+            deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo));
+
+            // If we are still registered and we are not already closing, then close the registration
+            if (Objects.nonNull(registration)) {
+                try {
+                    LOG.info("Closing clustering services registration for node {}", deviceInfo.getLOGValue());
+                    registration.close();
+                    registration = null;
+                } catch (final Exception e) {
+                    LOG.warn("Failed to close clustering services registration for node {} with exception: ",
+                            deviceInfo.getLOGValue(), e);
+                }
+            }
         }
     }
 
     @Override
-    public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
-        this.registration = singletonServiceProvider.registerClusterSingletonService(this);
-    }
-
-    @Override
-    public void setDeviceContext(final DeviceContext deviceContext) {
-        this.deviceContext = deviceContext;
+    public void registerService(@Nonnull final ClusterSingletonServiceProvider singletonServiceProvider,
+                                @Nonnull final DeviceContext deviceContext) {
+        Verify.verify(Objects.isNull(registration));
+        this.clusterInitializationPhaseHandler = deviceContext;
+        this.serviceGroupIdentifier = deviceContext.getServiceIdentifier();
+        this.deviceInfo = deviceContext.getDeviceInfo();
+        this.registration = Verify.verifyNotNull(
+                singletonServiceProvider.registerClusterSingletonService(this));
+
+        LOG.info("Registered clustering services for node {}", deviceInfo.getLOGValue());
     }
 
     @Override
-    public void setRpcContext(final RpcContext rpcContext) {
-        this.rpcContext = rpcContext;
-    }
-
-    @Override
-    public void setRoleContext(final RoleContext roleContext) {
-        this.roleContext = roleContext;
-    }
-
-    @Override
-    public void setStatContext(final StatisticsContext statContext) {
-        this.statContext = statContext;
-    }
-
-    @Override
-    public DeviceContext getDeviceContext() {
-        return this.deviceContext;
-    }
-
-    @Override
-    public void closeConnection() {
-        this.deviceContext.shutdownConnection();
-    }
-
-    private void fillDeviceFlowRegistry() {
-        // Fill device flow registry with flows from datastore
-        final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceContext.getDeviceFlowRegistry().fill();
-
-        // Start statistics scheduling only after we finished initializing device flow registry
-        Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
-            @Override
-            public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
-                if (LOG.isDebugEnabled()) {
-                    // Count all flows we read from datastore for debugging purposes.
-                    // This number do not always represent how many flows were actually added
-                    // to DeviceFlowRegistry, because of possible duplicates.
-                    long flowCount = Optional.fromNullable(result).asSet().stream()
-                            .flatMap(Collection::stream)
-                            .filter(Objects::nonNull)
-                            .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
-                            .filter(Objects::nonNull)
-                            .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
-                            .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
-                            .filter(Objects::nonNull)
-                            .filter(table -> Objects.nonNull(table.getFlow()))
-                            .flatMap(table -> table.getFlow().stream())
-                            .filter(Objects::nonNull)
-                            .count();
-
-                    LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceContext.getDeviceInfo().getLOGValue());
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                if (deviceFlowRegistryFill.isCancelled()) {
-                    LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceContext.getDeviceInfo().getLOGValue());
-                } else {
-                    LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceContext.getDeviceInfo().getLOGValue(), t);
-                }
-            }
-        });
+    public void registerDeviceRemovedHandler(@Nonnull final DeviceRemovedHandler deviceRemovedHandler) {
+        if (!deviceRemovedHandlers.contains(deviceRemovedHandler)) {
+            deviceRemovedHandlers.add(deviceRemovedHandler);
+        }
     }
 
 }