*/
package org.opendaylight.openflowplugin.impl.lifecycle;
-import static org.opendaylight.openflowplugin.api.openflow.OFPContext.ContextState;
-
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
import org.opendaylight.yangtools.yang.common.RpcResult;
public class ContextChainImpl implements ContextChain {
private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
+
private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
private final AtomicBoolean initialGathering = new AtomicBoolean(false);
private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
private final AtomicBoolean registryFilling = new AtomicBoolean(false);
private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
- private final List<OFPContext> contexts = new CopyOnWriteArrayList<>();
+ private final List<GuardedContext> contexts = new CopyOnWriteArrayList<>();
private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
private final ExecutorService executorService;
private final ContextChainMastershipWatcher contextChainMastershipWatcher;
private final DeviceInfo deviceInfo;
private final ConnectionContext primaryConnection;
+ private final AtomicReference<ContextChainState> contextChainState =
+ new AtomicReference<>(ContextChainState.UNDEFINED);
private AutoCloseable registration;
- private ContextState state = ContextState.INITIALIZATION;
- private Future<?> initFuture;
-
- private volatile ContextChainState contextChainState = ContextChainState.UNDEFINED;
ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher,
@Nonnull final ConnectionContext connectionContext,
@Override
public <T extends OFPContext> void addContext(@Nonnull final T context) {
- contexts.add(context);
+ contexts.add(new GuardedContextImpl(context));
}
@Override
public void instantiateServiceInstance() {
LOG.info("Starting clustering services for node {}", deviceInfo);
- initFuture = executorService.submit(() -> {
- try {
- contexts.forEach(context -> {
- if (ConnectionContext.CONNECTION_STATE.WORKING.equals(primaryConnection.getConnectionState())) {
- context.instantiateServiceInstance();
- }
- });
- LOG.info("Started clustering services for node {}", deviceInfo);
- } catch (final Exception ex) {
- contextChainMastershipWatcher.onNotAbleToStartMastershipMandatory(deviceInfo, ex.getMessage());
- }
- });
+ try {
+ contexts.forEach(OFPContext::instantiateServiceInstance);
+ LOG.info("Started clustering services for node {}", deviceInfo);
+ } catch (final Exception ex) {
+ executorService.submit(() -> contextChainMastershipWatcher
+ .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
+ }
}
@Override
contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
final ListenableFuture<List<Void>> servicesToBeClosed = Futures
- .successfulAsList(Lists.reverse(contexts)
+ .allAsList(Lists.reverse(contexts)
.stream()
.map(OFPContext::closeServiceInstance)
.collect(Collectors.toList()));
@Override
public void close() {
- if (ContextState.TERMINATION.equals(state)) {
+ if (ContextChainState.CLOSED.equals(contextChainState.get())) {
LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
return;
}
- state = ContextState.TERMINATION;
+ contextChainState.set(ContextChainState.CLOSED);
contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
- // If we somehow have initialization still running, cancel it
- if (Objects.nonNull(initFuture)) {
- if (!initFuture.isCancelled() && !initFuture.isDone()) {
- LOG.info("Waiting for finishing the running initialization process for node {}", deviceInfo);
-
- try {
- initFuture.get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.warn("Failed to await running initialization for node {}: {}", deviceInfo, e);
- }
- }
-
- initFuture = null;
- }
-
// Close all connections to devices
auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false));
auxiliaryConnections.clear();
primaryConnection.closeConnection(true);
-
// Close all contexts (device, statistics, rpc)
contexts.forEach(OFPContext::close);
contexts.clear();
@Override
public void makeContextChainStateSlave() {
unMasterMe();
- changeState(ContextChainState.WORKING_SLAVE);
+ changeMastershipState(ContextChainState.WORKING_SLAVE);
}
@Override
public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
LOG.info("Registering clustering services for node {}", deviceInfo);
- state = ContextState.WORKING;
registration = Objects.requireNonNull(clusterSingletonServiceProvider
.registerClusterSingletonService(this));
LOG.info("Registered clustering services for node {}", deviceInfo);
public void makeDeviceSlave() {
unMasterMe();
- contexts.stream()
- .filter(DeviceContext.class::isInstance)
- .map(DeviceContext.class::cast)
- .findAny()
- .ifPresent(deviceContext -> Futures
- .addCallback(
- deviceContext.makeDeviceSlave(),
- new DeviceSlaveCallback(),
- executorService));
+ contexts.forEach(context -> {
+ if (context.map(DeviceContext.class::isInstance)) {
+ Futures.addCallback(context.map(DeviceContext.class::cast).makeDeviceSlave(),
+ new DeviceSlaveCallback(), executorService);
+ }
+ });
}
@Override
LOG.info("Device {} is able to work as master{}",
deviceInfo,
registryFilling.get() ? "." : " WITHOUT flow registry !!!");
- changeState(ContextChainState.WORKING_MASTER);
+ changeMastershipState(ContextChainState.WORKING_MASTER);
}
return result;
@Override
public boolean isClosing() {
- return ContextState.TERMINATION.equals(state);
+ return ContextChainState.CLOSED.equals(contextChainState.get());
}
@Override
@Override
public boolean continueInitializationAfterReconciliation() {
- return isMastered(ContextChainMastershipState.INITIAL_SUBMIT) && contexts.stream()
- .filter(StatisticsContext.class::isInstance)
- .map(StatisticsContext.class::cast)
- .findAny()
- .map(StatisticsContext::initialSubmitAfterReconciliation)
- .orElse(false);
+ final AtomicBoolean initialSubmit = new AtomicBoolean(false);
+
+ contexts.forEach(context -> {
+ if (context.map(StatisticsContext.class::isInstance)) {
+ initialSubmit.set(context.map(StatisticsContext.class::cast).initialSubmitAfterReconciliation());
+ }
+ });
+
+ return initialSubmit.get() && isMastered(ContextChainMastershipState.INITIAL_SUBMIT);
}
@Override
deviceRemovedHandlers.add(deviceRemovedHandler);
}
- private void changeState(final ContextChainState contextChainState) {
- boolean propagate = this.contextChainState == ContextChainState.UNDEFINED;
- this.contextChainState = contextChainState;
+ private void changeMastershipState(final ContextChainState contextChainState) {
+ if (ContextChainState.CLOSED.equals(this.contextChainState.get())) {
+ return;
+ }
+
+ boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get());
+ this.contextChainState.set(contextChainState);
if (propagate) {
- contexts.stream()
- .filter(ContextChainStateListener.class::isInstance)
- .map(ContextChainStateListener.class::cast)
- .forEach(listener -> listener.onStateAcquired(contextChainState));
+ contexts.forEach(context -> {
+ if (context.map(ContextChainStateListener.class::isInstance)) {
+ context.map(ContextChainStateListener.class::cast).onStateAcquired(contextChainState);
+ }
+ });
}
}