/**
* message from MD-SAL to switch - sent to OFJava but failed with exception
*/
- TO_SWITCH_SUBMIT_ERROR
- }
+ TO_SWITCH_SUBMIT_ERROR,
+ /**
+ * message from MD-SAL to switch - asked for XID reservation in queue, but rejected
+ */
+ RESERVATION_REJECTED,
+ /**
+ * message from switch to MD-SAL - notification service rejected notfication
+ */
+ NOTIFICATION_REJECTED
+
+ }
/**
* @param message from switch or to switch - depends on statGroup
private final MessageIntelligenceAgency messageIntelligenceAgency;
private final long barrierNanos = 500000000L;
- private final int maxQueueDepth = 1024;
+ private final int maxQueueDepth = 25600;
public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
@Nonnull final MessageIntelligenceAgency messageIntelligenceAgency) {
final RequestContext<T> requestContext = requestContextStack.createRequestContext();
final SettableFuture<RpcResult<T>> result = requestContextStack.storeOrFail(requestContext);
if (result.isDone()) {
- messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_DISREGARDED);
LOG.trace("Request context refused.");
return result;
}
- final Long reservedXid = deviceContext.getReservedXid();
+ Long reservedXid = deviceContext.getReservedXid();
if (null == reservedXid) {
- RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
- return result;
+ //retry
+ reservedXid = deviceContext.getReservedXid();
+ if (null == reservedXid) {
+ RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+ deviceContext.getMessageSpy().spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.RESERVATION_REJECTED);
+ return result;
+ }
}
final Xid xid = new Xid(reservedXid);
requestContext.setXid(xid);
package org.opendaylight.openflowplugin.impl.services;
import com.google.common.base.Function;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
-import java.math.BigInteger;
+import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.PacketOutConvertor;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.ConnectionCookie;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput;
import org.opendaylight.yangtools.yang.common.RpcResult;
final PacketOutInput message = PacketOutConvertor.toPacketOutInput(input, getVersion(), xid.getValue(),
getDatapathId());
- BigInteger connectionID = getPrimaryConnection();
- final ConnectionCookie connectionCookie = input.getConnectionCookie();
- if (connectionCookie != null && connectionCookie.getValue() != null) {
- connectionID = BigInteger.valueOf(connectionCookie.getValue());
- }
+ final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
- ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(provideConnectionAdapter(connectionID).packetOut(message));
- return Futures.transform(rpcResultListenableFuture, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+ final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+
+ outboundQueue.commitEntry(xid.getValue(), message, new FutureCallback<OfHeader>() {
@Override
- public ListenableFuture<RpcResult<Void>> apply(RpcResult<Void> rpcResult) throws Exception {
- if (! rpcResult.isSuccessful()) {
- return Futures.immediateFuture(rpcResult);
- } else {
- return Futures.immediateCancelledFuture();
+ public void onSuccess(final OfHeader ofHeader) {
+ if (ofHeader instanceof RpcResult) {
+ RpcResult rpcResult = (RpcResult) ofHeader;
+ if (!rpcResult.isSuccessful()) {
+ settableFuture.set(rpcResult);
+ } else {
+ settableFuture.cancel(true);
+ }
}
}
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ settableFuture.cancel(true);
+ }
});
+ return settableFuture;
}
});
.storeOrFail(requestContext);
if (!sendEchoOutput.isDone()) {
final DeviceContext deviceContext = getDeviceContext();
- final Long reserverXid = deviceContext.getReservedXid();
- if (null == reserverXid){
- RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
- return sendEchoOutput;
+ Long reserverXid = deviceContext.getReservedXid();
+ if (null == reserverXid) {
+ if (null == reserverXid) {
+ reserverXid = deviceContext.getReservedXid();
+ RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+ return sendEchoOutput;
+ }
}
final Xid xid = new Xid(reserverXid);
requestContext.setXid(xid);
FlowId flowId = null;
@Override
public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
- if (null != input.getFlowRef()) {
- flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
- final FlowDescriptor flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
- deviceContext.getDeviceFlowRegistry().store(flowHash, flowDescriptor);
- } else {
- flowId = getDeviceContext().getDeviceFlowRegistry().storeIfNecessary(flowHash, input.getTableId());
- }
if (rpcResult.isSuccessful()) {
+ getMessageSpy().spyMessage(FlowModInput.class, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
LOG.debug("flow add finished without error, id={}", flowId.getValue());
} else {
LOG.debug("flow add failed with error, id={}", flowId.getValue());
@Override
public void onFailure(final Throwable throwable) {
deviceContext.getDeviceFlowRegistry().markToBeremoved(flowHash);
- LOG.trace("Service call for adding flows failed, hash id={}.", flowHash, throwable);
+ getMessageSpy().spyMessage(FlowModInput.class, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
+ LOG.trace("Service call for adding flows failed, id={}.", flowId.getValue(), throwable);
}
});
@Override
public void onSuccess(final OfHeader ofHeader) {
settableFuture.set(RpcResultBuilder.<Void>success().build());
- getMessageSpy().spyMessage(flowModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
}
@Override
public void onFailure(final Throwable throwable) {
RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, throwable.getMessage());
settableFuture.set(rpcResultBuilder.build());
- getMessageSpy().spyMessage(flowModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
}
});
return settableFuture;
type);
final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
- synchronized (outboundQueue) {
- outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
- @Override
- public void onSuccess(final OfHeader ofHeader) {
- if (ofHeader instanceof MultipartReply) {
- final MultipartReply multipartReply = (MultipartReply) ofHeader;
- settableFuture.set(RpcResultBuilder.<Void>success().build());
- multiMsgCollector.addMultipartMsg(multipartReply);
+ outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
+ @Override
+ public void onSuccess(final OfHeader ofHeader) {
+ if (ofHeader instanceof MultipartReply) {
+ final MultipartReply multipartReply = (MultipartReply) ofHeader;
+ settableFuture.set(RpcResultBuilder.<Void>success().build());
+ multiMsgCollector.addMultipartMsg(multipartReply);
+ } else {
+ if (null != ofHeader) {
+ LOG.info("Unexpected response type received {}.", ofHeader.getClass());
} else {
- if (null != ofHeader) {
- LOG.info("Unexpected response type received {}.", ofHeader.getClass());
- } else {
- LOG.info("Response received is null.");
- }
+ LOG.info("Unexpected response type received {}.", ofHeader.getClass());
}
-
}
- @Override
- public void onFailure(final Throwable throwable) {
- RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage());
- settableFuture.set(rpcResultBuilder.build());
- }
- });
- }
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage());
+ settableFuture.set(rpcResultBuilder.build());
+ }
+ });
return settableFuture;
}
}