Introduced OutboundQueue from OFJ for outbound messages processing 73/20173/8
authorMartin Bobak <mbobak@cisco.com>
Tue, 12 May 2015 18:17:37 +0000 (20:17 +0200)
committerMartin Bobak <mbobak@cisco.com>
Fri, 15 May 2015 15:52:47 +0000 (17:52 +0200)
BarrierProcessor and related classes are removed
Device static info gathered trough OutboundQueue
Outbound queue used in CommonService and SalFlowService

Change-Id: Ieeff15184aac44cdfd0a902b2f3ae89bfa149461
Signed-off-by: Martin Bobak <mbobak@cisco.com>
14 files changed:
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/ConnectionContext.java
openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/OutboundQueueProvider.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/ConnectionManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/OutboundQueueProviderImpl.java [new file with mode: 0644]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessor.java [deleted file]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierTaskBuilder.java [deleted file]
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceContextImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/DeviceManagerImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/CommonService.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/SalFlowServiceImpl.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/statistics/services/dedicated/StatisticsGatheringService.java
openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/util/MdSalRegistratorUtils.java
openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessorTest.java [deleted file]

index 025fc8bebc592e9879140671a59012b717496334..18372d0dff3a0320bc3981e55b148fb6a3f41726 100644 (file)
@@ -63,6 +63,17 @@ public interface ConnectionContext {
      */
     ConnectionAdapter getConnectionAdapter();
 
+    /**
+     * Returns reference to OFJava outbound queue provider. Outbound queue is used for outbound messages processing.
+     *
+     * @return
+     */
+    OutboundQueueProvider getOutboundQueueProvider();
+    /**
+     * Method sets reference to OFJava outbound queue provider.
+     *
+     */
+    void setOutboundQueueProvider(OutboundQueueProvider  outboundQueueProvider);
 
     /**
      * Method returns current connection state.
diff --git a/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/OutboundQueueProvider.java b/openflowplugin-api/src/main/java/org/opendaylight/openflowplugin/api/openflow/connection/OutboundQueueProvider.java
new file mode 100644 (file)
index 0000000..6492f09
--- /dev/null
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.connection;
+
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
+
+/**
+ * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 12.5.2015.
+ */
+public interface OutboundQueueProvider extends OutboundQueueHandler {
+
+    OutboundQueue getOutboundQueue();
+}
index 14b787960bb4604c1d465840378487d2eef3f2b6..6fc6f1040bd17d935cfa9f19f64ffdf5527c170d 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.openflowplugin.impl.connection;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.handlers.DeviceDisconnectedHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
@@ -26,6 +27,7 @@ public class ConnectionContextImpl implements ConnectionContext {
     private NodeId nodeId;
     private DeviceDisconnectedHandler deviceDisconnectedHandler;
     private static final Logger LOG = LoggerFactory.getLogger(ConnectionContextImpl.class);
+    private OutboundQueueProvider outboundQueueProvider;
 
     /**
      * @param connectionAdapter
@@ -39,6 +41,16 @@ public class ConnectionContextImpl implements ConnectionContext {
         return connectionAdapter;
     }
 
+    @Override
+    public OutboundQueueProvider getOutboundQueueProvider() {
+        return this.outboundQueueProvider;
+    }
+
+    @Override
+    public void setOutboundQueueProvider(final OutboundQueueProvider outboundQueueProvider) {
+        this.outboundQueueProvider = outboundQueueProvider;
+    }
+
     @Override
     public CONNECTION_STATE getConnectionState() {
         return connectionState;
index 4f8de11aa4f952223345faab53ca790c9d145b0d..ea2e556bc0bb60fe60fb199b28d6912d4599b94d 100644 (file)
@@ -12,6 +12,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionManager;
 import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
@@ -44,6 +45,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
 
     @Override
     public void onSwitchConnected(final ConnectionAdapter connectionAdapter) {
+
         LOG.trace("preparing handshake: {}", connectionAdapter.getRemoteAddress());
 
         final int handshakeThreadLimit = 1; //TODO: move to constants/parametrize
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/OutboundQueueProviderImpl.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/connection/OutboundQueueProviderImpl.java
new file mode 100644 (file)
index 0000000..50560a4
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.connection;
+
+import javax.annotation.Nonnull;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
+
+/**
+ * Created by Martin Bobak &lt;mbobak@cisco.com&gt; on 12.5.2015.
+ */
+public class OutboundQueueProviderImpl implements OutboundQueueProvider {
+
+    private OutboundQueue outboundQueue;
+    private final short ofVersion;
+
+    public OutboundQueueProviderImpl(final short ofVersion) {
+        this.ofVersion = ofVersion;
+    }
+
+    @Nonnull
+    @Override
+    public BarrierInput createBarrierRequest(@Nonnull final Long xid) {
+        final BarrierInputBuilder biBuilder = new BarrierInputBuilder();
+        biBuilder.setVersion(ofVersion);
+        biBuilder.setXid(xid);
+        return biBuilder.build();
+
+    }
+
+    @Override
+    public void onConnectionQueueChanged(final OutboundQueue outboundQueue) {
+        this.outboundQueue = outboundQueue;
+    }
+
+    @Override
+    public OutboundQueue getOutboundQueue() {
+        return this.outboundQueue;
+    }
+}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessor.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessor.java
deleted file mode 100644 (file)
index 3df81ee..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.impl.device;
-
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.OutstandingMessageExtractor;
-import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * openflowplugin-impl
- * org.opendaylight.openflowplugin.impl.device
- *
- *
- *
- * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *
- * Created: Apr 3, 2015
- */
-public class BarrierProcessor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BarrierProcessor.class);
-
-    /**
-     * for all requestContexts from deviceContext cache which are older than barrier (lower barrierXid value) we do: <br>
-     *     <ul>
-     *         <li>remove from cache</li>
-     *         <li>cancel inner future</li>
-     *     </ul>
-     *
-     * @param barrierXid
-     * @param messageExtractor
-     */
-    public static void processOutstandingRequests(final long barrierXid, final OutstandingMessageExtractor messageExtractor) {
-        LOG.trace("processing barrier response [{}]", barrierXid);
-        RequestContext nextRequestContext;
-        while ((nextRequestContext = messageExtractor.extractNextOutstandingMessage(barrierXid)) != null ) {
-            LOG.trace("flushing outstanding request [{}], closing", nextRequestContext.getXid().getValue());
-            nextRequestContext.getFuture().cancel(false);
-            RequestContextUtil.closeRequstContext(nextRequestContext);
-        }
-    }
-}
diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierTaskBuilder.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/device/BarrierTaskBuilder.java
deleted file mode 100644 (file)
index d2cba55..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.impl.device;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * openflowplugin-impl
- * org.opendaylight.openflowplugin.impl.device
- * Barrier message self restarting builder.
- *
- * @author <a href="mailto:vdemcak@cisco.com">Vaclav Demcak</a>
- *         Created: Apr 3, 2015
- */
-public class BarrierTaskBuilder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BarrierTaskBuilder.class);
-    public static final long DELAY = 500L;
-
-    private final DeviceContext deviceCtx;
-
-    public BarrierTaskBuilder(final DeviceContext deviceCtx) {
-        this.deviceCtx = Preconditions.checkNotNull(deviceCtx);
-        Preconditions.checkNotNull(deviceCtx.getTimer());
-    }
-
-    public void buildAndFireBarrierTask() {
-        Timeout timeout = deviceCtx.getTimer().newTimeout(new BarrierTask(deviceCtx), DELAY, TimeUnit.MILLISECONDS);
-        deviceCtx.setCurrentBarrierTimeout(timeout);
-    }
-
-
-    private final class BarrierTask implements TimerTask {
-
-        private final DeviceContext deviceCtx;
-
-        public BarrierTask(final DeviceContext deviceCtx) {
-            this.deviceCtx = deviceCtx;
-        }
-
-        /**
-         * @return OF-message, ready to send
-         */
-        private BarrierInput makeBarrier() {
-            final BarrierInputBuilder biBuilder = new BarrierInputBuilder();
-            biBuilder.setVersion(deviceCtx.getDeviceState().getVersion());
-            biBuilder.setXid(deviceCtx.getNextXid().getValue());
-            return biBuilder.build();
-        }
-
-        @Override
-        public void run(final Timeout timeout) throws Exception {
-            // check outstanding requests first
-            if (deviceCtx.getDeviceState().isValid()) {
-                if (deviceCtx.getNumberOfOutstandingRequests() > 0) {
-                    BarrierInput barrierInput = makeBarrier();
-                    LOG.trace("sending out barrier [{}]", barrierInput.getXid());
-                    final Future<RpcResult<BarrierOutput>> future = deviceCtx.getPrimaryConnectionContext()
-                            .getConnectionAdapter().barrier(barrierInput);
-                    final ListenableFuture<RpcResult<BarrierOutput>> lsFuture = JdkFutureAdapters.listenInPoolThread(future);
-                    Futures.addCallback(lsFuture, makeCallBack());
-                } else {
-                    // if no requests
-                    buildAndFireBarrierTask();
-                }
-            } else {
-                LOG.trace("DeviceContext is not valid, will not create next barrier task.");
-            }
-        }
-
-        private FutureCallback<RpcResult<BarrierOutput>> makeCallBack() {
-            return new FutureCallback<RpcResult<BarrierOutput>>() {
-                @Override
-                public void onSuccess(final RpcResult<BarrierOutput> result) {
-                    if (!result.isSuccessful()) {
-                        for (RpcError rpcError : result.getErrors()) {
-                            LOG.trace("Barrier response with error {}", rpcError, rpcError.getCause());
-                        }
-                    } else if (null != result.getResult().getXid()) {
-                        BarrierProcessor.processOutstandingRequests(result.getResult().getXid(), deviceCtx);
-                    }
-                    buildAndFireBarrierTask();
-                }
-
-                @Override
-                public void onFailure(final Throwable t) {
-                    LOG.info("Barrier has failed {} ", t.getMessage());
-                    LOG.trace("Barrier has failed", t);
-                }
-            };
-        }
-
-    }
-
-}
index 359e7493a5977dbacab805a49b2cfcb5772a6372..9c37bf14fc9e34045636ca432c2710888b818a9c 100644 (file)
@@ -117,7 +117,7 @@ public class DeviceContextImpl implements DeviceContext {
     private final Collection<DeviceContextClosedHandler> closeHandlers = new HashSet<>();
     private NotificationPublishService notificationPublishService;
     private final ThrottledNotificationsOfferer throttledConnectionsHolder;
-    private BlockingQueue<PacketInMessage> bumperQueue;
+    private BlockingQueue<PacketReceived> bumperQueue;
 
 
     @VisibleForTesting
@@ -139,7 +139,7 @@ public class DeviceContextImpl implements DeviceContext {
         deviceMeterRegistry = new DeviceMeterRegistryImpl();
         messageSpy = _messageSpy;
         this.throttledConnectionsHolder = throttledConnectionsHolder;
-        bumperQueue = new ArrayBlockingQueue<PacketInMessage>(5000);
+        bumperQueue = new ArrayBlockingQueue<>(5000);
     }
 
     /**
@@ -316,6 +316,7 @@ public class DeviceContextImpl implements DeviceContext {
                 messageSpy.spyMessage(multipartReply.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
             }
 
+            unhookRequestCtx(xid);
             try {
                 requestContext.close();
             } catch (final Exception e) {
@@ -403,7 +404,7 @@ public class DeviceContextImpl implements DeviceContext {
         }
 
         if (throttledConnectionsHolder.isThrottlingEffective(bumperQueue)) {
-            boolean caught = bumperQueue.offer(packetInMessage);
+            boolean caught = bumperQueue.offer(packetReceived);
             if (!caught) {
                 LOG.debug("ingress notification dropped - no place in bumper queue [{}]", connectionAdapter.getRemoteAddress());
             }
@@ -417,7 +418,7 @@ public class DeviceContextImpl implements DeviceContext {
                     LOG.trace("notification offer interrupted..", e);
                 } catch (ExecutionException e) {
                     if (e.getCause() instanceof NotificationRejectedException) {
-                        applyThrottling(packetInMessage, connectionAdapter);
+                        applyThrottling(packetReceived, connectionAdapter);
                     } else {
                         LOG.debug("notification offer failed: {}", e.getMessage());
                         LOG.trace("notification offer failed..", e);
@@ -429,17 +430,17 @@ public class DeviceContextImpl implements DeviceContext {
         }
     }
 
-    private void applyThrottling(PacketInMessage packetInMessage, final ConnectionAdapter connectionAdapter) {
+    private void applyThrottling(PacketReceived packetReceived, final ConnectionAdapter connectionAdapter) {
         final InetSocketAddress remoteAddress = connectionAdapter.getRemoteAddress();
         LOG.debug("Notification offer refused by notification service.");
-        messageSpy.spyMessage(packetInMessage.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
+        messageSpy.spyMessage(packetReceived.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
         connectionAdapter.setAutoRead(false);
 
         LOG.debug("Throttling ingress for {}", remoteAddress);
         final ListenableFuture<Void> queueDone;
 
         // adding first notification
-        bumperQueue.offer(packetInMessage);
+        bumperQueue.offer(packetReceived);
         synchronized (bumperQueue) {
             queueDone = throttledConnectionsHolder.applyThrottlingOnConnection(bumperQueue);
         }
index 37842db8bad7977065d1e45400c6d57251eecfa7..3991cdcb0e4b09d76dedc9f70204b1dce2e904c4 100644 (file)
@@ -30,9 +30,12 @@ import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService
 import org.opendaylight.controller.md.sal.binding.api.NotificationService;
 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandlerRegistration;
 import org.opendaylight.openflowplugin.api.ConnectionException;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.connection.ThrottledNotificationsOfferer;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceManager;
@@ -48,6 +51,7 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageIntelligenceAgency;
 import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
 import org.opendaylight.openflowplugin.impl.common.NodeStaticReplyTranslatorUtil;
+import org.opendaylight.openflowplugin.impl.connection.OutboundQueueProviderImpl;
 import org.opendaylight.openflowplugin.impl.connection.ThrottledNotificationsOffererImpl;
 import org.opendaylight.openflowplugin.impl.device.listener.OpenflowProtocolListenerFullImpl;
 import org.opendaylight.openflowplugin.impl.rpc.RequestContextImpl;
@@ -75,6 +79,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev13
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortGrouping;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.MultipartReplyBody;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.multipart.reply.multipart.reply.body.MultipartReplyDescCase;
@@ -119,6 +124,9 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
     private final List<DeviceContext> deviceContexts = new ArrayList<DeviceContext>();
     private final MessageIntelligenceAgency messageIntelligenceAgency;
 
+    private final long barrierNanos = 500000000L;
+    private final int maxQueueDepth = 1024;
+
     public DeviceManagerImpl(@Nonnull final DataBroker dataBroker,
                              @Nonnull final MessageIntelligenceAgency messageIntelligenceAgency) {
         this.dataBroker = Preconditions.checkNotNull(dataBroker);
@@ -158,7 +166,6 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         // final phase - we have to add new Device to MD-SAL DataStore
         Preconditions.checkNotNull(deviceContext);
         ((DeviceContextImpl) deviceContext).submitTransaction();
-        new BarrierTaskBuilder(deviceContext).buildAndFireBarrierTask();
     }
 
     @Override
@@ -185,6 +192,11 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
         final ListenableFuture<List<RpcResult<List<MultipartReply>>>> deviceFeaturesFuture;
 
         final Short version = connectionContext.getFeatures().getVersion();
+
+        OutboundQueueProvider outboundQueueProvider = new OutboundQueueProviderImpl(version);
+        connectionContext.getConnectionAdapter().registerOutboundQueueHandler(outboundQueueProvider, maxQueueDepth, barrierNanos);
+        connectionContext.setOutboundQueueProvider(outboundQueueProvider);
+
         if (OFConstants.OFP_VERSION_1_0 == version) {
             final CapabilitiesV10 capabilitiesV10 = connectionContext.getFeatures().getCapabilitiesV10();
 
@@ -309,16 +321,37 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
     private ListenableFuture<RpcResult<List<MultipartReply>>> getNodeStaticInfo(final MultiMsgCollector multiMsgCollector, final MultipartType type, final DeviceContext deviceContext,
                                                                                 final InstanceIdentifier<Node> nodeII, final short version) {
 
-        final Xid xid = deviceContext.getNextXid();
+        final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
+
+        long reservedXid;
+        synchronized (outboundQueue) {
+            reservedXid = outboundQueue.reserveEntry();
+        }
+        final Xid xid = new Xid(reservedXid);
+
         final RequestContext<List<MultipartReply>> requestContext = emptyRequestContextStack.createRequestContext();
         requestContext.setXid(xid);
 
         LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
         deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
 
+        final ListenableFuture<RpcResult<List<MultipartReply>>> requestContextFuture = requestContext.getFuture();
 
         multiMsgCollector.registerMultipartXid(xid.getValue());
-        Futures.addCallback(requestContext.getFuture(), new FutureCallback<RpcResult<List<MultipartReply>>>() {
+        outboundQueue.commitEntry(reservedXid, MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type), new FutureCallback<OfHeader>() {
+            @Override
+            public void onSuccess(final OfHeader ofHeader) {
+                LOG.info("Static node {} info: {} collected", type);
+            }
+
+
+            @Override
+            public void onFailure(final Throwable t) {
+                LOG.info("Failed to retrieve static node {} info: {}", type, t.getMessage());
+            }
+        });
+
+        Futures.addCallback(requestContextFuture, new FutureCallback<RpcResult<List<MultipartReply>>>() {
             @Override
             public void onSuccess(final RpcResult<List<MultipartReply>> rpcResult) {
                 final List<MultipartReply> result = rpcResult.getResult();
@@ -328,7 +361,7 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
                     final Iterator<RpcError> rpcErrorIterator = rpcResult.getErrors().iterator();
                     while (rpcErrorIterator.hasNext()) {
                         final RpcError rpcError = rpcErrorIterator.next();
-                        LOG.info("Failed to retrieve static node {} info: {}", type, rpcError.getMessage());
+                            LOG.info("Failed to retrieve static node {} info: {}", type, rpcError.getMessage());
                         if (null != rpcError.getCause()) {
                             LOG.trace("Detailed error:", rpcError.getCause());
                         }
@@ -340,15 +373,18 @@ public class DeviceManagerImpl implements DeviceManager, AutoCloseable {
             }
 
             @Override
-            public void onFailure(final Throwable t) {
-                LOG.info("Failed to retrieve static node {} info: {}", type, t.getMessage());
+            public void onFailure(final Throwable throwable) {
+
             }
         });
 
+
+/*
         final ListenableFuture<RpcResult<Void>> rpcFuture = JdkFutureAdapters.listenInPoolThread(deviceContext.getPrimaryConnectionContext().getConnectionAdapter()
                 .multipartRequest(MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), version, type)));
         final OFJResult2RequestCtxFuture OFJResult2RequestCtxFuture = new OFJResult2RequestCtxFuture(requestContext, deviceContext);
         OFJResult2RequestCtxFuture.processResultFromOfJava(rpcFuture);
+*/
 
         return requestContext.getFuture();
     }
