X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Flifecycle%2FContextChainImpl.java;h=06b274527d5d80cba0071ae6fd838d77e6b8c0ad;hb=HEAD;hp=886e3b03f5d3e7f6f0a998a26987054990cfc86c;hpb=4ad505cc51f9bda8290c6191e4e16a5d36ce3b27;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java index 886e3b03f5..06b274527d 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java @@ -7,157 +7,249 @@ */ package org.opendaylight.openflowplugin.impl.lifecycle; -import com.google.common.base.Function; +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import javax.annotation.Nullable; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import org.opendaylight.mdsal.singleton.api.ClusterSingletonServiceProvider; +import org.opendaylight.mdsal.singleton.api.ServiceGroupIdentifier; import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; +import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; -import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; -import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.ContextChainState; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ReconciliationFrameworkStep; +import org.opendaylight.yangtools.concepts.Registration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ContextChainImpl implements ContextChain { - private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class); - - private Set contexts = new HashSet<>(); - private StatisticsContext statisticsContext; - private DeviceContext deviceContext; - private RpcContext rpcContext; - private volatile ContextChainState contextChainState; - private LifecycleService lifecycleService; - private ConnectionContext primaryConnectionContext; - - public ContextChainImpl() { - this.contextChainState = ContextChainState.INITIALIZED; + private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog"); + + private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false); + private final AtomicBoolean initialSubmitting = new AtomicBoolean(false); + private final AtomicBoolean rpcRegistration = new AtomicBoolean(false); + private final List deviceRemovedHandlers = new CopyOnWriteArrayList<>(); + private final List contexts = new CopyOnWriteArrayList<>(); + private final List auxiliaryConnections = new CopyOnWriteArrayList<>(); + private final Executor executor; + private final ContextChainMastershipWatcher contextChainMastershipWatcher; + private final DeviceInfo deviceInfo; + private final ConnectionContext primaryConnection; + private final AtomicReference contextChainState = + new AtomicReference<>(ContextChainState.UNDEFINED); + private Registration registration; + + ContextChainImpl(final @NonNull ContextChainMastershipWatcher contextChainMastershipWatcher, + final @NonNull ConnectionContext connectionContext, final @NonNull Executor executor) { + this.contextChainMastershipWatcher = requireNonNull(contextChainMastershipWatcher); + primaryConnection = requireNonNull(connectionContext); + this.executor = requireNonNull(executor); + deviceInfo = connectionContext.getDeviceInfo(); } @Override - public boolean isReady() { - return false; + public void addContext(@NonNull final T context) { + contexts.add(new GuardedContextImpl(context)); } @Override - public void addContext(final T context) { - if (context instanceof StatisticsContext) { - this.statisticsContext = (StatisticsContext) context; - } else { - if (context instanceof DeviceContext) { - this.deviceContext = (DeviceContext) context; - } else { - if (context instanceof RpcContext) { - this.rpcContext = (RpcContext) context; - } - } + @SuppressWarnings("checkstyle:IllegalCatch") + public void instantiateServiceInstance() { + OF_EVENT_LOG.debug("Clustering Service Invocation, Node: {}", deviceInfo); + try { + contexts.forEach(OFPContext::instantiateServiceInstance); + LOG.info("Started clustering services for node {}", deviceInfo); + } catch (final Exception ex) { + LOG.error("Not able to start clustering services for node {}", deviceInfo); + executor.execute(() -> contextChainMastershipWatcher + .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString())); } - contexts.add(context); } @Override - public void addLifecycleService(final LifecycleService lifecycleService) { - this.lifecycleService = lifecycleService; + public ListenableFuture closeServiceInstance() { + + contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo); + + final ListenableFuture servicesToBeClosed = Futures.allAsList(Lists.reverse(contexts).stream() + .map(OFPContext::closeServiceInstance) + .collect(Collectors.toList())); + + return Futures.transform(servicesToBeClosed, input -> { + OF_EVENT_LOG.debug("Closing clustering Services, Node: {}", deviceInfo); + LOG.info("Closed clustering services for node {}", deviceInfo); + return null; + }, executor); } @Override - public ListenableFuture stopChain() { - //TODO: stopClusterServices change parameter - final List> futureList = new ArrayList<>(); - futureList.add(statisticsContext.stopClusterServices(true)); - futureList.add(rpcContext.stopClusterServices(false)); - futureList.add(deviceContext.stopClusterServices(false)); - - return Futures.transform(Futures.successfulAsList(futureList), new Function, Void>() { - @Nullable - @Override - public Void apply(@Nullable List input) { - LOG.debug("Closed clustering MASTER services for node {}", deviceContext.getDeviceInfo().getLOGValue()); - contextChainState = ContextChainState.WORKINGSLAVE; - return null; - } - }); + public ServiceGroupIdentifier getIdentifier() { + return deviceInfo.getServiceIdentifier(); } @Override - public ListenableFuture startChain() { - if (ContextChainState.INITIALIZED.equals(this.contextChainState)) { - return Futures.transform(this.statisticsContext.initialGatherDynamicData(), new Function() { - @Nullable - @Override - public Void apply(@Nullable Boolean aBoolean) { - contextChainState = ContextChainState.WORKINGMASTER; - return null; - } - }); - } else { - this.contextChainState = ContextChainState.WORKINGMASTER; + @SuppressWarnings("checkstyle:IllegalCatch") + public void close() { + if (ContextChainState.CLOSED.equals(contextChainState.get())) { + LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo); + return; } - return Futures.immediateFuture(null); + + contextChainState.set(ContextChainState.CLOSED); + unMasterMe(); + + // Close all connections to devices + auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false)); + auxiliaryConnections.clear(); + + // If we are still registered and we are not already closing, then close the registration + if (registration != null) { + registration.close(); + registration = null; + LOG.info("Closed clustering services registration for node {}", deviceInfo); + OF_EVENT_LOG.debug("Closed clustering services registration for node {}", deviceInfo); + } + + + // Close all contexts (device, statistics, rpc) + contexts.forEach(OFPContext::close); + contexts.clear(); + + // We are closing, so cleanup all managers now + deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo)); + deviceRemovedHandlers.clear(); + + primaryConnection.closeConnection(false); + } @Override - public void close() { + public void makeContextChainStateSlave() { + unMasterMe(); + changeMastershipState(ContextChainState.WORKING_SLAVE); + } + @Override + public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) { + registration = clusterSingletonServiceProvider.registerClusterSingletonService(this); + LOG.debug("Registered clustering services for node {}", deviceInfo); + OF_EVENT_LOG.debug("Registered Clustering Services, Node: {}", deviceInfo); } @Override - public void changePrimaryConnection(final ConnectionContext connectionContext) { - for (OFPContext context : contexts) { - context.replaceConnection(connectionContext); + public boolean isMastered(final ContextChainMastershipState mastershipState, + final boolean inReconciliationFrameworkStep) { + switch (mastershipState) { + case INITIAL_SUBMIT: + LOG.debug("Device {}, initial submit OK.", deviceInfo); + OF_EVENT_LOG.debug("Device {}, initial submit OK.", deviceInfo); + initialSubmitting.set(true); + break; + case MASTER_ON_DEVICE: + LOG.debug("Device {}, master state OK.", deviceInfo); + OF_EVENT_LOG.debug("Device {}, master state OK.", deviceInfo); + masterStateOnDevice.set(true); + break; + case RPC_REGISTRATION: + LOG.debug("Device {}, RPC registration OK.", deviceInfo); + OF_EVENT_LOG.debug("Device {}, RPC registration OK.", deviceInfo); + rpcRegistration.set(true); + break; + case CHECK: + // no operation + break; + default: + // no operation + break; } - this.primaryConnectionContext = connectionContext; + + final boolean result = masterStateOnDevice.get() && rpcRegistration.get() + && inReconciliationFrameworkStep || initialSubmitting.get(); + + if (!inReconciliationFrameworkStep && result && mastershipState != ContextChainMastershipState.CHECK) { + LOG.info("Device {} is able to work as master", deviceInfo); + changeMastershipState(ContextChainState.WORKING_MASTER); + } + + return result; } @Override - public ContextChainState getContextChainState() { - return contextChainState; + public boolean isClosing() { + return ContextChainState.CLOSED.equals(contextChainState.get()); } @Override - public ListenableFuture connectionDropped() { - ContextChainState oldState = this.contextChainState; - this.contextChainState = ContextChainState.SLEEPING; - if (oldState.equals(ContextChainState.WORKINGMASTER)) { - return this.stopChain(); - } - return Futures.immediateFuture(null); + public void continueInitializationAfterReconciliation() { + contexts.forEach(context -> { + if (context.map(ReconciliationFrameworkStep.class::isInstance)) { + context.map(ReconciliationFrameworkStep.class::cast).continueInitializationAfterReconciliation(); + } + }); } @Override - public ConnectionContext getPrimaryConnectionContext() { - return primaryConnectionContext; + public void initializeDevice() { + contexts.forEach(context -> { + if (context.map(DeviceInitializationContext.class::isInstance)) { + context.map(DeviceInitializationContext.class::cast).initializeDevice(); + } + }); } @Override - public void sleepTheChainAndDropConnection() { - this.contextChainState = ContextChainState.SLEEPING; - this.primaryConnectionContext.closeConnection(false); + public boolean addAuxiliaryConnection(@NonNull final ConnectionContext connectionContext) { + return connectionContext.getFeatures().getAuxiliaryId().toJava() != 0 + && !ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState()) + && auxiliaryConnections.add(connectionContext); } @Override - public void registerServices(@NonNull final ClusterSingletonServiceProvider clusterSingletonServiceProvider) { - this.contextChainState = ContextChainState.WORKINGSLAVE; - this.lifecycleService.registerService( - clusterSingletonServiceProvider, - this.deviceContext, - this.deviceContext.getServiceIdentifier(), - this.deviceContext.getDeviceInfo()); + public boolean auxiliaryConnectionDropped(@NonNull final ConnectionContext connectionContext) { + return auxiliaryConnections.remove(connectionContext); } @Override - public void makeDeviceSlave() { - this.lifecycleService.makeDeviceSlave(this.deviceContext); + public void registerDeviceRemovedHandler(@NonNull final DeviceRemovedHandler deviceRemovedHandler) { + deviceRemovedHandlers.add(deviceRemovedHandler); } + private void changeMastershipState(final ContextChainState newContextChainState) { + if (ContextChainState.CLOSED.equals(contextChainState.get())) { + return; + } + + boolean propagate = ContextChainState.UNDEFINED.equals(contextChainState.get()); + contextChainState.set(newContextChainState); + + if (propagate) { + contexts.forEach(context -> { + if (context.map(ContextChainStateListener.class::isInstance)) { + context.map(ContextChainStateListener.class::cast).onStateAcquired(newContextChainState); + } + }); + } + } + + private void unMasterMe() { + initialSubmitting.set(false); + masterStateOnDevice.set(false); + rpcRegistration.set(false); + } }