package org.opendaylight.openflowplugin.impl.lifecycle;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.HashedWheelTimer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChain;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainHolder;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainMastershipState;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.MasterChecker;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.OwnershipChangeListener;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
import org.opendaylight.yangtools.yang.binding.KeyedInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ContextChainHolderImpl implements ContextChainHolder {
+public class ContextChainHolderImpl implements ContextChainHolder, MasterChecker {
private static final Logger LOG = LoggerFactory.getLogger(ContextChainHolderImpl.class);
private static final String CONTEXT_CREATED_FOR_CONNECTION = " context created for connection: {}";
private final Map<DeviceInfo, ContextChain> contextChainMap = Collections.synchronizedMap(new HashMap<>());
private final EntityOwnershipListenerRegistration eosListenerRegistration;
- private final HashedWheelTimer timer;
private final ClusterSingletonServiceProvider singletonServiceProvider;
private final ItemScheduler<DeviceInfo, ContextChain> scheduler;
private final ExecutorService executorService;
+ private final OwnershipChangeListener ownershipChangeListener;
private DeviceManager deviceManager;
private RpcManager rpcManager;
private StatisticsManager statisticsManager;
public ContextChainHolderImpl(final HashedWheelTimer timer,
final ExecutorService executorService,
final ClusterSingletonServiceProvider singletonServiceProvider,
- final EntityOwnershipService entityOwnershipService) {
- this.timer = timer;
+ final EntityOwnershipService entityOwnershipService,
+ final OwnershipChangeListener ownershipChangeListener) {
this.singletonServiceProvider = singletonServiceProvider;
this.executorService = executorService;
+ this.ownershipChangeListener = ownershipChangeListener;
+ this.ownershipChangeListener.setMasterChecker(this);
this.eosListenerRegistration = Objects.requireNonNull(entityOwnershipService
.registerListener(ASYNC_SERVICE_ENTITY_TYPE, this));
}
@VisibleForTesting
- ContextChain createContextChain(final ConnectionContext connectionContext) {
+ void createContextChain(final ConnectionContext connectionContext) {
final DeviceInfo deviceInfo = connectionContext.getDeviceInfo();
final DeviceContext deviceContext = deviceManager.createContext(connectionContext);
- deviceContext.registerMastershipChangeListener(this);
+ deviceContext.registerMastershipWatcher(this);
LOG.debug("Device" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
final RpcContext rpcContext = rpcManager.createContext(deviceContext);
- rpcContext.registerMastershipChangeListener(this);
+ rpcContext.registerMastershipWatcher(this);
LOG.debug("RPC" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
final StatisticsContext statisticsContext = statisticsManager.createContext(deviceContext);
- statisticsContext.registerMastershipChangeListener(this);
+ statisticsContext.registerMastershipWatcher(this);
LOG.debug("Statistics" + CONTEXT_CREATED_FOR_CONNECTION, deviceInfo);
final ContextChain contextChain = new ContextChainImpl(this, connectionContext,
CHECK_ROLE_MASTER_TIMEOUT / 1000L);
contextChain.registerServices(singletonServiceProvider);
- return contextChain;
}
@Override
}
LOG.debug("No context chain found for device: {}, creating new.", deviceInfo);
- final ContextChain newContextChain = createContextChain(connectionContext);
- LOG.debug("Successfully created context chain with identifier: {}", newContextChain.getIdentifier());
+ createContextChain(connectionContext);
return ConnectionStatus.MAY_CONTINUE;
}
@Override
- public void onNotAbleToStartMastership(final DeviceInfo deviceInfo, @Nonnull final String reason, final boolean mandatory) {
+ public void onNotAbleToStartMastership(@Nonnull final DeviceInfo deviceInfo,
+ @Nonnull final String reason,
+ final boolean mandatory) {
LOG.warn("Not able to set MASTER role on device {}, reason: {}", deviceInfo, reason);
if (!mandatory) {
}
@Override
- public void onMasterRoleAcquired(final DeviceInfo deviceInfo, @Nonnull final ContextChainMastershipState mastershipState) {
+ public void onMasterRoleAcquired(@Nonnull final DeviceInfo deviceInfo,
+ @Nonnull final ContextChainMastershipState mastershipState) {
scheduler.remove(deviceInfo);
-
Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(contextChain -> {
- if (contextChain.isMastered(mastershipState)) {
+ if (ownershipChangeListener.isReconciliationFrameworkRegistered()) {
+ if (mastershipState == ContextChainMastershipState.INITIAL_SUBMIT) {
+ LOG.error("Initial submit is not allowed here if using reconciliation framework.");
+ } else {
+ contextChain.isMastered(mastershipState);
+ if (contextChain.isPrepared()) {
+ Futures.addCallback(
+ ownershipChangeListener.becomeMasterBeforeSubmittedDS(deviceInfo),
+ reconciliationFrameworkCallback(deviceInfo, contextChain),
+ MoreExecutors.directExecutor());
+ }
+ }
+ } else if (contextChain.isMastered(mastershipState)) {
LOG.info("Role MASTER was granted to device {}", deviceInfo);
+ ownershipChangeListener.becomeMaster(deviceInfo);
deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
}
});
@Override
public void onSlaveRoleAcquired(final DeviceInfo deviceInfo) {
scheduler.remove(deviceInfo);
+ ownershipChangeListener.becomeSlaveOrDisconnect(deviceInfo);
Optional.ofNullable(contextChainMap.get(deviceInfo)).ifPresent(ContextChain::makeContextChainStateSlave);
}
@Override
public void close() throws Exception {
scheduler.close();
- contextChainMap.keySet().forEach(this::destroyContextChain);
+ Map<DeviceInfo, ContextChain> copyOfChains = new HashMap<>(contextChainMap);
+ copyOfChains.keySet().forEach(this::destroyContextChain);
+ copyOfChains.clear();
contextChainMap.clear();
eosListenerRegistration.close();
}
deviceManager.sendNodeRemovedNotification(nodeInstanceIdentifier);
- LOG.info("Removing device {} from operational DS", nodeId);
+ LOG.info("Try to remove device {} from operational DS", nodeId);
deviceManager
.removeDeviceFromOperationalDS(nodeInstanceIdentifier)
- .checkedGet(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
- } catch (TimeoutException | TransactionCommitFailedException e) {
- LOG.info("Not able to remove device {} from operational DS. Probably removed by another cluster node.",
- nodeId);
+ .get(REMOVE_DEVICE_FROM_DS_TIMEOUT, TimeUnit.MILLISECONDS);
+ LOG.info("Removing device from operational DS {} was successful", nodeId);
+ } catch (TimeoutException | ExecutionException | NullPointerException | InterruptedException e) {
+ LOG.warn("Not able to remove device {} from operational DS. ",nodeId, e);
}
}
}
});
}
+ @Override
+ public List<DeviceInfo> listOfMasteredDevices() {
+ return contextChainMap
+ .entrySet()
+ .stream()
+ .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry
+ .getValue()
+ .isMastered(ContextChainMastershipState.CHECK))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean isAnyDeviceMastered() {
+ return contextChainMap
+ .entrySet()
+ .stream()
+ .findAny()
+ .filter(deviceInfoContextChainEntry -> deviceInfoContextChainEntry.getValue()
+ .isMastered(ContextChainMastershipState.CHECK))
+ .isPresent();
+ }
+
private String getEntityNameFromOwnershipChange(final EntityOwnershipChange entityOwnershipChange) {
final YangInstanceIdentifier.NodeIdentifierWithPredicates lastIdArgument =
(YangInstanceIdentifier.NodeIdentifierWithPredicates) entityOwnershipChange
contextChainMap.remove(deviceInfo);
LOG.debug("Context chain removed for node {}", deviceInfo);
}
+
+ private FutureCallback<ResultState> reconciliationFrameworkCallback(
+ @Nonnull DeviceInfo deviceInfo,
+ ContextChain contextChain) {
+ return new FutureCallback<ResultState>() {
+ @Override
+ public void onSuccess(@Nullable ResultState result) {
+ if (ResultState.DONOTHING == result) {
+ LOG.info("Device {} connection is enabled by reconciliation framework.", deviceInfo);
+ if (!contextChain.continueInitializationAfterReconciliation()) {
+ LOG.warn("Initialization submit after reconciliation failed for device {}", deviceInfo);
+ destroyContextChain(deviceInfo);
+ } else {
+ ownershipChangeListener.becomeMaster(deviceInfo);
+ deviceManager.sendNodeAddedNotification(deviceInfo.getNodeInstanceIdentifier());
+ }
+ } else {
+ LOG.warn("Reconciliation framework failure for device {}", deviceInfo);
+ destroyContextChain(deviceInfo);
+ }
+ }
+
+ @Override
+ public void onFailure(@Nonnull Throwable t) {
+ LOG.warn("Reconciliation framework failure.");
+ destroyContextChain(deviceInfo);
+ }
+ };
+ }
}