index 2579b2f26bce5003b138356802b29e5a607cde59..b6e03c317727b0f2fa818c3575034371a04baa6b 100644 (file)
@@ -14,10 +14,12 @@ import com.google.common.util.concurrent.SettableFuture;
 import java.math.BigInteger;
 import java.util.concurrent.Future;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowplugin.api.openflow.connection.ConnectionContext;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
+import org.opendaylight.openflowplugin.api.openflow.device.Xid;
 import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FeaturesReply;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
@@ -51,11 +53,12 @@ public abstract class CommonService {
         this.primaryConnectionAdapter = deviceContext.getPrimaryConnectionContext().getConnectionAdapter();
         this.messageSpy = deviceContext.getMessageSpy();
     }
+
     public static BigInteger getPrimaryConnection() {
         return PRIMARY_CONNECTION;
     }
 
-    public short getVersion(){
+    public short getVersion() {
         return version;
     }
 
@@ -111,23 +114,22 @@ public abstract class CommonService {
     public <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final BigInteger connectionID,
                                                                    final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function) {
         DataCrateBuilder<T> dataCrateBuilder = DataCrateBuilder.<T>builder();
-        return handleServiceCall(connectionID, function, dataCrateBuilder);
+        return handleServiceCall(function, dataCrateBuilder);
     }
+
     public <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function) {
         DataCrateBuilder<T> dataCrateBuilder = DataCrateBuilder.<T>builder();
-        return handleServiceCall(PRIMARY_CONNECTION, function, dataCrateBuilder);
+        return handleServiceCall(function, dataCrateBuilder);
     }
 
     /**
      * @param <T>
      * @param <F>
-     * @param connectionID
      * @param function
      * @param dataCrateBuilder predefined data
      * @return
      */
