*/
public interface OFPContext {
- default void setState(CONTEXT_STATE contextState) {
- //NOOP
- }
+ void setState(CONTEXT_STATE contextState);
/**
* Context state
*/
void shutdownConnection();
+ /**
+ * Initial submit transaction
+ */
void initialSubmitTransaction();
/**
*/
public interface LifecycleService extends ClusterSingletonService, AutoCloseable {
- void registerService(ClusterSingletonServiceProvider singletonServiceProvider);
+ /**
+ * This method registers lifecycle service to the given provider
+ * @param singletonServiceProvider from md-sal binding
+ */
+ void registerService(final ClusterSingletonServiceProvider singletonServiceProvider);
- void setDeviceContext(DeviceContext deviceContext);
+ /**
+ * Setter for device context
+ * @param deviceContext actual device context created per device
+ */
+ void setDeviceContext(final DeviceContext deviceContext);
- void setRpcContext(RpcContext rpcContext);
+ /**
+ * Setter for rpc context
+ * @param rpcContext actual rpc context created per device
+ */
+ void setRpcContext(final RpcContext rpcContext);
- void setRoleContext(RoleContext roleContext);
+ /**
+ * Setter for role context
+ * @param roleContext actual role context created per device
+ */
+ void setRoleContext(final RoleContext roleContext);
- void setStatContext(StatisticsContext statContext);
+ /**
+ * Setter for statistics context
+ * @param statContext actual statistics context created per device
+ */
+ void setStatContext(final StatisticsContext statContext);
/**
* Some services, contexts etc. still need to have access to device context,
*/
package org.opendaylight.openflowplugin.api.openflow.role;
+import com.google.common.util.concurrent.ListenableFuture;
import javax.annotation.Nonnull;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SalRoleService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
/**
* Role context for change role on cluster
*/
SalRoleService getSalRoleService();
+ ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave();
}
@Override
public void setOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider) {
this.outboundQueueProvider = outboundQueueProvider;
+ ((DeviceInfoImpl)this.deviceInfo).setOutboundQueueProvider(this.outboundQueueProvider);
}
@Override
this.handshakeContext = handshakeContext;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ ConnectionContextImpl that = (ConnectionContextImpl) o;
+
+ if (!connectionAdapter.equals(that.connectionAdapter)) {
+ return false;
+ }
+
+ if (featuresReply != null ? !featuresReply.equals(that.featuresReply) : that.featuresReply != null) {
+ return false;
+ }
+
+ return nodeId != null ? nodeId.equals(that.nodeId) : that.nodeId == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = connectionAdapter.hashCode();
+ result = 31 * result + (featuresReply != null ? featuresReply.hashCode() : 0);
+ result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0);
+ return result;
+ }
private class DeviceInfoImpl implements DeviceInfo {
return result;
}
+ public void setOutboundQueueProvider(final OutboundQueue outboundQueueProvider) {
+ this.outboundQueueProvider = outboundQueueProvider;
+ }
+
@Override
public Long reserveXidForDeviceMessage() {
return outboundQueueProvider.reserveEntry();
LOG.debug("Initializing transaction chain manager for node {}", getDeviceInfo().getNodeId());
this.transactionChainManager.activateTransactionManager();
LOG.debug("Waiting to get node {} information", getDeviceInfo().getNodeId());
- DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor).get();
+ DeviceInitializationUtils.initializeNodeInformation(this, switchFeaturesMandatory, this.convertorExecutor);
}
@Override
import io.netty.util.TimerTask;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
* If context already exist we are in state closing process (connection flapping) and we should not propagate connection close
*/
if (deviceContexts.containsKey(deviceInfo)) {
- LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId());
+ DeviceContext deviceContext = deviceContexts.get(deviceInfo);
+ if (!deviceContext.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
+ LOG.warn("Context state for node {} is not in TERMINATION state, trying to reconnect", connectionContext.getNodeId().getValue());
+ } else {
+ LOG.warn("Rejecting connection from node which is already connected and there exist deviceContext for it: {}", connectionContext.getNodeId().getValue());
+ }
return false;
}
this,
convertorExecutor);
- Verify.verify(deviceContexts.putIfAbsent(deviceInfo, deviceContext) == null, "DeviceCtx still not closed.");
+ deviceContexts.putIfAbsent(deviceInfo, deviceContext);
final LifecycleService lifecycleService = new LifecycleServiceImpl();
lifecycleService.setDeviceContext(deviceContext);
@Override
public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
- LOG.debug("onDeviceContextClosed for Node {}", deviceInfo.getNodeId());
deviceContexts.remove(deviceInfo);
- updatePacketInRateLimiters();
LifecycleService lifecycleService = lifecycleServices.remove(deviceInfo);
- try {
- lifecycleService.close();
- } catch (Exception e) {
- LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
+ updatePacketInRateLimiters();
+ if (Objects.nonNull(lifecycleService)) {
+ try {
+ lifecycleService.close();
+ } catch (Exception e) {
+ LOG.warn("Closing service for node {} was unsuccessful ", deviceInfo.getNodeId().getValue(), e);
+ }
}
}
return;
}
+ if (deviceCtx.getState().equals(OFPContext.CONTEXT_STATE.TERMINATION)) {
+ LOG.debug("Device context for node {} is already is termination state, waiting for close all context");
+ return;
+ }
+
+ deviceCtx.setState(OFPContext.CONTEXT_STATE.TERMINATION);
+
if (!connectionContext.equals(deviceCtx.getPrimaryConnectionContext())) {
+ LOG.debug("Node {} disconnected, but not primary connection.", connectionContext.getDeviceInfo().getNodeId().getValue());
/* Connection is not PrimaryConnection so try to remove from Auxiliary Connections */
deviceCtx.removeAuxiliaryConnectionContext(connectionContext);
- } else {
+ }
+ //TODO: Auxiliary connections supported ?
+ {
/* Device is disconnected and so we need to close TxManager */
final ListenableFuture<Void> future = deviceCtx.shuttingDownDataStoreTransactions();
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId());
+ LOG.debug("TxChainManager for device {} is closed successful.", deviceInfo.getNodeId().getValue());
deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
}
@Override
public void onFailure(final Throwable t) {
- LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId(), t);
+ LOG.warn("TxChainManager for device {} failed by closing.", deviceInfo.getNodeId().getValue());
+ LOG.trace("TxChainManager failed by closing. ", t);
deviceTerminPhaseHandler.onDeviceContextLevelDown(deviceInfo);
}
});
/* Add timer for Close TxManager because it could fain ind cluster without notification */
final TimerTask timerTask = timeout -> {
if (!future.isDone()) {
- LOG.info("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId());
+ LOG.warn("Shutting down TxChain for node {} not completed during 10 sec. Continue anyway.", deviceInfo.getNodeId().getValue());
future.cancel(false);
}
};
deviceContexts.put(deviceInfo, deviceContext);
}
- @VisibleForTesting
- void removeDeviceContextFromMap(final DeviceInfo deviceInfo){
- deviceContexts.remove(deviceInfo);
- }
-
@Override
public <T extends OFPContext> T gainContext(final DeviceInfo deviceInfo) {
return (T) deviceContexts.get(deviceInfo);
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.concurrent.CancellationException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.md.sal.common.api.data.TransactionChainListener;
import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
@Override
public void onFailure(final Throwable t) {
if (t instanceof TransactionCommitFailedException) {
- LOG.error("Transaction commit failed. {}", t);
+ LOG.error("Transaction commit failed. ", t);
} else {
- LOG.error("Exception during transaction submitting. {}", t);
+ if (t instanceof CancellationException) {
+ LOG.warn("Submit task was canceled");
+ LOG.trace("Submit exception: ", t);
+ } else {
+ LOG.error("Exception during transaction submitting. ", t);
+ }
}
if (initCommit) {
- LOG.error("Initial commit failed. {}", t);
+ LOG.warn("Initial commit failed. ", t);
+ wTx = null;
}
}
});
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
+import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
public void instantiateServiceInstance() {
try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Starting clustering MASTER services for node {}", this.deviceContext.getDeviceInfo().getNodeId().getValue());
+ LOG.debug("===============================================");
+ }
+
+ if (connectionInterrupted()) {
+ return;
+ }
+
LOG.info("Starting device context cluster services for node {}", getIdentifier());
this.deviceContext.startupClusterServices();
+ if (connectionInterrupted()) {
+ return;
+ }
+
LOG.info("Starting statistics context cluster services for node {}", getIdentifier());
this.statContext.startupClusterServices();
+ if (connectionInterrupted()) {
+ return;
+ }
+
LOG.info("Statistics initial gathering OK, submitting data for node {}", getIdentifier());
this.deviceContext.initialSubmitTransaction();
+ if (connectionInterrupted()) {
+ return;
+ }
+
LOG.info("Starting rpc context cluster services for node {}", getIdentifier());
this.rpcContext.startupClusterServices();
+ if (connectionInterrupted()) {
+ return;
+ }
+
LOG.info("Starting role context cluster services for node {}", getIdentifier());
this.roleContext.startupClusterServices();
+ if (connectionInterrupted()) {
+ return;
+ }
+
LOG.info("Caching flows IDs ...");
fillDeviceFlowRegistry();
} catch (ExecutionException | InterruptedException e) {
LOG.warn("Cluster service {} was unable to start.", this.getIdentifier());
+ this.deviceContext.shutdownConnection();
+ }
+ }
+
+ private boolean connectionInterrupted() {
+ if (this.deviceContext.getPrimaryConnectionContext().getConnectionState().equals(ConnectionContext.CONNECTION_STATE.RIP)) {
+ LOG.warn("Node {} was disconnected, will stop starting MASTER services.", this.deviceContext.getDeviceInfo().getNodeId().getValue());
+ return true;
}
+ return false;
}
@Override
public ListenableFuture<Void> closeServiceInstance() {
+ roleContext.stopClusterServices();
statContext.stopClusterServices();
rpcContext.stopClusterServices();
return deviceContext.stopClusterServices();
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@Override
public void onSuccess(Optional<FlowCapableNode> result) {
result.asSet().stream()
+ .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
.flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+ .filter(table -> Objects.nonNull(table.getFlow()))
.flatMap(table -> table.getFlow().stream())
+ .filter(Objects::nonNull)
+ .filter(flow -> Objects.nonNull(flow.getId()))
.forEach(flowConsumer);
// After we are done with reading from datastore, close the transaction
*/
package org.opendaylight.openflowplugin.impl.role;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
private final DeviceInfo deviceInfo;
private CONTEXT_STATE state;
private final RoleManager myManager;
+ private final LifecycleService lifecycleService;
RoleContextImpl(final DeviceInfo deviceInfo,
final HashedWheelTimer hashedWheelTimer,
- final RoleManager myManager) {
+ final RoleManager myManager,
+ final LifecycleService lifecycleService) {
this.deviceInfo = deviceInfo;
+ this.lifecycleService = lifecycleService;
state = CONTEXT_STATE.WORKING;
this.myManager = myManager;
this.hashedWheelTimer = hashedWheelTimer;
}
public void startupClusterServices() throws ExecutionException, InterruptedException {
- //TODO: Add callback ?
- sendRoleChangeToDevice(OfpRole.BECOMEMASTER).get();
+ Futures.addCallback(sendRoleChangeToDevice(OfpRole.BECOMEMASTER), new FutureCallback<RpcResult<SetRoleOutput>>() {
+ @Override
+ public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Role MASTER was successfully set on device, node {}", deviceInfo.getNodeId().getValue());
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("Was not able to set MASTER role on device, node {}", deviceInfo.getNodeId().getValue());
+ lifecycleService.closeConnection();
+ }
+ });
}
@Override
public ListenableFuture<Void> stopClusterServices() {
- ListenableFuture<Void> future;
- try {
- //TODO: Add callback
- sendRoleChangeToDevice(OfpRole.BECOMESLAVE).get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.warn("Send role to device failed ", e);
- } finally {
- myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
- future = Futures.immediateFuture(null);
- }
+ ListenableFuture<Void> future = Futures.transform(makeDeviceSlave(), new Function<RpcResult<SetRoleOutput>, Void>() {
+ @Nullable
+ @Override
+ public Void apply(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ return null;
+ }
+ });
+
+ Futures.addCallback(future, new FutureCallback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void aVoid) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getNodeId().getValue());
+ }
+ myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("Was not able to set role SLAVE to device on node {} ", deviceInfo.getNodeId().getValue());
+ LOG.trace("Error occurred on device role setting, probably connection loss: ", throwable);
+ myManager.removeDeviceFromOperationalDS(deviceInfo, MAX_CLEAN_DS_RETRIES);
+
+ }
+ });
return future;
}
+ @Override
+ public ListenableFuture<RpcResult<SetRoleOutput>> makeDeviceSlave(){
+ return sendRoleChangeToDevice(OfpRole.BECOMESLAVE);
+ }
+
private ListenableFuture<RpcResult<SetRoleOutput>> sendRoleChangeToDevice(final OfpRole newRole) {
LOG.debug("Sending new role {} to device {}", newRole, deviceInfo.getNodeId());
final Future<RpcResult<SetRoleOutput>> setRoleOutputFuture;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.openflowplugin.impl.services.SalRoleServiceImpl;
import org.opendaylight.openflowplugin.impl.util.DeviceStateUtil;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.role.service.rev150727.SetRoleOutput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Override
public void onDeviceContextLevelUp(@CheckForNull final DeviceInfo deviceInfo, final LifecycleService lifecycleService) throws Exception {
final DeviceContext deviceContext = Preconditions.checkNotNull(lifecycleService.getDeviceContext());
- final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this);
+ final RoleContext roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, this, lifecycleService);
roleContext.setSalRoleService(new SalRoleServiceImpl(roleContext, deviceContext));
Verify.verify(contexts.putIfAbsent(deviceInfo, roleContext) == null, "Role context for master Node %s is still not closed.", deviceInfo.getNodeId());
+ Futures.addCallback(roleContext.makeDeviceSlave(), new FutureCallback<RpcResult<SetRoleOutput>>() {
+ @Override
+ public void onSuccess(@Nullable RpcResult<SetRoleOutput> setRoleOutputRpcResult) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Role SLAVE was successfully propagated on device, node {}", deviceInfo.getNodeId().getValue());
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ LOG.warn("Was not able to set role SLAVE to device on node {} ",deviceInfo.getNodeId().getValue());
+ lifecycleService.closeConnection();
+ }
+ });
lifecycleService.setRoleContext(roleContext);
deviceInitializationPhaseHandler.onDeviceContextLevelUp(deviceInfo, lifecycleService);
}
@Override
public void onDeviceContextLevelDown(final DeviceInfo deviceInfo) {
- LOG.trace("onDeviceContextLevelDown for node {}", deviceInfo.getNodeId());
- final RoleContext roleContext = contexts.remove(deviceInfo);
- if (roleContext != null) {
- LOG.debug("Found roleContext associated to deviceContext: {}, now trying close the roleContext", deviceInfo.getNodeId());
- contexts.remove(deviceInfo.getNodeId(), roleContext);
- }
+ contexts.remove(deviceInfo);
deviceTerminationPhaseHandler.onDeviceContextLevelDown(deviceInfo);
}
final RoutedRpcRegistration<?> rpcRegistration = iterator.next().getValue();
rpcRegistration.unregisterPath(NodeContext.class, nodeInstanceIdentifier);
rpcRegistration.close();
- LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType(),
- nodeInstanceIdentifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Closing RPC Registration of service {} for device {}.", rpcRegistration.getServiceType(),
+ nodeInstanceIdentifier);
+ }
}
}
}
}
public Future<BigInteger> getGenerationIdFromDevice(final Short version) {
- LOG.info("getGenerationIdFromDevice called for device:{}", getDeviceInfo().getNodeId().getValue());
+ LOG.info("getGenerationIdFromDevice called for device: {}", getDeviceInfo().getNodeId().getValue());
// send a dummy no-change role request to get the generation-id of the switch
final RoleRequestInputBuilder roleRequestInputBuilder = new RoleRequestInputBuilder();
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
import org.opendaylight.mdsal.singleton.common.api.ServiceGroupIdentifier;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
}
@Override
public void onFailure(final Throwable t) {
- StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
+ if (!(t instanceof TransactionChainClosedException)) {
+ StatisticsGatheringUtils.markDeviceStateSnapshotEnd(deviceContext, false);
+ }
}
});
return settableStatResultFuture;
import org.opendaylight.controller.md.sal.binding.api.ReadOnlyTransaction;
import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
.build();
try {
deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusPath, gatheringStatus);
- } catch (final Exception e) {
- LOG.warn("Can't write to transaction: {}", e);
+ } catch (final TransactionChainClosedException e) {
+ LOG.warn("Can't write to transaction, transaction chain probably closed.");
+ LOG.trace("Write to transaction exception: ", e);
}
deviceContext.submitTransaction();
.build();
try {
deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, statusEndPath, gatheringStatus);
- } catch (Exception e) {
- LOG.warn("Can't write to transaction: {}", e);
+ } catch (TransactionChainClosedException e) {
+ LOG.warn("Can't write to transaction, transaction chain probably closed.");
+ LOG.trace("Write to transaction exception: ", e);
}
deviceContext.submitTransaction();
* @param deviceContext
* @param switchFeaturesMandatory
* @param convertorExecutor
- * @return future - recommended to have blocking call for this future
*/
- public static ListenableFuture<Void> initializeNodeInformation(final DeviceContext deviceContext, final boolean switchFeaturesMandatory, final ConvertorExecutor convertorExecutor) {
+ public static void initializeNodeInformation(final DeviceContext deviceContext, final boolean switchFeaturesMandatory, final ConvertorExecutor convertorExecutor) throws ExecutionException, InterruptedException {
Preconditions.checkArgument(deviceContext != null);
final DeviceState deviceState = Preconditions.checkNotNull(deviceContext.getDeviceState());
final DeviceInfo deviceInfo = deviceContext.getDeviceInfo();
final Capabilities capabilities = connectionContext.getFeatures().getCapabilities();
LOG.debug("Setting capabilities for device {}", deviceInfo.getNodeId());
DeviceStateUtil.setDeviceStateBasedOnV13Capabilities(deviceState, capabilities);
- deviceFeaturesFuture = createDeviceFeaturesForOF13(deviceContext, switchFeaturesMandatory, convertorExecutor);
+ createDeviceFeaturesForOF13(deviceContext, switchFeaturesMandatory, convertorExecutor).get();
} else {
- deviceFeaturesFuture = Futures.immediateFailedFuture(new ConnectionException("Unsupported version "
- + version));
+ throw new ExecutionException(new ConnectionException("Unsupported version " + version));
}
- Futures.addCallback(deviceFeaturesFuture, new FutureCallback<List<RpcResult<List<MultipartReply>>>>() {
- @Override
- public void onSuccess(final List<RpcResult<List<MultipartReply>>> result) {
- LOG.debug("All init data for node {} is in submitted.", deviceInfo.getNodeId());
- returnFuture.set(null);
- }
-
- @Override
- public void onFailure(final Throwable t) {
- // FIXME : remove session
- LOG.trace("Device capabilities gathering future failed.");
- LOG.trace("more info in exploration failure..", t);
- LOG.debug("All init data for node {} was not submited correctly - connection has to go down.", deviceInfo.getNodeId());
- returnFuture.setException(t);
- }
- });
- return returnFuture;
}
private static void addNodeToOperDS(final DeviceContext deviceContext, final SettableFuture<Void> future) {
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext;
import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
when(deviceContext.shuttingDownDataStoreTransactions()).thenReturn(Futures.immediateCheckedFuture(null));
when(deviceContext.getPrimaryConnectionContext()).thenReturn(connectionContext);
when(deviceContext.getDeviceState()).thenReturn(deviceState);
+ when(deviceContext.getState()).thenReturn(OFPContext.CONTEXT_STATE.WORKING);
final ConcurrentHashMap<DeviceInfo, DeviceContext> deviceContexts = getContextsCollection(deviceManager);
deviceContexts.put(deviceInfo, deviceContext);
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.junit.Assert;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.flow.and.statistics.map.list.FlowAndStatisticsMapList;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowCookie;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
public void setUp() throws Exception {
nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(new NodeId(NODE_ID)));
when(dataBroker.newReadOnlyTransaction()).thenReturn(readOnlyTransaction);
- when(readOnlyTransaction.read(any(), any())).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, nodeInstanceIdentifier);
final FlowAndStatisticsMapList flowStats = TestFlowHelper.createFlowAndStatisticsMapListBuilder(1).build();
key = FlowRegistryKeyFactory.create(flowStats);
public void testFill() throws Exception {
final InstanceIdentifier<FlowCapableNode> path = nodeInstanceIdentifier.augmentation(FlowCapableNode.class);
+ final Flow flow = new FlowBuilder()
+ .setTableId((short)1)
+ .setPriority(10)
+ .setCookie(new FlowCookie(BigInteger.TEN))
+ .setId(new FlowId("HELLO"))
+ .build();
+
+ final Table table = new TableBuilder()
+ .setFlow(Collections.singletonList(flow))
+ .build();
+
+ final FlowCapableNode flowCapableNode = new FlowCapableNodeBuilder()
+ .setTable(Collections.singletonList(table))
+ .build();
+
+ when(readOnlyTransaction.read(any(), any())).thenReturn(Futures.immediateCheckedFuture(Optional.of(flowCapableNode)));
+
deviceFlowRegistry.fill().get();
+ verify(dataBroker, times(2)).newReadOnlyTransaction();
+ verify(readOnlyTransaction).read(LogicalDatastoreType.CONFIGURATION, path);
+ verify(readOnlyTransaction).read(LogicalDatastoreType.OPERATIONAL, path);
+
+ final Map<FlowRegistryKey, FlowDescriptor> allFlowDescriptors = deviceFlowRegistry.getAllFlowDescriptors();
+ final FlowRegistryKey key = FlowRegistryKeyFactory.create(flow);
+
+ assertTrue(allFlowDescriptors.containsKey(key));
+
+ deviceFlowRegistry.markToBeremoved(key);
+ deviceFlowRegistry.removeMarked();
+ }
+
+ @Test
+ public void testFailedFill() throws Exception {
+ final InstanceIdentifier<FlowCapableNode> path = nodeInstanceIdentifier.augmentation(FlowCapableNode.class);
+
+ final Table table = new TableBuilder()
+ .setFlow(null)
+ .build();
+ final FlowCapableNode flowCapableNode = new FlowCapableNodeBuilder()
+ .setTable(Collections.singletonList(table))
+ .build();
+
+ when(readOnlyTransaction.read(any(), any())).thenReturn(Futures.immediateCheckedFuture(Optional.of(flowCapableNode)));
+
+ deviceFlowRegistry.fill().get();
verify(dataBroker, times(2)).newReadOnlyTransaction();
verify(readOnlyTransaction).read(LogicalDatastoreType.CONFIGURATION, path);
verify(readOnlyTransaction).read(LogicalDatastoreType.OPERATIONAL, path);
+
+ final Map<FlowRegistryKey, FlowDescriptor> allFlowDescriptors = deviceFlowRegistry.getAllFlowDescriptors();
+
+ Assert.assertEquals(1, allFlowDescriptors.size());
}
@Test
import org.mockito.runners.MockitoJUnitRunner;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleService;
import org.opendaylight.openflowplugin.api.openflow.role.RoleContext;
import org.opendaylight.openflowplugin.api.openflow.role.RoleManager;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
private DeviceInfo deviceInfo;
@Mock
private RoleManager roleManager;
+ @Mock
+ private LifecycleService lifecycleService;
private final NodeId nodeId = NodeId.getDefaultInstance("openflow:1");
private RoleContext roleContext;
@Before
public void setup() throws CandidateAlreadyRegisteredException {
- roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager);
+ roleContext = new RoleContextImpl(deviceInfo, hashedWheelTimer, roleManager, lifecycleService);
Mockito.when(deviceInfo.getNodeId()).thenReturn(nodeId);
}