import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
-import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowHash;
partialFutures.add(partialFuture);
}
- // processing of final (optionally composite future)
final ListenableFuture<List<RpcResult<T>>> allFutures = Futures.successfulAsList(partialFutures);
final SettableFuture<RpcResult<T>> finalFuture = SettableFuture.create();
Futures.addCallback(allFutures, new FutureCallback<List<RpcResult<T>>>() {
@Override
public void onSuccess(List<RpcResult<T>> results) {
- List<RpcError> rpcErrorLot = new ArrayList<>();
- RpcResultBuilder<T> resultBuilder;
-
- Iterator<FlowModInputBuilder> flowModInputBldIterator = ofFlowModInputs.iterator();
- Iterator<RpcResult<T>> resultIterator = results.iterator();
-
- for (ListenableFuture<RpcResult<T>> partFutureFromRqCtx : partialFutures) {
- FlowModInputBuilder flowModInputBld = flowModInputBldIterator.next();
- RpcResult<T> result = resultIterator.next();
- Long xid = flowModInputBld.getXid();
-
-
- LOG.trace("flowMod future processing [{}], result={}", xid, result);
- if (partFutureFromRqCtx.isCancelled()) { // one and only positive case
- if (LOG.isTraceEnabled()) {
- LOG.trace("flow future result was cancelled [{}] = barrier passed it without error", xid);
- }
- } else { // all negative cases
- if (result == null) { // there is exception or null value set
- try {
- partFutureFromRqCtx.get();
- } catch (Exception e) {
- rpcErrorLot.add(RpcResultBuilder.newError(ErrorType.APPLICATION, "",
- "flow future result [" + xid + "] failed with exception",
- OFConstants.APPLICATION_TAG, e.getMessage(), e));
-
- // xid might be not available in case requestContext not even stored
- if (xid != null) {
- final DeviceContext deviceContext = getDeviceContext();
- deviceContext.unhookRequestCtx(new Xid(xid));
- }
- }
- } else {
- if (result.isSuccessful()) { // positive confirmation - never happens
- LOG.warn("Positive confirmation of flow push is not supported by OF-spec");
- LOG.warn("flow future result was successful [{}] = this should have never happen",
- xid);
- rpcErrorLot.add(RpcResultBuilder.newError(ErrorType.APPLICATION, "",
- "flow future result was successful [" + xid + "] = this should have never happen"));
- } else { // standard error occurred
- LOG.trace("passing original rpcErrors [{}]", xid);
- if (LOG.isTraceEnabled()) {
- for (RpcError rpcError : result.getErrors()) {
- LOG.trace("passed rpcError [{}]: {}", xid, rpcError);
- }
- }
- rpcErrorLot.addAll(result.getErrors());
- }
- }
- }
- }
-
- if (rpcErrorLot.isEmpty()) {
- resultBuilder = RpcResultBuilder.<T>success();
- } else {
- resultBuilder = RpcResultBuilder.<T>failed().withRpcErrors(rpcErrorLot);
- }
-
- finalFuture.set(resultBuilder.build());
+ RpcResultBuilder rpcResultBuilder = RpcResultBuilder.success();
+ finalFuture.set(rpcResultBuilder.build());
}
@Override
public void onFailure(Throwable t) {
- LOG.trace("Flow mods chained future failed.");
- RpcResultBuilder<T> resultBuilder = RpcResultBuilder.<T>failed()
- .withError(ErrorType.APPLICATION, "", t.getMessage());
- finalFuture.set(resultBuilder.build());
+ RpcResultBuilder rpcResultBuilder = RpcResultBuilder.failed();
+ finalFuture.set(rpcResultBuilder.build());
}
});
protected <T> ListenableFuture<RpcResult<Void>> createResultForFlowMod(final DataCrate<T> data, final FlowModInputBuilder flowModInputBuilder) {
final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
- long xid = data.getRequestContext().getXid().getValue();
+ final long xid = data.getRequestContext().getXid().getValue();
flowModInputBuilder.setXid(xid);
final FlowModInput flowModInput = flowModInputBuilder.build();
outboundQueue.commitEntry(xid, flowModInput, new FutureCallback<OfHeader>() {
@Override
public void onSuccess(final OfHeader ofHeader) {
+ RequestContextUtil.closeRequstContext(data.getRequestContext());
+ getDeviceContext().unhookRequestCtx(data.getRequestContext().getXid());
+ getMessageSpy().spyMessage(FlowModInput.class, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
+
settableFuture.set(RpcResultBuilder.<Void>success().build());
}
@Override
public void onFailure(final Throwable throwable) {
- RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, throwable.getMessage());
+ RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, throwable.getMessage(), throwable);
+ RequestContextUtil.closeRequstContext(data.getRequestContext());
+ getDeviceContext().unhookRequestCtx(data.getRequestContext().getXid());
settableFuture.set(rpcResultBuilder.build());
}
});