X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Flifecycle%2FContextChainHolderImpl.java;h=2e52dc214ec823b4a1f45fccf39c77fc018a3e8e;hb=90299c25f28265e0327898b60029a8088a4b0aad;hp=cef12a8dc3f68e759a41cec92bacab571e843703;hpb=07c5428b2f06d43037de8f306b9061fb4ee855fa;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java index cef12a8dc3..2e52dc214e 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java @@ -8,16 +8,19 @@ package org.opendaylight.openflowplugin.impl.lifecycle; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Verify; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import io.netty.util.internal.ConcurrentSet; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -36,6 +39,7 @@ import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain; import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder; +import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState; import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext; import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager; @@ -43,7 +47,6 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsManager; import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil; import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId; -import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.ContextChainState; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,55 +54,41 @@ import org.slf4j.LoggerFactory; public class ContextChainHolderImpl implements ContextChainHolder { private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class); - private static final String NOT_ALL_MANAGER_WERE_SET = "Not all manager were set."; - private static final String MANAGER_WAS_SET = " manager was set"; + private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}"; - private static final String SINGLETON_SERVICE_PROVIDER_WAS_NOT_SET_YET - = "Singleton service provider was not set yet."; - private static final long DEFAULT_TTL_STEP = 1000L; - private static final long DEFAULT_TTL_BEFORE_DROP = 1000L; - private static final long DEFAULT_CHECK_ROLE_MASTER = 5000L; - private static final boolean STOP = true; - private static final boolean START = false; + private static final long DEFAULT_CHECK_ROLE_MASTER = 10000L; private static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType"; + private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType"; + + private final ConcurrentHashMap contextChainMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap withoutRoleChains = new ConcurrentHashMap<>(); + private final List markToBeRemoved = new ArrayList<>(); + private final HashedWheelTimer timer; + private final Long checkRoleMaster; private DeviceManager deviceManager; private RpcManager rpcManager; private StatisticsManager statisticsManager; private EntityOwnershipListenerRegistration eosListenerRegistration; - private volatile ConcurrentHashMap contextChainMap = new ConcurrentHashMap<>(); - private final ConcurrentHashMap sleepingChains = new ConcurrentHashMap<>(); - private final ConcurrentHashMap withoutRoleChains = new ConcurrentHashMap<>(); - private final ConcurrentHashMap timeToLive = new ConcurrentHashMap<>(); - private final List markToBeRemoved = new ArrayList<>(); private ClusterSingletonServiceProvider singletonServicesProvider; - private boolean timerIsRunning; - private final HashedWheelTimer timer; - private Long ttlBeforeDrop; - private Long ttlStep; - private Long checkRoleMaster; - private Boolean neverDropChain; private boolean timerIsRunningRole; public ContextChainHolderImpl(final HashedWheelTimer timer) { - this.timerIsRunning = START; - this.timerIsRunningRole = START; + this.timerIsRunningRole = false; this.timer = timer; - this.ttlBeforeDrop = DEFAULT_TTL_BEFORE_DROP; - this.ttlStep = DEFAULT_TTL_STEP; this.checkRoleMaster = DEFAULT_CHECK_ROLE_MASTER; } @Override public void addManager(final T manager) { if (Objects.isNull(deviceManager) && manager instanceof DeviceManager) { - LOG.info("Device" + MANAGER_WAS_SET); + LOG.debug("Device manager was set."); deviceManager = (DeviceManager) manager; } else if (Objects.isNull(rpcManager) && manager instanceof RpcManager) { - LOG.info("RPC" + MANAGER_WAS_SET); + LOG.debug("RPC manager was set."); rpcManager = (RpcManager) manager; } else if (Objects.isNull(statisticsManager) && manager instanceof StatisticsManager) { - LOG.info("Statistics" + MANAGER_WAS_SET); + LOG.debug("Statistics manager was set."); statisticsManager = (StatisticsManager) manager; } } @@ -155,65 +144,47 @@ public class ContextChainHolderImpl implements ContextChainHolder { if (!this.timerIsRunningRole) { this.startTimerRole(); } - deviceContext.onPublished(); - contextChain.registerServices(this.singletonServicesProvider); return contextChain; - } @Override - public void destroyContextChain(final DeviceInfo deviceInfo) { - removeFromSleepingChainsMap(deviceInfo); - ContextChain chain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(chain)) { + public ListenableFuture destroyContextChain(final DeviceInfo deviceInfo) { + ContextChain chain = contextChainMap.remove(deviceInfo); + if (chain != null) { chain.close(); - contextChainMap.remove(deviceInfo); - if (markToBeRemoved.contains(deviceInfo)) { - markToBeRemoved.remove(deviceInfo); - try { - LOG.info("Removing device: {} from DS", deviceInfo.getLOGValue()); - deviceManager.removeDeviceFromOperationalDS(deviceInfo).checkedGet(5L, TimeUnit.SECONDS); - } catch (TimeoutException | TransactionCommitFailedException e) { - LOG.warn("Not able to remove device {} from DS", deviceInfo.getLOGValue()); - } - } } - } - - @Override - public void pairConnection(final ConnectionContext connectionContext) { - DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { - contextChain.changePrimaryConnection(connectionContext); - contextChain.makeDeviceSlave(); + if (markToBeRemoved.contains(deviceInfo)) { + markToBeRemoved.remove(deviceInfo); + LOG.info("Removing device: {} from DS", deviceInfo.getLOGValue()); + return deviceManager.removeDeviceFromOperationalDS(deviceInfo); + } else { + return Futures.immediateFuture(null); } } @Override public ConnectionStatus deviceConnected(final ConnectionContext connectionContext) throws Exception { - Verify.verify(this.checkAllManagers(), NOT_ALL_MANAGER_WERE_SET); - Verify.verifyNotNull(this.singletonServicesProvider, SINGLETON_SERVICE_PROVIDER_WAS_NOT_SET_YET); - DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - LOG.info("Device {} connected.", deviceInfo.getLOGValue()); - ContextChain chain = contextChainMap.get(deviceInfo); - - if (Objects.isNull(chain)) { - if (LOG.isDebugEnabled()) { - LOG.debug("No context chain found for device: {}, creating new.", deviceInfo.getLOGValue()); + ContextChain contextChain = contextChainMap.get(deviceInfo); + if (contextChain != null) { + if (contextChain.addAuxiliaryConnection(connectionContext)) { + LOG.info("An auxiliary connection was added to device: {}", deviceInfo.getLOGValue()); + return ConnectionStatus.MAY_CONTINUE; + } else { + LOG.warn("Device {} already connected. Closing all connection to the device.", deviceInfo.getLOGValue()); + destroyContextChain(deviceInfo); + return ConnectionStatus.ALREADY_CONNECTED; } - contextChainMap.put(deviceInfo, createContextChain(connectionContext)); } else { if (LOG.isDebugEnabled()) { - LOG.debug("Found context chain for device: {}, pairing.", deviceInfo.getLOGValue()); + LOG.debug("No context chain found for device: {}, creating new.", deviceInfo.getLOGValue()); } - this.pairConnection(connectionContext); + contextChainMap.put(deviceInfo, createContextChain(connectionContext)); } return ConnectionStatus.MAY_CONTINUE; @@ -225,57 +196,42 @@ public class ContextChainHolderImpl implements ContextChainHolder { } @Override - public void onNotAbleToStartMastership(final DeviceInfo deviceInfo) { - ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { - LOG.warn("Not able to set MASTER role on device {}", deviceInfo.getLOGValue()); - if (contextChain.getContextChainState().equals(ContextChainState.INITIALIZED)) { - contextChain.closePrimaryConnection(); - } else { - contextChain.sleepTheChainAndDropConnection(); - addToSleepingChainsMap(deviceInfo, contextChain); - } + public void onNotAbleToStartMastership(final DeviceInfo deviceInfo, @Nonnull final String reason) { + this.withoutRoleChains.remove(deviceInfo); + LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo.getLOGValue(), reason); + if (contextChainMap.containsKey(deviceInfo)) { + destroyContextChain(deviceInfo); } } @Override - public void onMasterRoleAcquired(final DeviceInfo deviceInfo) { + public void onMasterRoleAcquired(final DeviceInfo deviceInfo, + @Nonnull final ContextChainMastershipState mastershipState) { this.withoutRoleChains.remove(deviceInfo); ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { - if (contextChain.getContextChainState().equals(ContextChainState.WORKINGMASTER)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Device {} already working as MASTER no changes need to be done.", - deviceInfo.getLOGValue()); - } - } else { - if (contextChain.getContextChainState().equals(ContextChainState.INITIALIZED)) { - LOG.info("Device {} has not finish initial gathering yet.", - deviceInfo.getLOGValue()); - } - Futures.addCallback(contextChain.startChain(), - new StartStopChainCallback(deviceInfo, START)); + if (contextChain != null) { + if (contextChain.isMastered(mastershipState)) { + LOG.info("Role MASTER was granted to device {}", deviceInfo.getLOGValue()); + this.sendNotificationNodeAdded(deviceInfo); } } } @Override public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) { - if (!this.withoutRoleChains.isEmpty()) { - this.withoutRoleChains.remove(deviceInfo); - } + this.withoutRoleChains.remove(deviceInfo); ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { + if (contextChain != null) { contextChain.makeContextChainStateSlave(); } } @Override public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo) { + this.withoutRoleChains.remove(deviceInfo); ContextChain contextChain = contextChainMap.get(deviceInfo); - if (Objects.nonNull(contextChain)) { - contextChain.sleepTheChainAndDropConnection(); - addToSleepingChainsMap(deviceInfo, contextChain); + if (contextChain != null) { + destroyContextChain(deviceInfo); } } @@ -283,148 +239,40 @@ public class ContextChainHolderImpl implements ContextChainHolder { public void onDeviceDisconnected(final ConnectionContext connectionContext) { final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - - if (Objects.isNull(deviceInfo)) { - LOG.info("Non existing device info. Cannot close context chain."); - } else { - LOG.info("Device {} disconnected.", deviceInfo.getLOGValue()); + if (deviceInfo != null) { ContextChain chain = contextChainMap.get(deviceInfo); - if (Objects.isNull(chain)) { - if (LOG.isDebugEnabled()) { - LOG.debug("There was no context chain created yet for the disconnected device {}", - deviceInfo.getLOGValue()); + if (chain != null) { + if (chain.auxiliaryConnectionDropped(connectionContext)) { + LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo.getLOGValue()); + } else { + LOG.info("Device {} disconnected.", deviceInfo.getLOGValue()); + Futures.transform(chain.connectionDropped(), new Function() { + @Nullable + @Override + public Object apply(@Nullable Void aVoid) { + destroyContextChain(deviceInfo); + return null; + } + }); } - } else { - Futures.addCallback(chain.connectionDropped(), - new StartStopChainCallback(deviceInfo, STOP)); } } } - @Override - public void setTtlBeforeDrop(final Long ttlBeforeDrop) { - this.ttlBeforeDrop = ttlBeforeDrop; - } - - @Override - public void setTtlStep(final Long ttlStep) { - this.ttlStep = ttlStep; - } - - @Override - public void setNeverDropContextChain(final Boolean neverDropChain) { - this.neverDropChain = neverDropChain; - } - @Override public void changeEntityOwnershipService(final EntityOwnershipService entityOwnershipService) { if (Objects.nonNull(this.eosListenerRegistration)) { LOG.warn("EOS Listener already registered."); } else { this.eosListenerRegistration = Verify.verifyNotNull(entityOwnershipService.registerListener - (SERVICE_ENTITY_TYPE, this)); - } - } - - private void addToSleepingChainsMap(@Nonnull final DeviceInfo deviceInfo, final ContextChain contextChain) { - if (contextChain.lastStateWasMaster()) { - destroyContextChain(deviceInfo); - } else { - sleepingChains.put(deviceInfo, contextChain); - if (LOG.isDebugEnabled()) { - LOG.debug("Put context chain on mattress to sleep for device {}", deviceInfo.getLOGValue()); - } - if (!this.neverDropChain) { - timeToLive.put(deviceInfo, this.ttlBeforeDrop); - if (!this.timerIsRunning) { - startTimer(); - } - } - } - } - - private void removeFromSleepingChainsMap(@Nonnull final DeviceInfo deviceInfo) { - sleepingChains.remove(deviceInfo); - if (!this.neverDropChain) { - timeToLive.remove(deviceInfo); - if (sleepingChains.isEmpty() && this.timerIsRunning) { - stopTimer(); - } - } - } - - private void startTimer() { - this.timerIsRunning = true; - if (LOG.isDebugEnabled()) { - LOG.debug("There is at least one context chains sleeping, starting timer."); - } - timer.newTimeout(new SleepingChainsTimerTask(), this.ttlStep, TimeUnit.MILLISECONDS); - } - - private void stopTimer() { - this.timerIsRunning = false; - if (LOG.isDebugEnabled()) { - LOG.debug("There are no context chains sleeping, stopping timer."); - } - } - - private void timerTick() { - if (sleepingChains.isEmpty()) { - this.stopTimer(); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Context chain holder timer tick. There are {} context chains sleeping.", - sleepingChains.size()); - } - if (timeToLive.isEmpty()) { - LOG.warn("TTL map is empty but not sleeping chains map. Providing clean up."); - sleepingChains.clear(); - } - final ArrayList deviceInfos = new ArrayList<>(); - for (Map.Entry deviceInfoLongEntry : timeToLive.entrySet()) { - Long newValue = deviceInfoLongEntry.getValue() - this.ttlStep; - deviceInfoLongEntry.setValue(newValue); - DeviceInfo deviceInfo = deviceInfoLongEntry.getKey(); - ContextChain chain = sleepingChains.get(deviceInfo); - if (Objects.isNull(chain)) { - if (LOG.isDebugEnabled()) { - LOG.debug("There is no sleeping context chain for device {}", deviceInfo.getLOGValue()); - } - deviceInfos.add(deviceInfo); - continue; - } - if (!ContextChainState.SLEEPING.equals(chain.getContextChainState())) { - if (LOG.isDebugEnabled()) { - LOG.debug("There is timer registered for device: {} " - + "but device is in state: {} Removing from timer.", - deviceInfo.getLOGValue(), - chain.getContextChainState().getName()); - } - deviceInfos.add(deviceInfo); - } - if (newValue <= 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Dear device: {} your time to wake up is up. Its time to destroy you.", - deviceInfo.getLOGValue()); - } - destroyContextChain(deviceInfo); - } - } - - deviceInfos.forEach(timeToLive::remove); - - if (!timeToLive.isEmpty()) { - timer.newTimeout(new SleepingChainsTimerTask(), this.ttlStep, TimeUnit.MILLISECONDS); - } else { - this.stopTimer(); - } + (ASYNC_SERVICE_ENTITY_TYPE, this)); } } private void startTimerRole() { this.timerIsRunningRole = true; if (LOG.isDebugEnabled()) { - LOG.debug("There is {} context chains without role, starting timer.", this.withoutRoleChains.size()); + LOG.debug("There is a context chain without role, starting timer."); } timer.newTimeout(new RoleTimerTask(), this.checkRoleMaster, TimeUnit.MILLISECONDS); } @@ -432,20 +280,31 @@ public class ContextChainHolderImpl implements ContextChainHolder { private void stopTimerRole() { this.timerIsRunningRole = false; if (LOG.isDebugEnabled()) { - LOG.debug("There are no context chains without role, stopping timer."); + LOG.debug("There are no context chains, stopping timer."); } } private void timerTickRole() { - if (withoutRoleChains.isEmpty()) { - this.stopTimerRole(); - } else { + if (!withoutRoleChains.isEmpty()) { this.withoutRoleChains.forEach((deviceInfo, contextChain) -> contextChain.makeDeviceSlave()); - if (LOG.isDebugEnabled()) { - LOG.debug("There is still {} context chains without role, re-starting timer.", - this.withoutRoleChains.size()); - } timer.newTimeout(new RoleTimerTask(), this.checkRoleMaster, TimeUnit.MILLISECONDS); + } else { + final Set setOfClosedChains = new ConcurrentSet<>(); + if (!this.contextChainMap.isEmpty()) { + this.contextChainMap.forEach((deviceInfo, contextChain) -> { + if (!contextChain.hasState()) { + LOG.warn("Context chain {} is long time without state. Closing.", deviceInfo); + setOfClosedChains.add(deviceInfo); + contextChain.close(); + } + }); + setOfClosedChains.forEach(this.contextChainMap::remove); + } + if (this.contextChainMap.isEmpty()) { + this.stopTimerRole(); + } else { + timer.newTimeout(new RoleTimerTask(), this.checkRoleMaster, TimeUnit.MILLISECONDS); + } } } @@ -456,21 +315,26 @@ public class ContextChainHolderImpl implements ContextChainHolder { @Override public void close() throws Exception { + this.contextChainMap.forEach((deviceInfo, contextChain) -> { + if (contextChain.isMastered(ContextChainMastershipState.CHECK)) { + contextChain.stopChain(true); + } + contextChain.close(); + }); if (Objects.nonNull(eosListenerRegistration)) { - LOG.info("Closing entity ownership listener."); eosListenerRegistration.close(); } } @Override public void ownershipChanged(EntityOwnershipChange entityOwnershipChange) { - if (!entityOwnershipChange.hasOwner() && !entityOwnershipChange.isOwner() && entityOwnershipChange.wasOwner()) { + if (!entityOwnershipChange.hasOwner()) { final YangInstanceIdentifier yii = entityOwnershipChange.getEntity().getId(); final YangInstanceIdentifier.NodeIdentifierWithPredicates niiwp = (YangInstanceIdentifier.NodeIdentifierWithPredicates) yii.getLastPathArgument(); String entityName = niiwp.getKeyValues().values().iterator().next().toString(); if (LOG.isDebugEnabled()) { - LOG.debug("Last master for entity : {}", entityName); + LOG.debug("Entity {} has no owner", entityName); } if (entityName != null ){ @@ -480,7 +344,7 @@ public class ContextChainHolderImpl implements ContextChainHolder { if (entry.getKey().getNodeId().equals(nodeId)) { inMap = entry.getKey(); break; - } + } } if (Objects.nonNull(inMap)) { markToBeRemoved.add(inMap); @@ -491,56 +355,16 @@ public class ContextChainHolderImpl implements ContextChainHolder { .removeDeviceFromOperationalDS(DeviceStateUtil.createNodeInstanceIdentifier(nodeId)) .checkedGet(5L, TimeUnit.SECONDS); } catch (TimeoutException | TransactionCommitFailedException e) { - LOG.warn("Not able to remove device {} from DS", nodeId); + LOG.info("Not able to remove device {} from DS. Probably removed by another cluster node.", + nodeId); } } - } - } - } - - private class StartStopChainCallback implements FutureCallback { - - private final String deviceInfoString; - private final String stopString; - private final String stoppedString; - private final boolean stop; - private final DeviceInfo deviceInfo; - - StartStopChainCallback( - final DeviceInfo deviceInfo, - final boolean stop) { - - this.deviceInfoString = Objects.nonNull(deviceInfo) ? deviceInfo.getLOGValue() : "null"; - this.stopString = stop ? "stop" : "start"; - this.stoppedString = stop ? "stopped" : "started"; - this.stop = stop; - this.deviceInfo = deviceInfo; - } - - @Override - public void onSuccess(@Nullable Void nothing) { - LOG.info("Context chain for device {} successfully {}", deviceInfoString, stoppedString); - if (this.stop && Objects.nonNull(deviceInfo)) { - addToSleepingChainsMap(deviceInfo, contextChainMap.get(deviceInfo)); - } - } - - @Override - public void onFailure(@Nonnull final Throwable throwable) { - LOG.warn("Not able to {} the context chain for device {}", stopString, deviceInfoString); - if (this.stop && Objects.nonNull(deviceInfo)) { - addToSleepingChainsMap(deviceInfo, contextChainMap.get(deviceInfo)); } } } - private class SleepingChainsTimerTask implements TimerTask { - - @Override - public void run(Timeout timeout) throws Exception { - timerTick(); - } - + private void sendNotificationNodeAdded(final DeviceInfo deviceInfo) { + this.deviceManager.sendNodeAddedNotification(deviceInfo); } private class RoleTimerTask implements TimerTask {