package org.opendaylight.openflowplugin.impl.lifecycle;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
-import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
-import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.ReconciliationFrameworkStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void instantiateServiceInstance() {
- LOG.info("Starting clustering services for node {}", deviceInfo);
-
try {
contexts.forEach(OFPContext::instantiateServiceInstance);
LOG.info("Started clustering services for node {}", deviceInfo);
} catch (final Exception ex) {
+ LOG.warn("Not able to start clustering services for node {}", deviceInfo);
executorService.submit(() -> contextChainMastershipWatcher
.onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
}
@Override
public ListenableFuture<Void> closeServiceInstance() {
- LOG.info("Closing clustering services for node {}", deviceInfo);
+
contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
final ListenableFuture<List<Void>> servicesToBeClosed = Futures
}
@Override
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void close() {
if (ContextChainState.CLOSED.equals(contextChainState.get())) {
LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
}
contextChainState.set(ContextChainState.CLOSED);
- contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
+ unMasterMe();
// Close all connections to devices
auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
auxiliaryConnections.clear();
- primaryConnection.closeConnection(true);
-
- // Close all contexts (device, statistics, rpc)
- contexts.forEach(OFPContext::close);
- contexts.clear();
// If we are still registered and we are not already closing, then close the registration
if (Objects.nonNull(registration)) {
try {
- LOG.info("Closing clustering services registration for node {}", deviceInfo);
registration.close();
registration = null;
LOG.info("Closed clustering services registration for node {}", deviceInfo);
}
}
+
+ // Close all contexts (device, statistics, rpc)
+ contexts.forEach(OFPContext::close);
+ contexts.clear();
+
// We are closing, so cleanup all managers now
deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo));
deviceRemovedHandlers.clear();
+
+ primaryConnection.closeConnection(false);
+
}
@Override
@Override
public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
- LOG.info("Registering clustering services for node {}", deviceInfo);
registration = Objects.requireNonNull(clusterSingletonServiceProvider
.registerClusterSingletonService(this));
- LOG.info("Registered clustering services for node {}", deviceInfo);
- }
-
- @Override
- public void makeDeviceSlave() {
- unMasterMe();
-
- contexts.forEach(context -> {
- if (context.map(DeviceContext.class::isInstance)) {
- Futures.addCallback(context.map(DeviceContext.class::cast).makeDeviceSlave(),
- new DeviceSlaveCallback(), executorService);
- }
- });
+ LOG.debug("Registered clustering services for node {}", deviceInfo);
}
@Override
- public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState) {
+ public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState,
+ boolean inReconciliationFrameworkStep) {
switch (mastershipState) {
case INITIAL_SUBMIT:
LOG.debug("Device {}, initial submit OK.", deviceInfo);
// Flow registry fill is not mandatory to work as a master
LOG.debug("Device {}, initial registry filling OK.", deviceInfo);
this.registryFilling.set(true);
+ break;
case CHECK:
+ // no operation
+ break;
default:
+ // no operation
+ break;
}
final boolean result = initialGathering.get() &&
masterStateOnDevice.get() &&
- initialSubmitting.get() &&
- rpcRegistration.get();
+ rpcRegistration.get() &&
+ inReconciliationFrameworkStep || initialSubmitting.get();
- if (result && mastershipState != ContextChainMastershipState.CHECK) {
+ if (!inReconciliationFrameworkStep &&
+ result &&
+ mastershipState != ContextChainMastershipState.CHECK) {
LOG.info("Device {} is able to work as master{}",
deviceInfo,
registryFilling.get() ? "." : " WITHOUT flow registry !!!");
}
@Override
- public boolean isPrepared() {
- return this.initialGathering.get() &&
- this.masterStateOnDevice.get() &&
- this.rpcRegistration.get();
- }
-
- @Override
- public boolean continueInitializationAfterReconciliation() {
- final AtomicBoolean initialSubmit = new AtomicBoolean(false);
-
+ public void continueInitializationAfterReconciliation() {
contexts.forEach(context -> {
- if (context.map(StatisticsContext.class::isInstance)) {
- initialSubmit.set(context.map(StatisticsContext.class::cast).initialSubmitAfterReconciliation());
+ if (context.map(ReconciliationFrameworkStep.class::isInstance)) {
+ context.map(ReconciliationFrameworkStep.class::cast).continueInitializationAfterReconciliation();
}
});
-
- return initialSubmit.get() && isMastered(ContextChainMastershipState.INITIAL_SUBMIT);
}
@Override
masterStateOnDevice.set(false);
rpcRegistration.set(false);
}
-
- private final class DeviceSlaveCallback implements FutureCallback<RpcResult<SetRoleOutput>> {
- @Override
- public void onSuccess(@Nullable final RpcResult<SetRoleOutput> result) {
- contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
- }
-
- @Override
- public void onFailure(@Nonnull final Throwable t) {
- contextChainMastershipWatcher.onSlaveRoleNotAcquired(deviceInfo);
- }
- }
-}
+}
\ No newline at end of file