-    public final <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final BigInteger connectionID,
-                                                                         final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function,
+    public final <T, F> ListenableFuture<RpcResult<T>> handleServiceCall(final Function<DataCrate<T>, ListenableFuture<RpcResult<F>>> function,
                                                                          final DataCrateBuilder<T> dataCrateBuilder) {
 
         LOG.trace("Handling general service call");
@@ -135,15 +137,23 @@ public abstract class CommonService {
         final SettableFuture<RpcResult<T>> result = requestContextStack.storeOrFail(requestContext);
         if (result.isDone()) {
             messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_DISREGARDED);
+            LOG.trace("Request context refused.");
             return result;
         }
-        DataCrate<T> dataCrate = dataCrateBuilder.setiDConnection(connectionID).setRequestContext(requestContext)
+
+        long reservedXid;
+        final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
+        synchronized (outboundQueue) {
+            reservedXid = outboundQueue.reserveEntry();
+        }
+        final Xid xid = new Xid(reservedXid);
+        requestContext.setXid(xid);
+        DataCrate<T> dataCrate = dataCrateBuilder.setRequestContext(requestContext)
                 .build();
         final ListenableFuture<RpcResult<F>> resultFromOFLib;
 
-        requestContext.setXid(deviceContext.getNextXid());
         LOG.trace("Hooking xid {} to device context - precaution.", requestContext.getXid().getValue());
-        deviceContext.hookRequestCtx(requestContext.getXid(), requestContext);
+        deviceContext.hookRequestCtx(xid, requestContext);
 
         messageSpy.spyMessage(requestContext.getClass(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_READY_FOR_SUBMIT);
         synchronized (deviceContext) {
index a579205e41c0bce0e7f166e73456df34714a628e..5d12bc2fdfa4c1f6c6f334725603d7c8758c9b10 100644 (file)
@@ -10,7 +10,6 @@ package org.opendaylight.openflowplugin.impl.services;
 import com.google.common.base.Function;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.ArrayList;
@@ -19,6 +18,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
@@ -47,6 +47,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.flow.update.UpdatedFlow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -192,14 +193,13 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService
         return future;
     }
 
-    private <T> ListenableFuture<RpcResult<T>> processFlowModInputBuilders(
-            final List<FlowModInputBuilder> ofFlowModInputs) {
+    private <T> ListenableFuture<RpcResult<T>> processFlowModInputBuilders(final List<FlowModInputBuilder> ofFlowModInputs) {
+
         final List<ListenableFuture<RpcResult<T>>> partialFutures = new ArrayList<>();
 
         for (FlowModInputBuilder flowModInputBuilder : ofFlowModInputs) {
             DataCrateBuilder<T> dataCrateBuilder = DataCrateBuilder.<T>builder().setFlowModInputBuilder(flowModInputBuilder);
             ListenableFuture<RpcResult<T>> partialFuture = handleServiceCall(
-                    getPrimaryConnection(),
                     new Function<DataCrate<T>, ListenableFuture<RpcResult<Void>>>() {
                         @Override
                         public ListenableFuture<RpcResult<Void>> apply(final DataCrate<T> data) {
@@ -295,14 +295,27 @@ public class SalFlowServiceImpl extends CommonService implements SalFlowService
     }
 
     protected <T> ListenableFuture<RpcResult<Void>> createResultForFlowMod(final DataCrate<T> data, final FlowModInputBuilder flowModInputBuilder) {
-        final Xid xid = data.getRequestContext().getXid();
-        flowModInputBuilder.setXid(xid.getValue());
+        final OutboundQueue outboundQueue = getDeviceContext().getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
+        long xid = data.getRequestContext().getXid().getValue();
+        flowModInputBuilder.setXid(xid);
         final FlowModInput flowModInput = flowModInputBuilder.build();
-        Future<RpcResult<Void>> flowModResult = provideConnectionAdapter(data.getiDConnection()).flowMod(
-                flowModInput);
 
-        final ListenableFuture<RpcResult<Void>> result = JdkFutureAdapters.listenInPoolThread(flowModResult);
-        return result;
+        final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+        outboundQueue.commitEntry(xid, flowModInput, new FutureCallback<OfHeader>() {
+            @Override
+            public void onSuccess(final OfHeader ofHeader) {
+                settableFuture.set(RpcResultBuilder.<Void>success().build());
+                getMessageSpy().spyMessage(flowModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_SUCCESS);
+            }
+
+            @Override
+            public void onFailure(final Throwable throwable) {
+                RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(ErrorType.APPLICATION, throwable.getMessage());
+                settableFuture.set(rpcResultBuilder.build());
+                getMessageSpy().spyMessage(flowModInput.getImplementedInterface(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMIT_FAILURE);
+            }
+        });
+        return settableFuture;
     }
 
 }
index 7f1c48d2a5e5ae6ca92a2311356ba3ae30a93c99..f2d0a738f60f2b55ad23f8e546084bf982efd2a9 100644 (file)
@@ -9,10 +9,14 @@
 package org.opendaylight.openflowplugin.impl.statistics.services.dedicated;
 
 import com.google.common.base.Function;
+import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.JdkFutureAdapters;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import java.util.List;
 import java.util.concurrent.Future;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
+import org.opendaylight.openflowplugin.api.openflow.connection.OutboundQueueProvider;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
 import org.opendaylight.openflowplugin.api.openflow.device.Xid;
@@ -22,7 +26,10 @@ import org.opendaylight.openflowplugin.impl.services.DataCrate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,9 +57,23 @@ public class StatisticsGatheringService extends CommonService {
                                 makeMultipartRequestInput(xid.getValue(),
                                         getVersion(),
                                         type);
-                        final Future<RpcResult<Void>> resultFromOFLib = deviceContext.getPrimaryConnectionContext()
-                                .getConnectionAdapter().multipartRequest(multipartRequestInput);
-                        return JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
+                        final OutboundQueue outboundQueue = deviceContext.getPrimaryConnectionContext().getOutboundQueueProvider().getOutboundQueue();
+                        final SettableFuture<RpcResult<Void>> settableFuture = SettableFuture.create();
+                        synchronized (outboundQueue){
+                            outboundQueue.commitEntry(xid.getValue(), multipartRequestInput, new FutureCallback<OfHeader>() {
+                                @Override
+                                public void onSuccess(final OfHeader ofHeader) {
+                                    settableFuture.set(RpcResultBuilder.<Void>success().build());
+                                }
+
+                                @Override
+                                public void onFailure(final Throwable throwable) {
+                                    RpcResultBuilder rpcResultBuilder = RpcResultBuilder.<Void>failed().withError(RpcError.ErrorType.APPLICATION, throwable.getMessage());
+                                    settableFuture.set(rpcResultBuilder.build());
+                                }
+                            });
+                        }
+                        return settableFuture;
                     }
                 }
 
index e991e356863f4b54a0abf511dc7399ec114ae148..a437bd517d6e2e92896b3df470e1a2f6caf2f124 100644 (file)
@@ -7,22 +7,21 @@
  */
 package org.opendaylight.openflowplugin.impl.util;
 
-import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SalEchoService;
-import org.opendaylight.openflowplugin.impl.services.FlowCapableTransactionServiceImpl;
-
-import org.opendaylight.openflowplugin.impl.services.SalEchoServiceImpl;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
 import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
 import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
+import org.opendaylight.openflowplugin.impl.services.FlowCapableTransactionServiceImpl;
 import org.opendaylight.openflowplugin.impl.services.NodeConfigServiceImpl;
 import org.opendaylight.openflowplugin.impl.services.PacketProcessingServiceImpl;
+import org.opendaylight.openflowplugin.impl.services.SalEchoServiceImpl;
 import org.opendaylight.openflowplugin.impl.services.SalFlowServiceImpl;
 import org.opendaylight.openflowplugin.impl.services.SalGroupServiceImpl;
 import org.opendaylight.openflowplugin.impl.services.SalMeterServiceImpl;
 import org.opendaylight.openflowplugin.impl.services.SalTableServiceImpl;
 import org.opendaylight.openflowplugin.impl.statistics.services.OpendaylightFlowStatisticsServiceImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.echo.service.rev150305.SalEchoService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.OpendaylightFlowStatisticsService;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.transaction.rev150304.FlowCapableTransactionService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.module.config.rev141015.NodeConfigService;
diff --git a/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessorTest.java b/openflowplugin-impl/src/test/java/org/opendaylight/openflowplugin/impl/device/BarrierProcessorTest.java
deleted file mode 100644 (file)
index a8b4c2f..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.openflowplugin.impl.device;
-
-import com.google.common.util.concurrent.SettableFuture;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.device.Xid;
-import org.opendaylight.openflowplugin.api.openflow.device.handlers.OutstandingMessageExtractor;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-
-/**
- * Created by mirehak on 4/5/15.
- */
-@RunWith(MockitoJUnitRunner.class)
-public class BarrierProcessorTest {
-
-    private static final long XID = 42L;
-    @Mock
-    private OutstandingMessageExtractor messageExtractor;
-    @Mock
-    private RequestContext<String> extractedReqCtx;
-
-    private SettableFuture<RpcResult<String>> settableFuture;
-
-    @Before
-    public void setUp() throws Exception {
-        settableFuture = SettableFuture.create();
-        Mockito.when(messageExtractor.extractNextOutstandingMessage(Matchers.anyLong()))
-                .thenReturn(extractedReqCtx, extractedReqCtx, null);
-        Mockito.when(extractedReqCtx.getFuture()).thenReturn(settableFuture);
-        Mockito.when(extractedReqCtx.getXid()).thenReturn(new Xid(41L), new Xid(42L));
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        Mockito.verifyNoMoreInteractions(messageExtractor);
-    }
-
-    @Test
-    public void testProcessOutstandingRequests() throws Exception {
-        BarrierProcessor.processOutstandingRequests(XID, messageExtractor);
-
-        Mockito.verify(messageExtractor, Mockito.times(3)).extractNextOutstandingMessage(XID);
-        Assert.assertTrue(settableFuture.isCancelled());
-    }
-}
\ No newline at end of file