X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Flifecycle%2FContextChainHolderImpl.java;h=24574fb594ada1b41515a7594aa67afbae360068;hb=refs%2Fchanges%2F77%2F100077%2F17;hp=11e21ce53c4eee7ad99cea0279d8a1e7b49f6f2f;hpb=13e1d5e6c0237b9378d60526dd8c1d79db6d2b49;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java index 11e21ce53c..24574fb594 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/lifecycle/ContextChainHolderImpl.java @@ -12,6 +12,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.HashMap; import java.util.List; @@ -19,15 +20,21 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import javax.annotation.Nonnull; +import org.eclipse.jdt.annotation.NonNull; +import org.eclipse.jdt.annotation.Nullable; import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipChange; import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipListenerRegistration; import org.opendaylight.mdsal.eos.binding.api.EntityOwnershipService; +import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.openflowplugin.api.openflow.OFPManager; import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext; @@ -52,40 +59,53 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.mdsal.core.general.entity.rev150930.Entity; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflow.provider.config.rev160510.OpenflowProviderConfig; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState; import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier; import org.opendaylight.yangtools.yang.common.Uint8; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker { +public final class ContextChainHolderImpl implements ContextChainHolder, MasterChecker { private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class); private static final Logger OF_EVENT_LOG = LoggerFactory.getLogger("OfEventLog"); private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}"; private static final long REMOVE_DEVICE_FROM_DS_TIMEOUT = 5000L; private static final String ASYNC_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType"; + private static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType"; private static final String SEPARATOR = ":"; - private final Map contextChainMap = new ConcurrentHashMap<>(); - private final Map connectingDevices = new ConcurrentHashMap<>(); + private final ConcurrentMap contextChainMap = new ConcurrentHashMap<>(); + private final ConcurrentMap connectingDevices = new ConcurrentHashMap<>(); private final EntityOwnershipListenerRegistration eosListenerRegistration; private final ClusterSingletonServiceProvider singletonServiceProvider; - private final ExecutorService executorService; + private final Executor executor; private final OwnershipChangeListener ownershipChangeListener; + private final ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("node-cleaner-%d").setUncaughtExceptionHandler((thread, throwable) -> { + LOG.warn("Uncaught exception while removing node data from operational datastore.", throwable); + }).build(); + private final ScheduledExecutorService nodeCleanerExecutor = Executors.newScheduledThreadPool( + Runtime.getRuntime().availableProcessors() , threadFactory); + private final EntityOwnershipService entityOwnershipService; + private final OpenflowProviderConfig config; private DeviceManager deviceManager; private RpcManager rpcManager; private StatisticsManager statisticsManager; private RoleManager roleManager; - public ContextChainHolderImpl(final ExecutorService executorService, + public ContextChainHolderImpl(final Executor executor, final ClusterSingletonServiceProvider singletonServiceProvider, final EntityOwnershipService entityOwnershipService, - final OwnershipChangeListener ownershipChangeListener) { + final OwnershipChangeListener ownershipChangeListener, + final OpenflowProviderConfig config) { this.singletonServiceProvider = singletonServiceProvider; - this.executorService = executorService; + this.executor = executor; this.ownershipChangeListener = ownershipChangeListener; this.ownershipChangeListener.setMasterChecker(this); - this.eosListenerRegistration = Objects + this.entityOwnershipService = entityOwnershipService; + this.config = config; + eosListenerRegistration = Objects .requireNonNull(entityOwnershipService.registerListener(ASYNC_SERVICE_ENTITY_TYPE, this)); } @@ -109,7 +129,6 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker @VisibleForTesting void createContextChain(final ConnectionContext connectionContext) { final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - final DeviceContext deviceContext = deviceManager.createContext(connectionContext); deviceContext.registerMastershipWatcher(this); LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); @@ -127,7 +146,7 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker roleContext.registerMastershipWatcher(this); LOG.debug("Role" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo); - final ContextChain contextChain = new ContextChainImpl(this, connectionContext, executorService); + final ContextChain contextChain = new ContextChainImpl(this, connectionContext, executor); contextChain.registerDeviceRemovedHandler(deviceManager); contextChain.registerDeviceRemovedHandler(rpcManager); contextChain.registerDeviceRemovedHandler(statisticsManager); @@ -198,27 +217,24 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker } @Override - public void onNotAbleToStartMastership(@Nonnull final DeviceInfo deviceInfo, @Nonnull final String reason, - final boolean mandatory) { - LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo, reason); + public void onNotAbleToStartMastership(final DeviceInfo deviceInfo, final String reason, final boolean mandatory) { + LOG.error("Not able to set MASTER role on device {}, reason: {}", deviceInfo, reason); if (!mandatory) { return; } - - Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { + if (contextChainMap.containsKey(deviceInfo)) { LOG.warn("This mastering is mandatory, destroying context chain and closing connection for device {}.", deviceInfo); destroyContextChain(deviceInfo); - }); + } } @Override - public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo, - @Nonnull final ContextChainMastershipState mastershipState) { - Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { - if (ownershipChangeListener.isReconciliationFrameworkRegistered() - && !ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState)) { + public void onMasterRoleAcquired(final DeviceInfo deviceInfo, final ContextChainMastershipState mastershipState) { + final ContextChain contextChain = contextChainMap.get(deviceInfo); + if (contextChain != null) { + if (!ContextChainMastershipState.INITIAL_SUBMIT.equals(mastershipState)) { if (contextChain.isMastered(mastershipState, true)) { Futures.addCallback(ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo), reconciliationFrameworkCallback(deviceInfo, contextChain, mastershipState), @@ -226,37 +242,44 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker } } else if (contextChain.isMastered(mastershipState, false)) { LOG.info("Role MASTER was granted to device {}", deviceInfo); - ownershipChangeListener.becomeMaster(deviceInfo); + OF_EVENT_LOG.debug("Master Elected, Node: {}", deviceInfo.getDatapathId()); deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier()); } - }); + } } @Override public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) { ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo); LOG.info("Role SLAVE was granted to device {}", deviceInfo); - Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave); + final ContextChain contextChain = contextChainMap.get(deviceInfo); + if (contextChain != null) { + contextChain.makeContextChainStateSlave(); + } } @Override public void onSlaveRoleNotAcquired(final DeviceInfo deviceInfo, final String reason) { - LOG.warn("Not able to set SLAVE role on device {}, reason: {}", deviceInfo, reason); - Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> destroyContextChain(deviceInfo)); + LOG.error("Not able to set SLAVE role on device {}, reason: {}", deviceInfo, reason); + if (contextChainMap.containsKey(deviceInfo)) { + destroyContextChain(deviceInfo); + } } @Override public void onDeviceDisconnected(final ConnectionContext connectionContext) { final DeviceInfo deviceInfo = connectionContext.getDeviceInfo(); - - Optional.ofNullable(connectionContext.getDeviceInfo()).map(contextChainMap::get).ifPresent(contextChain -> { - if (contextChain.auxiliaryConnectionDropped(connectionContext)) { - LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo); - } else { - LOG.info("Device {} disconnected.", deviceInfo); - destroyContextChain(deviceInfo); + if (deviceInfo != null) { + final ContextChain contextChain = contextChainMap.get(deviceInfo); + if (contextChain != null) { + if (contextChain.auxiliaryConnectionDropped(connectionContext)) { + LOG.info("Auxiliary connection from device {} disconnected.", deviceInfo); + } else { + LOG.info("Device {} disconnected.", deviceInfo); + destroyContextChain(deviceInfo); + } } - }); + } } @VisibleForTesting @@ -275,13 +298,20 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker copyOfChains.keySet().forEach(this::destroyContextChain); copyOfChains.clear(); contextChainMap.clear(); + OF_EVENT_LOG.debug("EOS registration closed for all devices"); eosListenerRegistration.close(); OF_EVENT_LOG.debug("EOS registration closed for all devices"); + nodeCleanerExecutor.shutdownNow(); } @Override @SuppressFBWarnings("BC_UNCONFIRMED_CAST_OF_RETURN_VALUE") public void ownershipChanged(final EntityOwnershipChange entityOwnershipChange) { + LOG.info("Entity ownership change received for node : {}", entityOwnershipChange); + if (entityOwnershipChange.inJeopardy()) { + LOG.warn("Controller is in Jeopardy, ignore ownership change notification. {}", entityOwnershipChange); + return; + } if (entityOwnershipChange.getState().hasOwner()) { return; } @@ -295,45 +325,58 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker .firstKeyOf(Entity.class) .getName(); - if (entityName != null) { - LOG.debug("Entity {} has no owner", entityName); + if (entityName != null && entityName.startsWith("openflow:")) { + if (nodeCleanerExecutor.isShutdown()) { + LOG.warn("Node cleaner executor thread-pool is down."); + return; + } + LOG.debug("Device {} will be removed from datastore in {} msec, if it's not transient notification.", + entityName, config.getDeviceDatastoreRemovalDelay().getValue()); final String dpnId = getDpnIdFromNodeName(entityName); - try { - //TODO:Remove notifications - final KeyedInstanceIdentifier nodeInstanceIdentifier = - DeviceStateUtil.createNodeInstanceIdentifier(new NodeId(entityName)); - deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier); - LOG.info("Try to remove device {} from operational DS", entityName); - ListenableFuture future = deviceManager.removeDeviceFromOperationalDS(nodeInstanceIdentifier); - Futures.addCallback(future, new FutureCallback() { - @Override - public void onSuccess(final Object result) { - LOG.debug("Node removed from Oper DS, Node: {}", dpnId); - OF_EVENT_LOG.debug("Node removed from Oper DS, Node: {}", dpnId); - } - - @Override - public void onFailure(final Throwable throwable) { - LOG.error("Could not remove device {} from operational DS", dpnId, throwable); + nodeCleanerExecutor.schedule(() -> { + try { + EntityOwnershipState ownershipState = getCurrentOwnershipStatus(entityName); + if (ownershipState == null || EntityOwnershipState.NO_OWNER.equals(ownershipState)) { + LOG.debug("Entity {} has no owner", entityName); + final KeyedInstanceIdentifier nodeInstanceIdentifier = + DeviceStateUtil.createNodeInstanceIdentifier(new NodeId(entityName)); + deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier); + LOG.info("Try to remove device {} from operational DS", entityName); + ListenableFuture future = + deviceManager.removeDeviceFromOperationalDS(nodeInstanceIdentifier); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(final Object result) { + LOG.debug("Node removed from Oper DS, Node: {}", dpnId); + OF_EVENT_LOG.debug("Node removed from Oper DS, Node: {}", dpnId); + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.error("Could not remove device {} from operational DS", dpnId, throwable); + } + }, MoreExecutors.directExecutor()); + future.get(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS); + } else { + LOG.warn("Seems like device is still owned by other controller instance. Skip deleting {} " + + "node from operational datastore.", entityName); } - }, MoreExecutors.directExecutor()); - future.get(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS); - } catch (TimeoutException | ExecutionException | NullPointerException | InterruptedException e) { - LOG.warn("Not able to remove device {} from operational DS. ", entityName, e); - } - OF_EVENT_LOG.debug("Node removed, Node: {}", dpnId); - LOG.info("Removing device from operational DS {} was successful", dpnId); + } catch (TimeoutException | ExecutionException | InterruptedException e) { + LOG.warn("Not able to remove device {} from operational DS. ", entityName, e); + } + }, config.getDeviceDatastoreRemovalDelay().getValue().toJava(), TimeUnit.MILLISECONDS); } } private void destroyContextChain(final DeviceInfo deviceInfo) { OF_EVENT_LOG.debug("Destroying context chain for device {}", deviceInfo.getDatapathId()); ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo); - Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> { + final ContextChain contextChain = contextChainMap.get(deviceInfo); + if (contextChain != null) { deviceManager.sendNodeRemovedNotification(deviceInfo.getNodeInstanceIdentifier()); contextChain.close(); connectingDevices.remove(deviceInfo); - }); + } } @Override @@ -357,7 +400,7 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker LOG.debug("Context chain removed for node {}", deviceInfo); } - private FutureCallback reconciliationFrameworkCallback(@Nonnull final DeviceInfo deviceInfo, + private FutureCallback reconciliationFrameworkCallback(@NonNull final DeviceInfo deviceInfo, final ContextChain contextChain, final ContextChainMastershipState mastershipState) { return new FutureCallback<>() { @Override @@ -366,12 +409,13 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker OF_EVENT_LOG.debug("Device {} connection is enabled by reconciliation framework", deviceInfo); LOG.info("Device {} connection is enabled by reconciliation framework.", deviceInfo); if (mastershipState == ContextChainMastershipState.MASTER_ON_DEVICE) { + ownershipChangeListener.becomeMaster(deviceInfo); contextChain.initializeDevice(); } contextChain.continueInitializationAfterReconciliation(); } else { OF_EVENT_LOG.debug("Reconciliation framework failure for device {}", deviceInfo); - LOG.warn("Reconciliation framework failure for device {}", deviceInfo); + LOG.warn("Reconciliation framework failure for device {} with resultState {}", deviceInfo, result); destroyContextChain(deviceInfo); } } @@ -380,13 +424,31 @@ public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker public void onFailure(final Throwable throwable) { OF_EVENT_LOG.debug("Reconciliation framework failure for device {} with error {}", deviceInfo, throwable.getMessage()); - LOG.warn("Reconciliation framework failure."); + LOG.warn("Reconciliation framework failure for device {}", deviceInfo, throwable); destroyContextChain(deviceInfo); } }; } - private String getDpnIdFromNodeName(final String nodeName) { + private static String getDpnIdFromNodeName(final String nodeName) { return nodeName.substring(nodeName.lastIndexOf(SEPARATOR) + 1); } + + private @Nullable EntityOwnershipState getCurrentOwnershipStatus(final String nodeId) { + org.opendaylight.mdsal.eos.binding.api.Entity entity = createNodeEntity(nodeId); + Optional ownershipStatus + = entityOwnershipService.getOwnershipState(entity); + + if (ownershipStatus.isPresent()) { + LOG.debug("Current ownership status for node {} is {}", nodeId, ownershipStatus.get()); + return ownershipStatus.get(); + } + + LOG.trace("Ownership status is not available for node {}", nodeId); + return null; + } + + private static org.opendaylight.mdsal.eos.binding.api.Entity createNodeEntity(final String nodeId) { + return new org.opendaylight.mdsal.eos.binding.api.Entity(ASYNC_SERVICE_ENTITY_TYPE, nodeId); + } }