added new statistics groups for measuring refused notifications and XID requests 98/20398/5
authorMartin Bobak <mbobak@cisco.com>
Thu, 14 May 2015 14:15:06 +0000 (16:15 +0200)
committerMartin Bobak <mbobak@cisco.com>
Fri, 15 May 2015 15:54:25 +0000 (17:54 +0200)
- spying for new statistics
- initial OFJ queue length set to 25600
- when first XID reservation on outbound queue fails, we attempt for second one
- PacketProcessingService uses outbound queue for sending requests to OFJ
- StatisticsGatheringService doesn't use uoutbound queue in synchronized block

Change-Id: I9f40be8c65c2ff4d75d81b9b814fa28cc1cae8f2
Signed-off-by: Martin Bobak <mbobak@cisco.com>
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/statistics/ofpspecific/MessageSpy.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/PacketProcessingServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalEchoServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/services/dedicated/StatisticsGatheringService.java

index 60fa016311cc7a240a64709ad1aff9758509d969..2aa79abaffc721caccc61c641e33f8664cfd0a5d 100644 (file)
@@ -69,8 +69,17 @@ public interface MessageSpy<M> extends Runnable {
         /**
          * message from MD-SAL to switch - sent to OFJava but failed with exception
          */
-        TO_SWITCH_SUBMIT_ERROR
-    }
+        TO_SWITCH_SUBMIT_ERROR,
+        /**
+         * message from MD-SAL to switch - asked for XID reservation in queue, but rejected
+         */
+        RESERVATION_REJECTED,
+        /**
+         * message from switch to MD-SAL  - notification service rejected notfication
+         */
+        NOTIFICATION_REJECTED
+
+        }
 
     /**
      * @param message   from switch or to switch - depends on statGroup
index 060dc8e26e88c1524e582f36eba064d41ce3269a..e342a1f108fa2e5168174fa7667276516be8d9d1 100644 (file)
@@ -122,7 +122,7 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
     private final MessageIntelligenceAgency messageIntelligenceAgency;
 
     private final long barrierNanos = 500000000L;
-    private final int maxQueueDepth = 1024;
+    private final int maxQueueDepth = 25600;
 
     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
                              @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency) {
index bf17fbac7377fb2f2622c330890043c44fe9e1ae..9a84294e047390e346d980315e8dcb45dffd508f 100644 (file)
@@ -135,15 +135,19 @@ public abstract class CommonService {
         final RequestContext<T> requestContext = requestContextStack.createRequestContext();
         final SettableFuture<RpcResult<T>> result = requestContextStack.storeOrFail(requestContext);
         if (result.isDone()) {
-            messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_DISREGARDED);
             LOG.trace("Request context refused.");
             return result;
         }
 
-        final Long reservedXid = deviceContext.getReservedXid();
+        Long reservedXid = deviceContext.getReservedXid();
         if (null == reservedXid) {
-            RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
-            return result;
+            //retry
+            reservedXid = deviceContext.getReservedXid();
+            if (null == reservedXid) {
+                RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+                deviceContext.getMessageSpy().spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.RESERVATION_REJECTED);
+                return result;
+            }
         }
         final Xid xid = new Xid(reservedXid);
         requestContext.setXid(xid);
index 7d5538a6c6a5b53c2f4ea80a928a99e7ff063b81..efabd5bf1f083c76af8567978a3ca470e45c1d37 100644 (file)
@@ -8,19 +8,18 @@
 package org.opendaylight.openflowplugin.impl.services;
 
 import com.google.common.base.Function;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.ListenableFuture;
-import java.math.BigInteger;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.PacketOutConvertor;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.ConnectionCookie;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.PacketProcessingService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.TransmitPacketInput;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -42,23 +41,29 @@ public class PacketProcessingServiceImpl extends CommonService implements Packet
                 final PacketOutInput message = PacketOutConvertor.toPacketOutInput(input, getVersion(), xid.getValue(),
                         getDatapathId());
 
-                BigInteger connectionID = getPrimaryConnection();
-                final ConnectionCookie connectionCookie = input.getConnectionCookie();
-                if (connectionCookie != null && connectionCookie.getValue() != null) {
-                    connectionID = BigInteger.valueOf(connectionCookie.getValue());
-                }
+                final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
 
-                ListenableFuture<RpcResult<Void>> rpcResultListenableFuture = JdkFutureAdapters.listenInPoolThread(provideConnectionAdapter(connectionID).packetOut(message));
-                return Futures.transform(rpcResultListenableFuture, new AsyncFunction<RpcResult<Void>, RpcResult<Void>>() {
+                final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+
+                outboundQueue.commitEntry(xid.getValue(), message, new FutureCallback<OfHeader>() {
                     @Override
-                    public ListenableFuture<RpcResult<Void>> apply(RpcResult<Void> rpcResult) throws Exception {
-                        if (! rpcResult.isSuccessful()) {
-                            return Futures.immediateFuture(rpcResult);
-                        } else {
-                            return Futures.immediateCancelledFuture();
+                    public void onSuccess(final OfHeader ofHeader) {
+                        if (ofHeader instanceof RpcResult) {
+                            RpcResult rpcResult = (RpcResult) ofHeader;
+                            if (!rpcResult.isSuccessful()) {
+                                settableFuture.set(rpcResult);
+                            } else {
+                                settableFuture.cancel(true);
+                            }
                         }
                     }
+
+                    @Override
+                    public void onFailure(final Throwable throwable) {
+                        settableFuture.cancel(true);
+                    }
                 });
+                return settableFuture;
             }
         });
 
index ac7fd30f10156835781247a4137fcfbeb0fa8d2c..616c40550ad0d4f407be6f01853683482b457446 100644 (file)
@@ -47,10 +47,13 @@ public class SalEchoServiceImpl extends CommonService implements SalEchoService
                 .storeOrFail(requestContext);
         if (!sendEchoOutput.isDone()) {
             final DeviceContext deviceContext = getDeviceContext();
-            final Long reserverXid = deviceContext.getReservedXid();
-            if (null == reserverXid){
-                RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
-                return sendEchoOutput;
+            Long reserverXid = deviceContext.getReservedXid();
+            if (null == reserverXid) {
+                if (null == reserverXid) {
+                    reserverXid = deviceContext.getReservedXid();
+                    RequestContextUtil.closeRequestContextWithRpcError(requestContext, "Outbound queue wasn't able to reserve XID.");
+                    return sendEchoOutput;
+                }
             }
             final Xid xid = new Xid(reserverXid);
             requestContext.setXid(xid);
index 5d12bc2fdfa4c1f6c6f334725603d7c8758c9b10..32170146770ef19a64682426c8c23e03c185d6a6 100644 (file)
@@ -76,14 +76,8 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService
             FlowId flowId = null;
             @Override
             public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
-                if (null != input.getFlowRef()) {
-                    flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
-                    final FlowDescriptor flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
-                    deviceContext.getDeviceFlowRegistry().store(flowHash, flowDescriptor);
-                } else {
-                    flowId = getDeviceContext().getDeviceFlowRegistry().storeIfNecessary(flowHash, input.getTableId());
-                }
                 if (rpcResult.isSuccessful()) {
+                    getMessageSpy().spyMessage(FlowModInput.class, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
                     LOG.debug("flow add finished without error, id={}", flowId.getValue());
                 } else {
                     LOG.debug("flow add failed with error, id={}", flowId.getValue());
@@ -93,7 +87,8 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService
             @Override
             public void onFailure(final Throwable throwable) {
                 deviceContext.getDeviceFlowRegistry().markToBeremoved(flowHash);
-                LOG.trace("Service call for adding flows failed, hash id={}.", flowHash, throwable);
+                getMessageSpy().spyMessage(FlowModInput.class, MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
+                LOG.trace("Service call for adding flows failed, id={}.", flowId.getValue(), throwable);
             }
         });
 
@@ -305,14 +300,12 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService
             @Override
             public void onSuccess(final OfHeader ofHeader) {
                 settableFuture.set(RpcResultBuilder.<Void>success().build());
-                getMessageSpy().spyMessage(flowModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
             }
 
             @Override
             public void onFailure(final Throwable throwable) {
                 RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, throwable.getMessage());
                 settableFuture.set(rpcResultBuilder.build());
-                getMessageSpy().spyMessage(flowModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
             }
         });
         return settableFuture;
index 5689c942042b7f686a0f6d62c6c8553dc51abf61..0c4a4b8885c7f8ab8a1e4541413e0f29589e6f48 100644 (file)
@@ -60,31 +60,29 @@ public class StatisticsGatheringService extends CommonService {
                                                          type);
                                          final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
                                          final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
-                                         synchronized (outboundQueue) {
-                                             outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
-                                                 @Override
-                                                 public void onSuccess(final OfHeader ofHeader) {
-                                                     if (ofHeader instanceof MultipartReply) {
-                                                         final MultipartReply multipartReply = (MultipartReply) ofHeader;
-                                                         settableFuture.set(RpcResultBuilder.<Void>success().build());
-                                                         multiMsgCollector.addMultipartMsg(multipartReply);
+                                         outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
+                                             @Override
+                                             public void onSuccess(final OfHeader ofHeader) {
+                                                 if (ofHeader instanceof MultipartReply) {
+                                                     final MultipartReply multipartReply = (MultipartReply) ofHeader;
+                                                     settableFuture.set(RpcResultBuilder.<Void>success().build());
+                                                     multiMsgCollector.addMultipartMsg(multipartReply);
+                                                 } else {
+                                                     if (null != ofHeader) {
+                                                         LOG.info("Unexpected response type received {}.", ofHeader.getClass());
                                                      } else {
-                                                         if (null != ofHeader) {
-                                                             LOG.info("Unexpected response type received {}.", ofHeader.getClass());
-                                                         } else {
-                                                             LOG.info("Response received is null.");
-                                                         }
+                                                         LOG.info("Unexpected response type received {}.", ofHeader.getClass());
                                                      }
-
                                                  }
 
-                                                 @Override
-                                                 public void onFailure(final Throwable throwable) {
-                                                     RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage());
-                                                     settableFuture.set(rpcResultBuilder.build());
-                                                 }
-                                             });
-                                         }
+                                             }
+
+                                             @Override
+                                             public void onFailure(final Throwable throwable) {
+                                                 RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage());
+                                                 settableFuture.set(rpcResultBuilder.build());
+                                             }
+                                         });
                                          return settableFuture;
                                      }
                                  }