X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Flifecycle%2FContextChainImpl.java;h=2cc9aea0527234c53984ea71ca81fbe37cbb4e08;hb=refs%2Fchanges%2F77%2F100077%2F17;hp=6641176e16976f0983d78230d0b51bd1eb1274b6;hpb=cf765f4281bb480b30451bf74196819c0a449a85;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java index 6641176e16..2cc9aea052 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainImpl.java @@ -7,221 +7,255 @@ */ package org.opendaylight.openflowplugin.impl.lifecycle; -import com.google.common.base.Function; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.netty.util.internal.ConcurrentSet; -import java.util.ArrayList; import java.util.List; -import java.util.Set; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; +import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier; import org.opendaylight.openflowplugin.api.openflow.OFPContext; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; -import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; +import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceRemovedHandler; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipWatcher; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainStateListener; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; -import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; -import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.DeviceInitializationContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.GuardedContext; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ReconciliationFrameworkStep; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ContextChainImpl implements ContextChain { - private static final Logger LOG = LoggerFactory.getLogger(ContextChainImpl.class); + private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog"); - private Set contexts = new ConcurrentSet<>(); - private StatisticsContext statisticsContext; - private DeviceContext deviceContext; - private RpcContext rpcContext; - private LifecycleService lifecycleService; - private DeviceInfo deviceInfo; - private ConnectionContext primaryConnection; - private Set auxiliaryConnections = new ConcurrentSet<>(); - - private volatile ContextChainState contextChainState; + private final AtomicBoolean masterStateOnDevice = new AtomicBoolean(false); + private final AtomicBoolean initialSubmitting = new AtomicBoolean(false); + private final AtomicBoolean rpcRegistration = new AtomicBoolean(false); + private final List deviceRemovedHandlers = new CopyOnWriteArrayList<>(); + private final List contexts = new CopyOnWriteArrayList<>(); + private final List auxiliaryConnections = new CopyOnWriteArrayList<>(); + private final Executor executor; + private final ContextChainMastershipWatcher contextChainMastershipWatcher; + private final DeviceInfo deviceInfo; + private final ConnectionContext primaryConnection; + private final AtomicReference contextChainState = + new AtomicReference<>(ContextChainState.UNDEFINED); + private AutoCloseable registration; - private AtomicBoolean masterStateOnDevice; - private AtomicBoolean initialGathering; - private AtomicBoolean initialSubmitting; - private AtomicBoolean registryFilling; - - ContextChainImpl(final ConnectionContext connectionContext) { + ContextChainImpl(@NonNull final ContextChainMastershipWatcher contextChainMastershipWatcher, + @NonNull final ConnectionContext connectionContext, + @NonNull final Executor executor) { + this.contextChainMastershipWatcher = contextChainMastershipWatcher; this.primaryConnection = connectionContext; - this.contextChainState = ContextChainState.UNDEFINED; - this.masterStateOnDevice = new AtomicBoolean(false); - this.initialGathering = new AtomicBoolean(false); - this.initialSubmitting = new AtomicBoolean(false); - this.registryFilling = new AtomicBoolean(false); this.deviceInfo = connectionContext.getDeviceInfo(); + this.executor = executor; } @Override - public void addContext(final T context) { - if (context instanceof StatisticsContext) { - this.statisticsContext = (StatisticsContext) context; - } else { - if (context instanceof DeviceContext) { - this.deviceContext = (DeviceContext) context; - } else { - if (context instanceof RpcContext) { - this.rpcContext = (RpcContext) context; - } - } - } - - contexts.add(context); + public void addContext(@NonNull final T context) { + contexts.add(new GuardedContextImpl(context)); } @Override - public void addLifecycleService(final LifecycleService lifecycleService) { - this.lifecycleService = lifecycleService; + @SuppressWarnings("checkstyle:IllegalCatch") + public void instantiateServiceInstance() { + OF_EVENT_LOG.debug("Clustering Service Invocation, Node: {}", deviceInfo); + try { + contexts.forEach(OFPContext::instantiateServiceInstance); + LOG.info("Started clustering services for node {}", deviceInfo); + } catch (final Exception ex) { + LOG.error("Not able to start clustering services for node {}", deviceInfo); + executor.execute(() -> contextChainMastershipWatcher + .onNotAbleToStartMastershipMandatory(deviceInfo, ex.toString())); + } } @Override - public ListenableFuture stopChain() { - //TODO: stopClusterServices change parameter - final List> futureList = new ArrayList<>(); - futureList.add(statisticsContext.stopClusterServices()); - futureList.add(rpcContext.stopClusterServices()); - futureList.add(deviceContext.stopClusterServices()); - this.unMasterMe(); - return Futures.transform(Futures.successfulAsList(futureList), new Function, Void>() { - @Nullable - @Override - public Void apply(@Nullable List input) { - LOG.info("Closed clustering MASTER services for node {}", deviceContext.getDeviceInfo().getLOGValue()); - return null; - } - }); + public ListenableFuture closeServiceInstance() { + + contextChainMastershipWatcher.onSlaveRoleAcquired(deviceInfo); + + final ListenableFuture servicesToBeClosed = Futures.allAsList(Lists.reverse(contexts).stream() + .map(OFPContext::closeServiceInstance) + .collect(Collectors.toList())); + + return Futures.transform(servicesToBeClosed, input -> { + OF_EVENT_LOG.debug("Closing clustering Services, Node: {}", deviceInfo); + LOG.info("Closed clustering services for node {}", deviceInfo); + return null; + }, executor); } - private void unMasterMe() { - this.registryFilling.set(false); - this.initialSubmitting.set(false); - this.initialGathering.set(false); - this.masterStateOnDevice.set(false); + @NonNull + @Override + public ServiceGroupIdentifier getIdentifier() { + return deviceInfo.getServiceIdentifier(); } @Override + @SuppressWarnings("checkstyle:IllegalCatch") public void close() { - this.auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false)); - if (this.primaryConnection.getConnectionState() != ConnectionContext.CONNECTION_STATE.RIP) { - this.primaryConnection.closeConnection(true); + if (ContextChainState.CLOSED.equals(contextChainState.get())) { + LOG.debug("ContextChain for node {} is already in TERMINATION state.", deviceInfo); + return; } - lifecycleService.close(); - deviceContext.close(); - rpcContext.close(); - statisticsContext.close(); - } - @Override - public void makeContextChainStateSlave() { - this.unMasterMe(); - changeState(ContextChainState.WORKING_SLAVE); - } + contextChainState.set(ContextChainState.CLOSED); + unMasterMe(); - @Override - public ListenableFuture connectionDropped() { - if (this.contextChainState == ContextChainState.WORKING_MASTER) { - return this.stopChain(); + // Close all connections to devices + auxiliaryConnections.forEach(connectionContext -> connectionContext.closeConnection(false)); + auxiliaryConnections.clear(); + + // If we are still registered and we are not already closing, then close the registration + if (registration != null) { + try { + registration.close(); + registration = null; + LOG.info("Closed clustering services registration for node {}", deviceInfo); + OF_EVENT_LOG.debug("Closed clustering services registration for node {}", deviceInfo); + } catch (final Exception e) { + LOG.warn("Failed to close clustering services registration for node {} with exception: ", + deviceInfo, e); + } } - this.unMasterMe(); - return Futures.immediateFuture(null); + + + // Close all contexts (device, statistics, rpc) + contexts.forEach(OFPContext::close); + contexts.clear(); + + // We are closing, so cleanup all managers now + deviceRemovedHandlers.forEach(h -> h.onDeviceRemoved(deviceInfo)); + deviceRemovedHandlers.clear(); + + primaryConnection.closeConnection(false); + } @Override - public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) { - this.lifecycleService.registerService( - clusterSingletonServiceProvider, - this.deviceContext); + public void makeContextChainStateSlave() { + unMasterMe(); + changeMastershipState(ContextChainState.WORKING_SLAVE); } @Override - public void makeDeviceSlave() { - this.unMasterMe(); - this.lifecycleService.makeDeviceSlave(this.deviceContext); + public void registerServices(final ClusterSingletonServiceProvider clusterSingletonServiceProvider) { + registration = Objects.requireNonNull(clusterSingletonServiceProvider + .registerClusterSingletonService(this)); + LOG.debug("Registered clustering services for node {}", deviceInfo); + OF_EVENT_LOG.debug("Registered Clustering Services, Node: {}", deviceInfo); } @Override - public boolean isMastered(@Nonnull ContextChainMastershipState mastershipState) { + public boolean isMastered(@NonNull final ContextChainMastershipState mastershipState, + final boolean inReconciliationFrameworkStep) { switch (mastershipState) { case INITIAL_SUBMIT: - LOG.debug("Device {}, initial submit OK.", deviceInfo.getLOGValue()); + LOG.debug("Device {}, initial submit OK.", deviceInfo); + OF_EVENT_LOG.debug("Device {}, initial submit OK.", deviceInfo); this.initialSubmitting.set(true); break; case MASTER_ON_DEVICE: - LOG.debug("Device {}, master state OK.", deviceInfo.getLOGValue()); + LOG.debug("Device {}, master state OK.", deviceInfo); + OF_EVENT_LOG.debug("Device {}, master state OK.", deviceInfo); this.masterStateOnDevice.set(true); break; - case INITIAL_GATHERING: - LOG.debug("Device {}, initial gathering OK.", deviceInfo.getLOGValue()); - this.initialGathering.set(true); + case RPC_REGISTRATION: + LOG.debug("Device {}, RPC registration OK.", deviceInfo); + OF_EVENT_LOG.debug("Device {}, RPC registration OK.", deviceInfo); + this.rpcRegistration.set(true); break; - //Flow registry fill is not mandatory to work as a master - case INITIAL_FLOW_REGISTRY_FILL: - LOG.debug("Device {}, initial registry filling OK.", deviceInfo.getLOGValue()); - this.registryFilling.set(true); case CHECK: + // no operation + break; default: + // no operation + break; } - final boolean result = - this.initialGathering.get() && - this.masterStateOnDevice.get() && - this.initialSubmitting.get(); - - if (result && mastershipState != ContextChainMastershipState.CHECK) { - LOG.info("Device {} is able to work as master{}", - deviceInfo.getLOGValue(), - this.registryFilling.get() ? " WITHOUT flow registry !!!" : "."); - changeState(ContextChainState.WORKING_MASTER); + + final boolean result = masterStateOnDevice.get() && rpcRegistration.get() + && inReconciliationFrameworkStep || initialSubmitting.get(); + + if (!inReconciliationFrameworkStep && result && mastershipState != ContextChainMastershipState.CHECK) { + LOG.info("Device {} is able to work as master", deviceInfo); + changeMastershipState(ContextChainState.WORKING_MASTER); } + return result; } @Override - public boolean hasState() { - return contextChainState == ContextChainState.WORKING_MASTER - || contextChainState == ContextChainState.WORKING_SLAVE; + public boolean isClosing() { + return ContextChainState.CLOSED.equals(contextChainState.get()); } @Override - public boolean addAuxiliaryConnection(@Nonnull ConnectionContext connectionContext) { - if (this.primaryConnection.getConnectionState() != ConnectionContext.CONNECTION_STATE.RIP) { - this.auxiliaryConnections.add(connectionContext); - return true; - } else { - return false; - } + public void continueInitializationAfterReconciliation() { + contexts.forEach(context -> { + if (context.map(ReconciliationFrameworkStep.class::isInstance)) { + context.map(ReconciliationFrameworkStep.class::cast).continueInitializationAfterReconciliation(); + } + }); } @Override - public boolean auxiliaryConnectionDropped(@Nonnull ConnectionContext connectionContext) { - if (this.auxiliaryConnections.isEmpty()) { - return false; - } - if (!this.auxiliaryConnections.contains(connectionContext)) { - return false; - } - this.auxiliaryConnections.remove(connectionContext); - return true; + public void initializeDevice() { + contexts.forEach(context -> { + if (context.map(DeviceInitializationContext.class::isInstance)) { + context.map(DeviceInitializationContext.class::cast).initializeDevice(); + } + }); + } + + @Override + public boolean addAuxiliaryConnection(@NonNull final ConnectionContext connectionContext) { + return connectionContext.getFeatures().getAuxiliaryId().toJava() != 0 + && !ConnectionContext.CONNECTION_STATE.RIP.equals(primaryConnection.getConnectionState()) + && auxiliaryConnections.add(connectionContext); + } + + @Override + public boolean auxiliaryConnectionDropped(@NonNull final ConnectionContext connectionContext) { + return auxiliaryConnections.remove(connectionContext); + } + + @Override + public void registerDeviceRemovedHandler(@NonNull final DeviceRemovedHandler deviceRemovedHandler) { + deviceRemovedHandlers.add(deviceRemovedHandler); } - private void changeState(final ContextChainState contextChainState) { - boolean propagate = this.contextChainState == ContextChainState.UNDEFINED; - this.contextChainState = contextChainState; + private void changeMastershipState(final ContextChainState newContextChainState) { + if (ContextChainState.CLOSED.equals(this.contextChainState.get())) { + return; + } + + boolean propagate = ContextChainState.UNDEFINED.equals(this.contextChainState.get()); + this.contextChainState.set(newContextChainState); if (propagate) { - contexts.stream() - .filter(ContextChainStateListener.class::isInstance) - .map(ContextChainStateListener.class::cast) - .forEach(listener -> listener.onStateAcquired(contextChainState)); + contexts.forEach(context -> { + if (context.map(ContextChainStateListener.class::isInstance)) { + context.map(ContextChainStateListener.class::cast).onStateAcquired(newContextChainState); + } + }); } } + + private void unMasterMe() { + initialSubmitting.set(false); + masterStateOnDevice.set(false); + rpcRegistration.set(false); + } }