*/
package org.opendaylight.openflowplugin.impl.lifecycle;
-import static org.opendaylight.openflowplugin.api.openflow.OFPContext.ContextState;
-
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
-import org.opendaylight.openflowplugin.api.ConnectionException;
import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
import org.opendaylight.yangtools.yang.common.RpcResult;
public class ContextChainImpl implements ContextChain {
private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class);
+
private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false);
private final AtomicBoolean initialGathering = new AtomicBoolean(false);
private final AtomicBoolean initialSubmitting = new AtomicBoolean(false);
private final AtomicBoolean registryFilling = new AtomicBoolean(false);
private final AtomicBoolean rpcRegistration = new AtomicBoolean(false);
private final List<DeviceRemovedHandler> deviceRemovedHandlers = new CopyOnWriteArrayList<>();
- private final List<OFPContext> contexts = new CopyOnWriteArrayList<>();
+ private final List<GuardedContext> contexts = new CopyOnWriteArrayList<>();
private final List<ConnectionContext> auxiliaryConnections = new CopyOnWriteArrayList<>();
private final ExecutorService executorService;
private final ContextChainMastershipWatcher contextChainMastershipWatcher;
private final DeviceInfo deviceInfo;
private final ConnectionContext primaryConnection;
+ private final AtomicReference<ContextChainState> contextChainState =
+ new AtomicReference<>(ContextChainState.UNDEFINED);
private AutoCloseable registration;
- private ContextState state = ContextState.INITIALIZATION;
-
- private volatile ContextChainState contextChainState = ContextChainState.UNDEFINED;
ContextChainImpl(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher,
@Nonnull final ConnectionContext connectionContext,
@Override
public <T extends OFPContext> void addContext(@Nonnull final T context) {
- contexts.add(context);
+ contexts.add(new GuardedContextImpl(context));
}
@Override
LOG.info("Starting clustering services for node {}", deviceInfo);
try {
- contexts.forEach(this::initializeContextService);
+ contexts.forEach(OFPContext::instantiateServiceInstance);
LOG.info("Started clustering services for node {}", deviceInfo);
} catch (final Exception ex) {
executorService.submit(() -> contextChainMastershipWatcher
- .onNotAbleToStartMastershipMandatory(deviceInfo, ex.getMessage()));
+ .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString()));
}
}
contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
final ListenableFuture<List<Void>> servicesToBeClosed = Futures
- .successfulAsList(Lists.reverse(contexts)
+ .allAsList(Lists.reverse(contexts)
.stream()
- .map(this::closeContextService)
+ .map(OFPContext::closeServiceInstance)
.collect(Collectors.toList()));
return Futures.transform(servicesToBeClosed, (input) -> {
@Override
public void close() {
- if (ContextState.TERMINATION.equals(state)) {
+ if (ContextChainState.CLOSED.equals(contextChainState.get())) {
LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo);
return;
}
- state = ContextState.TERMINATION;
+ contextChainState.set(ContextChainState.CLOSED);
contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo);
// Close all connections to devices
@Override
public void makeContextChainStateSlave() {
unMasterMe();
- changeState(ContextChainState.WORKING_SLAVE);
+ changeMastershipState(ContextChainState.WORKING_SLAVE);
}
@Override
public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) {
LOG.info("Registering clustering services for node {}", deviceInfo);
- state = ContextState.WORKING;
registration = Objects.requireNonNull(clusterSingletonServiceProvider
.registerClusterSingletonService(this));
LOG.info("Registered clustering services for node {}", deviceInfo);
public void makeDeviceSlave() {
unMasterMe();
- contexts.stream()
- .filter(DeviceContext.class::isInstance)
- .map(DeviceContext.class::cast)
- .findAny()
- .ifPresent(deviceContext -> Futures
- .addCallback(
- deviceContext.makeDeviceSlave(),
- new DeviceSlaveCallback(),
- executorService));
+ contexts.forEach(context -> {
+ if (context.map(DeviceContext.class::isInstance)) {
+ Futures.addCallback(context.map(DeviceContext.class::cast).makeDeviceSlave(),
+ new DeviceSlaveCallback(), executorService);
+ }
+ });
}
@Override
LOG.info("Device {} is able to work as master{}",
deviceInfo,
registryFilling.get() ? "." : " WITHOUT flow registry !!!");
- changeState(ContextChainState.WORKING_MASTER);
+ changeMastershipState(ContextChainState.WORKING_MASTER);
}
return result;
@Override
public boolean isClosing() {
- return ContextState.TERMINATION.equals(state);
+ return ContextChainState.CLOSED.equals(contextChainState.get());
}
@Override
@Override
public boolean continueInitializationAfterReconciliation() {
- return isMastered(ContextChainMastershipState.INITIAL_SUBMIT) && contexts.stream()
- .filter(StatisticsContext.class::isInstance)
- .map(StatisticsContext.class::cast)
- .findAny()
- .map(StatisticsContext::initialSubmitAfterReconciliation)
- .orElse(false);
+ final AtomicBoolean initialSubmit = new AtomicBoolean(false);
+
+ contexts.forEach(context -> {
+ if (context.map(StatisticsContext.class::isInstance)) {
+ initialSubmit.set(context.map(StatisticsContext.class::cast).initialSubmitAfterReconciliation());
+ }
+ });
+
+ return initialSubmit.get() && isMastered(ContextChainMastershipState.INITIAL_SUBMIT);
}
@Override
deviceRemovedHandlers.add(deviceRemovedHandler);
}
- private void changeState(final ContextChainState contextChainState) {
- boolean propagate = this.contextChainState == ContextChainState.UNDEFINED;
- this.contextChainState = contextChainState;
-
- if (propagate) {
- contexts.stream()
- .filter(ContextChainStateListener.class::isInstance)
- .map(ContextChainStateListener.class::cast)
- .forEach(listener -> listener.onStateAcquired(contextChainState));
- }
- }
-
- private void initializeContextService(final OFPContext context) {
- if (ConnectionContext.CONNECTION_STATE.WORKING.equals(primaryConnection.getConnectionState())) {
- context.instantiateServiceInstance();
- } else {
- LOG.warn("Device connection for node {} doesn't exist anymore. Primary connection status: {}",
- deviceInfo,
- primaryConnection.getConnectionState());
+ private void changeMastershipState(final ContextChainState contextChainState) {
+ if (ContextChainState.CLOSED.equals(this.contextChainState.get())) {
+ return;
}
- }
- private ListenableFuture<Void> closeContextService(final OFPContext context) {
- if (ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState())) {
- final String errMsg = String
- .format("Device connection for node %s doesn't exist anymore. Primary connection status: %s",
- deviceInfo.toString(),
- primaryConnection.getConnectionState());
+ boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get());
+ this.contextChainState.set(contextChainState);
- return Futures.immediateFailedFuture(new ConnectionException(errMsg));
+ if (propagate) {
+ contexts.forEach(context -> {
+ if (context.map(ContextChainStateListener.class::isInstance)) {
+ context.map(ContextChainStateListener.class::cast).onStateAcquired(contextChainState);
+ }
+ });
}
-
- return context.closeServiceInstance();
}
private void unMasterMe() {