import com.google.common.base.Function;
import com.google.common.base.Optional;
+import com.google.common.base.Verify;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext.CONNECTION_STATE;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.ClusterInitializationPhaseHandler;
+import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
public class LifecycleServiceImpl implements LifecycleService {
private static final Logger LOG = LoggerFactory.getLogger(LifecycleServiceImpl.class);
-
- private boolean inClosing = false;
private DeviceContext deviceContext;
private RpcContext rpcContext;
private StatisticsContext statContext;
private ClusterSingletonServiceRegistration registration;
private ClusterInitializationPhaseHandler clusterInitializationPhaseHandler;
+ private final List<DeviceRemovedHandler> deviceRemovedHandlers = new ArrayList<>();
+ private volatile CONTEXT_STATE state = CONTEXT_STATE.INITIALIZATION;
@Override
public void instantiateServiceInstance() {
- LOG.info("Starting clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
+ LOG.info("Starting clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
- if (!this.clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
- this.closeConnection();
+ if (!clusterInitializationPhaseHandler.onContextInstantiateService(null)) {
+ closeConnection();
}
-
}
@Override
public ListenableFuture<Void> closeServiceInstance() {
- LOG.info("Closing clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
-
final boolean connectionInterrupted =
this.deviceContext
.getPrimaryConnectionContext()
.getConnectionState()
.equals(ConnectionContext.CONNECTION_STATE.RIP);
- // If connection was interrupted and we are not trying to close service, then we received something
- // we do not wanted to receive, so do not continue
- if (connectionInterrupted && !inClosing) {
- LOG.warn("Failed to close clustering MASTER services for node {} because they are already closed",
- LifecycleServiceImpl.this.deviceContext.getDeviceInfo().getLOGValue());
-
- return Futures.immediateCancelledFuture();
- }
-
// Chain all jobs that will stop our services
final List<ListenableFuture<Void>> futureList = new ArrayList<>();
futureList.add(statContext.stopClusterServices(connectionInterrupted));
futureList.add(rpcContext.stopClusterServices(connectionInterrupted));
futureList.add(deviceContext.stopClusterServices(connectionInterrupted));
- // When we stopped all jobs then we are not in closing state anymore (at least from plugin perspective)
return Futures.transform(Futures.successfulAsList(futureList), new Function<List<Void>, Void>() {
@Nullable
@Override
public Void apply(@Nullable List<Void> input) {
- LOG.debug("Closed clustering MASTER services for node {}",
- LifecycleServiceImpl.this.deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Closed clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
return null;
}
});
@Override
public ServiceGroupIdentifier getIdentifier() {
+ return getServiceIdentifier();
+ }
+
+ @Override
+ public CONTEXT_STATE getState() {
+ return this.state;
+ }
+
+ @Override
+ public ServiceGroupIdentifier getServiceIdentifier() {
return deviceContext.getServiceIdentifier();
}
+ @Override
+ public DeviceInfo getDeviceInfo() {
+ return deviceContext.getDeviceInfo();
+ }
@Override
- public void close() throws Exception {
- // If we are still registered and we are not already closing, then close the registration
- if (Objects.nonNull(registration) && !inClosing) {
- inClosing = true;
- registration.close();
- registration = null;
+ public void close() {
+ if (CONTEXT_STATE.TERMINATION.equals(getState())){
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("LifecycleService is already in TERMINATION state.");
+ }
+ } else {
+ this.state = CONTEXT_STATE.TERMINATION;
+
+ // We are closing, so cleanup all managers now
+ deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(getDeviceInfo()));
+
+ // If we are still registered and we are not already closing, then close the registration
+ if (Objects.nonNull(registration)) {
+ try {
+ LOG.debug("Closing clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
+ registration.close();
+ } catch (Exception e) {
+ LOG.debug("Failed to close clustering MASTER services for node {} with exception: ",
+ getDeviceInfo().getLOGValue(), e);
+ }
+ }
}
}
@Override
public void registerService(final ClusterSingletonServiceProvider singletonServiceProvider) {
- LOG.info("Registering clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
- //lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
+ // lifecycle service -> device context -> statistics context -> rpc context -> role context -> lifecycle service
this.clusterInitializationPhaseHandler = deviceContext;
this.deviceContext.setLifecycleInitializationPhaseHandler(this.statContext);
this.statContext.setLifecycleInitializationPhaseHandler(this.rpcContext);
this.rpcContext.setLifecycleInitializationPhaseHandler(this);
//Set initial submit handler
this.statContext.setInitialSubmitHandler(this.deviceContext);
- //Register cluster singleton service
- this.registration = singletonServiceProvider.registerClusterSingletonService(this);
+
+ // Register cluster singleton service
+ try {
+ this.registration = Verify.verifyNotNull(singletonServiceProvider.registerClusterSingletonService(this));
+ LOG.info("Registered clustering MASTER services for node {}", getDeviceInfo().getLOGValue());
+ } catch (Exception e) {
+ LOG.warn("Failed to register cluster singleton service for node {}, with exception: {}", getDeviceInfo(), e);
+ closeConnection();
+ }
+ }
+
+ @Override
+ public void registerDeviceRemovedHandler(final DeviceRemovedHandler deviceRemovedHandler) {
+ if (!deviceRemovedHandlers.contains(deviceRemovedHandler)) {
+ deviceRemovedHandlers.add(deviceRemovedHandler);
+ }
}
@Override
@Override
public void closeConnection() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing connection for node {}.", getDeviceInfo().getLOGValue());
+ }
+
this.deviceContext.shutdownConnection();
}
@Override
public boolean onContextInstantiateService(final ConnectionContext connectionContext) {
-
- if (ConnectionContext.CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
+ if (CONNECTION_STATE.RIP.equals(connectionContext.getConnectionState())) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Connection to the device {} was interrupted.", this.deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Connection to the device {} was interrupted.", getDeviceInfo().getLOGValue());
}
+
return false;
}
private class DeviceFlowRegistryCallback implements FutureCallback<List<Optional<FlowCapableNode>>> {
private final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill;
- public DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
+ DeviceFlowRegistryCallback(ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill) {
this.deviceFlowRegistryFill = deviceFlowRegistryFill;
}
.filter(Objects::nonNull)
.count();
- LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, getDeviceInfo().getLOGValue());
}
}
public void onFailure(Throwable t) {
if (deviceFlowRegistryFill.isCancelled()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceContext.getDeviceInfo().getLOGValue());
+ LOG.debug("Cancelled filling flow registry with flows for node: {}", getDeviceInfo().getLOGValue());
}
} else {
- LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceContext.getDeviceInfo().getLOGValue(), t);
+ LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", getDeviceInfo().getLOGValue(), t);
}
}
}