X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Flifecycle%2FContextChainHolderImpl.java;h=9ad10e494e2bd4d7136b706ef57a13ca199883e3;hb=d19bc648ae039b47fab3bebe1d501aa9a0522258;hp=9f80ce38244376a516bb72eab89f27bc3f96ef04;hpb=a5372819881fb5477e19ad96426883865eb57d78;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java index 9f80ce3824..9ad10e494e 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java @@ -7,15 +7,27 @@ */ package org.opendaylight.openflowplugin.impl.lifecycle; -import com.google.common.base.Verify; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.util.HashedWheelTimer; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowplugin.api.openflow.OFPManager; @@ -26,241 +38,332 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder; -import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.MasterChecker; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.OwnershipChangeListener; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext; import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.ContextChainState; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.openflow.provider.config.ContextChainConfig; +import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil; +import org.opendaylight.openflowplugin.impl.util.ItemScheduler; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node; +import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState; +import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ContextChainHolderImpl implements ContextChainHolder { - +public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker { private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class); - private static final String NOT_ALL_MANAGER_WERE_SET = "Not all manager were set."; - private static final String MANAGER_WAS_SET = " manager was set"; - private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}"; - private static final String SINGLETON_SERVICE_PROVIDER_WAS_NOT_SET_YET - = "Singleton service provider was not set yet."; + private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}"; + private static final long CHECK_ROLE_MASTER_TIMEOUT = 20000L; + private static final long CHECK_ROLE_MASTER_TOLERANCE = CHECK_ROLE_MASTER_TIMEOUT / 2; + private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L; + private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType"; + + private final Map contextChainMap = Collections.synchronizedMap(new HashMap<>()); + private final EntityOwnershipListenerRegistration eosListenerRegistration; + private final ClusterSingletonServiceProvider singletonServiceProvider; + private final ItemScheduler scheduler; + private final ExecutorService executorService; + private final OwnershipChangeListener ownershipChangeListener; private DeviceManager deviceManager; private RpcManager rpcManager; private StatisticsManager statisticsManager; - private ConcurrentHashMap contextChainMap = new ConcurrentHashMap<>(); - private final ContextChainConfig config; - private ClusterSingletonServiceProvider singletonServicesProvider; - public ContextChainHolderImpl(final ContextChainConfig config) { - this.config = config; + public ContextChainHolderImpl(final HashedWheelTimer timer, + final ExecutorService executorService, + final ClusterSingletonServiceProvider singletonServiceProvider, + final EntityOwnershipService entityOwnershipService, + final OwnershipChangeListener ownershipChangeListener) { + this.singletonServiceProvider = singletonServiceProvider; + this.executorService = executorService; + this.ownershipChangeListener = ownershipChangeListener; + this.ownershipChangeListener.setMasterChecker(this); + this.eosListenerRegistration = Objects.requireNonNull(entityOwnershipService + .registerListener(ASYNC_SERVICE_ENTITY_TYPE, this)); + + this.scheduler = new ItemScheduler<>( + timer, + CHECK_ROLE_MASTER_TIMEOUT, + CHECK_ROLE_MASTER_TOLERANCE, + ContextChain::makeDeviceSlave); } @Override public void addManager(final T manager) { if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) { - LOG.info("Device" + MANAGER_WAS_SET); + LOG.trace("Context chain holder: Device manager OK."); deviceManager = (DeviceManager) manager; } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) { - LOG.info("RPC" + MANAGER_WAS_SET); + LOG.trace("Context chain holder: RPC manager OK."); rpcManager = (RpcManager) manager; } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) { - LOG.info("Statistics" + MANAGER_WAS_SET); + LOG.trace("Context chain holder: Statistics manager OK."); statisticsManager = (StatisticsManager) manager; } } - @Override - public ContextChain createContextChain(final ConnectionContext connectionContext) { - + @VisibleForTesting + void createContextChain(final ConnectionContext connectionContext) { final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - final String deviceInfoLOGValue = deviceInfo.getLOGValue(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Creating a new chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue); - } - - final ContextChain contextChain = new ContextChainImpl(); - final LifecycleService lifecycleService = new LifecycleServiceImpl(this); - - if (LOG.isDebugEnabled()) { - LOG.debug("Lifecycle services" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue); - } final DeviceContext deviceContext = deviceManager.createContext(connectionContext); - - if (LOG.isDebugEnabled()) { - LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue); - } - - final RpcContext rpcContext = rpcManager.createContext(connectionContext.getDeviceInfo(), deviceContext); - - if (LOG.isDebugEnabled()) { - LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue); - } - - final StatisticsContext statisticsContext - = statisticsManager.createContext(deviceContext); - - if (LOG.isDebugEnabled()) { - LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfoLOGValue); - } - - deviceContext.setLifecycleInitializationPhaseHandler(statisticsContext); - statisticsContext.setLifecycleInitializationPhaseHandler(rpcContext); - statisticsContext.setInitialSubmitHandler(deviceContext); - - contextChain.addLifecycleService(lifecycleService); + deviceContext.registerMastershipWatcher(this); + LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); + + final RpcContext rpcContext = rpcManager.createContext(deviceContext); + rpcContext.registerMastershipWatcher(this); + LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); + + final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext); + statisticsContext.registerMastershipWatcher(this); + LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); + + final ContextChain contextChain = new ContextChainImpl(this, connectionContext, + executorService); + contextChain.registerDeviceRemovedHandler(deviceManager); + contextChain.registerDeviceRemovedHandler(rpcManager); + contextChain.registerDeviceRemovedHandler(statisticsManager); + contextChain.registerDeviceRemovedHandler(this); contextChain.addContext(deviceContext); contextChain.addContext(rpcContext); contextChain.addContext(statisticsContext); - contextChain.makeDeviceSlave(); + contextChainMap.put(deviceInfo, contextChain); + LOG.debug("Context chain" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); - return contextChain; + deviceContext.onPublished(); + scheduler.add(deviceInfo, contextChain); + scheduler.startIfNotRunning(); + LOG.info("Started timer for setting SLAVE role on node {} if no role will be set in {}s.", + deviceInfo, + CHECK_ROLE_MASTER_TIMEOUT / 1000L); - } - - @Override - public void destroyContextChain(final DeviceInfo deviceInfo) { - ContextChain chain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(chain)) { - chain.close(); - try { - deviceManager.removeDeviceFromOperationalDS(deviceInfo).checkedGet(5L, TimeUnit.SECONDS); - } catch (TimeoutException | TransactionCommitFailedException e) { - LOG.warn("Not able to remove device {} from DS", deviceInfo.getLOGValue()); - } - } - } - - @Override - public void pairConnection(final ConnectionContext connectionContext) { - DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { - contextChain.changePrimaryConnection(connectionContext); - contextChain.makeDeviceSlave(); - } + contextChain.registerServices(singletonServiceProvider); } @Override public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception { + final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); + final ContextChain contextChain = contextChainMap.get(deviceInfo); + LOG.info("Device {} connected.", deviceInfo); - Verify.verify(this.checkAllManagers(), NOT_ALL_MANAGER_WERE_SET); - Verify.verifyNotNull(this.singletonServicesProvider, SINGLETON_SERVICE_PROVIDER_WAS_NOT_SET_YET); + if (Objects.nonNull(contextChain)) { + if (contextChain.isClosing()) { + LOG.warn("Device {} is already in termination state, closing all incoming connections.", deviceInfo); + return ConnectionStatus.CLOSING; + } - LOG.info("Device {} connected.", connectionContext.getDeviceInfo().getLOGValue()); - ContextChain chain = contextChainMap.get(connectionContext.getDeviceInfo()); + if (contextChain.addAuxiliaryConnection(connectionContext)) { + LOG.info("An auxiliary connection was added to device: {}", deviceInfo); + return ConnectionStatus.MAY_CONTINUE; + } - if (Objects.isNull(chain)) { - contextChainMap.put(connectionContext.getDeviceInfo(), createContextChain(connectionContext)); - } else { - this.pairConnection(connectionContext); + LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo); + destroyContextChain(deviceInfo); + return ConnectionStatus.ALREADY_CONNECTED; } + LOG.debug("No context chain found for device: {}, creating new.", deviceInfo); + createContextChain(connectionContext); return ConnectionStatus.MAY_CONTINUE; } @Override - public void addSingletonServicesProvider(final ClusterSingletonServiceProvider singletonServicesProvider) { - this.singletonServicesProvider = singletonServicesProvider; - } + public void onNotAbleToStartMastership(@Nonnull final DeviceInfo deviceInfo, + @Nonnull final String reason, + final boolean mandatory) { + LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo, reason); - @Override - public void onNotAbleToStartMastership(final DeviceInfo deviceInfo) { - ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { - LOG.warn("Not able to set MASTER role on device {}", deviceInfo.getLOGValue()); - if (contextChain.getContextChainState().equals(ContextChainState.INITIALIZED)) { - contextChain.closePrimaryConnection(); - } else { - contextChain.sleepTheChainAndDropConnection(); - } + if (!mandatory) { + return; } + + Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { + LOG.warn("This mastering is mandatory, destroying context chain and closing connection for device {}.", deviceInfo); + destroyContextChain(deviceInfo); + }); } @Override - public void onMasterRoleAcquired(final DeviceInfo deviceInfo) { - ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { - if (contextChain.getContextChainState().equals(ContextChainState.WORKINGMASTER)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Device {} already working as MASTER no changes need to be done.", - deviceInfo.getLOGValue()); - } - } else { - if (contextChain.getContextChainState().equals(ContextChainState.INITIALIZED)) { - LOG.info("Device {} has not finish initial gathering yet.", - deviceInfo.getLOGValue()); + public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo, + @Nonnull final ContextChainMastershipState mastershipState) { + scheduler.remove(deviceInfo); + Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { + if (ownershipChangeListener.isReconciliationFrameworkRegistered()) { + if (mastershipState == ContextChainMastershipState.INITIAL_SUBMIT) { + LOG.error("Initial submit is not allowed here if using reconciliation framework."); + } else { + contextChain.isMastered(mastershipState); + if (contextChain.isPrepared()) { + Futures.addCallback( + ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo), + reconciliationFrameworkCallback(deviceInfo, contextChain), + MoreExecutors.directExecutor()); + } } - Futures.addCallback(contextChain.startChain(), - new StartStopChainCallback(contextChain.provideDeviceContext(), false)); + } else if (contextChain.isMastered(mastershipState)) { + LOG.info("Role MASTER was granted to device {}", deviceInfo); + ownershipChangeListener.becomeMaster(deviceInfo); + deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier()); } - } + }); } @Override public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) { - ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { - contextChain.registerServices(this.singletonServicesProvider); - } + scheduler.remove(deviceInfo); + ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo); + Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave); } @Override public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) { - ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { - contextChain.sleepTheChainAndDropConnection(); - } + Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo)); } @Override public void onDeviceDisconnected(final ConnectionContext connectionContext) { + final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - if (Objects.isNull(connectionContext.getDeviceInfo())) { - LOG.info("Non existing device info. Cannot close context chain."); - } else { - LOG.info("Device {} disconnected.", connectionContext.getDeviceInfo().getLOGValue()); - ContextChain chain = contextChainMap.get(connectionContext.getDeviceInfo()); - if (Objects.isNull(chain)) { - if (LOG.isDebugEnabled()) { - LOG.debug("There was no context chain created yet for the disconnected device {}", - connectionContext.getDeviceInfo().getLOGValue()); - } + Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { + if (contextChain.auxiliaryConnectionDropped(connectionContext)) { + LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo); } else { - Futures.addCallback(chain.connectionDropped(), - new StartStopChainCallback(null, true)); + LOG.info("Device {} disconnected.", deviceInfo); + destroyContextChain(deviceInfo); } - } + }); } - private boolean checkAllManagers() { + @VisibleForTesting + boolean checkAllManagers() { return Objects.nonNull(deviceManager) && Objects.nonNull(rpcManager) && Objects.nonNull(statisticsManager); } - private class StartStopChainCallback implements FutureCallback { + @Override + public void close() throws Exception { + scheduler.close(); + Map copyOfChains = new HashMap<>(contextChainMap); + copyOfChains.keySet().forEach(this::destroyContextChain); + copyOfChains.clear(); + contextChainMap.clear(); + eosListenerRegistration.close(); + } - private final String deviceInfo; - private final String stop; - private final String stopped; + @Override + public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) { + if (entityOwnershipChange.hasOwner()) { + return; + } - StartStopChainCallback(final DeviceContext deviceContext, final boolean stop) { + final String entityName = getEntityNameFromOwnershipChange(entityOwnershipChange); - this.deviceInfo = Objects.nonNull(deviceContext) ? deviceContext.getDeviceInfo().getLOGValue() : "null"; - this.stop = stop ? "stop" : "start"; - this.stopped = stop ? "stopped" : "started"; - } + if (Objects.nonNull(entityName)) { + LOG.debug("Entity {} has no owner", entityName); + final NodeId nodeId = new NodeId(entityName); - @Override - public void onSuccess(@Nullable Void nothing) { - LOG.info("Context chain for device {} successfully {}", deviceInfo, stopped); + try { + final KeyedInstanceIdentifier nodeInstanceIdentifier = + DeviceStateUtil.createNodeInstanceIdentifier(nodeId); + + deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier); + + LOG.info("Try to remove device {} from operational DS", nodeId); + deviceManager + .removeDeviceFromOperationalDS(nodeInstanceIdentifier) + .get(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS); + LOG.info("Removing device from operational DS {} was successful", nodeId); + } catch (TimeoutException | ExecutionException | NullPointerException | InterruptedException e) { + LOG.warn("Not able to remove device {} from operational DS. ",nodeId, e); + } } + } - @Override - public void onFailure(@Nonnull final Throwable throwable) { - LOG.warn("Not able to {} the context chain for device {}", stop, deviceInfo); - } + private synchronized void destroyContextChain(final DeviceInfo deviceInfo) { + scheduler.remove(deviceInfo); + + Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { + deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier()); + contextChain.close(); + }); + } + + @Override + public List listOfMasteredDevices() { + return contextChainMap + .entrySet() + .stream() + .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry + .getValue() + .isMastered(ContextChainMastershipState.CHECK)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); } + @Override + public boolean isAnyDeviceMastered() { + return contextChainMap + .entrySet() + .stream() + .findAny() + .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry.getValue() + .isMastered(ContextChainMastershipState.CHECK)) + .isPresent(); + } + + private String getEntityNameFromOwnershipChange(final EntityOwnershipChange entityOwnershipChange) { + final YangInstanceIdentifier.NodeIdentifierWithPredicates lastIdArgument = + (YangInstanceIdentifier.NodeIdentifierWithPredicates) entityOwnershipChange + .getEntity() + .getId() + .getLastPathArgument(); + + return lastIdArgument + .getKeyValues() + .values() + .iterator() + .next() + .toString(); + } + + @Override + public void onDeviceRemoved(final DeviceInfo deviceInfo) { + scheduler.remove(deviceInfo); + contextChainMap.remove(deviceInfo); + LOG.debug("Context chain removed for node {}", deviceInfo); + } + + private FutureCallback reconciliationFrameworkCallback( + @Nonnull DeviceInfo deviceInfo, + ContextChain contextChain) { + return new FutureCallback() { + @Override + public void onSuccess(@Nullable ResultState result) { + if (ResultState.DONOTHING == result) { + LOG.info("Device {} connection is enabled by reconciliation framework.", deviceInfo); + if (!contextChain.continueInitializationAfterReconciliation()) { + LOG.warn("Initialization submit after reconciliation failed for device {}", deviceInfo); + destroyContextChain(deviceInfo); + } else { + ownershipChangeListener.becomeMaster(deviceInfo); + deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier()); + } + } else { + LOG.warn("Reconciliation framework failure for device {}", deviceInfo); + destroyContextChain(deviceInfo); + } + } + + @Override + public void onFailure(@Nonnull Throwable t) { + LOG.warn("Reconciliation framework failure."); + destroyContextChain(deviceInfo); + } + }; + } }