BUG-542 - adding overall statictics 24/7124/1
authorMichal Rehak <mirehak@cisco.com>
Fri, 16 May 2014 19:11:08 +0000 (21:11 +0200)
committerMichal Rehak <mirehak@cisco.com>
Fri, 16 May 2014 21:55:39 +0000 (23:55 +0200)
- temporary extended messageSpy code
- added logging to rpc threadPool - afterExecution
- cleaned imports
- deprecated hard coded configurations for OFJava SwitchConnectionProvider
- simplified signature of MessageSpy
- added messageSpy and rpcPool into SessionManager

Change-Id: If41e33c885dab0907fdf642ad1f483d93f74b19c
Signed-off-by: Michal Rehak <mirehak@cisco.com>
20 files changed:
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConfigurationFactory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationPopListener.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/RpcListeningExecutorService.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OpenflowPluginProvider.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageObservatory.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpy.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpyCounterImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java

index 12341d757332659ee99e7c61af6bae691462c965..4436bbcb01ab4f6eae86a97752d2681b9e04bc9f 100644 (file)
@@ -13,8 +13,9 @@ import java.net.InetAddress;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
 
 /**
- * @author mirehak
+ * @deprecated use configSubsystem
  */
+@Deprecated
 public abstract class ConnectionConfigurationFactory {
 
     /** OF default listening port */
index 147bb60ed6fa330edf43288abbe23811024ef1b7..0dc084e258fd047c3cb4c9b29a8bb249ef6e773c 100644 (file)
@@ -18,6 +18,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -81,6 +82,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.Tr
 import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.TableUpdated;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +90,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
 /**
@@ -101,7 +104,7 @@ public class MDController implements IMDController, AutoCloseable {
 
     private ConcurrentMap<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> messageTranslators;
     private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListeners;
-    private MessageSpy<OfHeader, DataObject> messageSpyCounter; 
+    private MessageSpy<DataContainer> messageSpyCounter; 
 
     final private int OF10 = OFConstants.OFP_VERSION_1_0;
     final private int OF13 = OFConstants.OFP_VERSION_1_3;
@@ -140,8 +143,12 @@ public class MDController implements IMDController, AutoCloseable {
         addMessageTranslator(MultipartReplyMessage.class,OF13,new MultipartReplyTableFeaturesToTableUpdatedTranslator());
         addMessageTranslator(GetFeaturesOutput.class,OF10, new FeaturesV10ToNodeConnectorUpdatedTranslator());
 
-        //TODO: move registration to factory
         NotificationPopListener<DataObject> notificationPopListener = new NotificationPopListener<DataObject>();
+        notificationPopListener.setNotificationProviderService(
+                OFSessionUtil.getSessionManager().getNotificationProviderService());
+        notificationPopListener.setMessageSpy(messageSpyCounter);
+        
+        //TODO: move registration to factory
         addMessagePopListener(NodeErrorNotification.class, notificationPopListener);
         addMessagePopListener(BadActionErrorNotification.class, notificationPopListener);
         addMessagePopListener(BadInstructionErrorNotification.class, notificationPopListener);
@@ -196,14 +203,30 @@ public class MDController implements IMDController, AutoCloseable {
         // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually
         OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators);
         OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners);
+        OFSessionUtil.getSessionManager().setMessageSpy(messageSpyCounter);
         
         // prepare worker pool for rpc
         // TODO: get size from configSubsystem
-        OFSessionUtil.getSessionManager().setRpcPool(
-                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)));
+        int rpcThreadLimit = 10;
+        ListeningExecutorService rpcPoolDelegator = createRpcPoolSpyDecorated(rpcThreadLimit, messageSpyCounter);
+        OFSessionUtil.getSessionManager().setRpcPool(rpcPoolDelegator);
         
     }
 
+    /**
+     * @param rpcThreadLimit
+     * @param messageSpy 
+     * @return
+     */
+    private static ListeningExecutorService createRpcPoolSpyDecorated(int rpcThreadLimit, MessageSpy<DataContainer> messageSpy) {
+        ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L, 
+                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+        ListeningExecutorService listeningRpcPool = MoreExecutors.listeningDecorator(rpcPool);
+        RpcListeningExecutorService rpcPoolDecorated = new RpcListeningExecutorService(listeningRpcPool);
+        rpcPoolDecorated.setMessageSpy(messageSpy);
+        return rpcPoolDecorated;
+    }
+
     /**
      * @param switchConnectionProviders
      *            the switchConnectionProviders to set
@@ -241,7 +264,9 @@ public class MDController implements IMDController, AutoCloseable {
 
     /**
      * @return wished connections configurations
+     * @deprecated use configSubsystem
      */
+    @Deprecated
     private static Collection<ConnectionConfiguration> getConnectionConfiguration() {
         // TODO:: get config from state manager
         ConnectionConfiguration configuration = ConnectionConfigurationFactory.getDefault();
@@ -333,7 +358,7 @@ public class MDController implements IMDController, AutoCloseable {
      * @param messageSpyCounter the messageSpyCounter to set
      */
     public void setMessageSpyCounter(
-            MessageSpy<OfHeader, DataObject> messageSpyCounter) {
+            MessageSpy<DataContainer> messageSpyCounter) {
         this.messageSpyCounter = messageSpyCounter;
     }
     
index 6d07186889fbcb4b6174c1812e16e6396fadb0bc..bd7462d994b18589f61d1f4d7516732a9eb8cda2 100644 (file)
@@ -8,19 +8,48 @@
 package org.opendaylight.openflowplugin.openflow.md.core;
 
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
 import org.opendaylight.yangtools.yang.binding.Notification;
 
+/**
+ * general publisher to MD-SAL 
+ * 
+ * @param <T> type of supported notification
+ */
 public class NotificationPopListener<T> implements PopListener<T> {
+    
+    private MessageSpy<? super T> messageSpy;
+    private NotificationProviderService notificationProviderService;
+    
+    /**
+     * @param messageSpy the messageSpy to set
+     */
+    public void setMessageSpy(MessageSpy<? super T> messageSpy) {
+        this.messageSpy = messageSpy;
+    }
 
+    /**
+     * @param notificationProviderService the notificationProviderService to set
+     */
+    public void setNotificationProviderService(
+            NotificationProviderService notificationProviderService) {
+        this.notificationProviderService = notificationProviderService;
+    }
 
     @Override
     public void onPop(T processedMessage) {
+        boolean published = false;
         if(processedMessage instanceof Notification) {
-            //TODO: create via factory, inject service
-            NotificationProviderService notificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService();
-            notificationProviderService.publish((Notification) processedMessage);
+            if (notificationProviderService != null) {
+                notificationProviderService.publish((Notification) processedMessage);
+                messageSpy.spyMessage(processedMessage, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+                published = true;
+            }
+        }
+        
+        if (! published) {
+            messageSpy.spyMessage(processedMessage, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
         }
     }
 
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/RpcListeningExecutorService.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/RpcListeningExecutorService.java
new file mode 100644 (file)
index 0000000..23c561c
--- /dev/null
@@ -0,0 +1,142 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTask;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+/**
+ * 
+ */
+public class RpcListeningExecutorService implements ListeningExecutorService {
+    
+    private MessageSpy<DataContainer> messageSpy;
+    private ListeningExecutorService executorServiceDelegate;
+    private DataContainer notSupportedTask = new NoDataContainerTask();
+    
+    /**
+     * @param executorService
+     */
+    public RpcListeningExecutorService(ListeningExecutorService executorService) {
+        this.executorServiceDelegate = executorService;
+    }
+    
+    /**
+     * @param messageSpy the messageSpy to set
+     */
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+        this.messageSpy = messageSpy;
+    }
+    
+    @Override
+    public void shutdown() {
+        executorServiceDelegate.shutdown();
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Callable<T> task) {
+        ListenableFuture<T> resultFuture = executorServiceDelegate.submit(task);
+        
+        boolean covered = false;
+        if (task instanceof OFRpcTask<?, ?>) {
+            if (((OFRpcTask<?, ?>) task).getInput() instanceof DataContainer) {
+                messageSpy.spyMessage((DataContainer) ((OFRpcTask<?, ?>) task).getInput(), 
+                        MessageSpy.STATISTIC_GROUP.TO_SWITCH_ENQUEUED_SUCCESS);
+                covered = true;
+            }
+        } 
+        
+        if (! covered) {
+            messageSpy.spyMessage(notSupportedTask, MessageSpy.STATISTIC_GROUP.TO_SWITCH_ENQUEUED_FAILED);
+        }
+        
+        return resultFuture;
+    }
+
+    @Override
+    public ListenableFuture<?> submit(Runnable task) {
+        throw new IllegalAccessError("not supported");
+    }
+
+    @Override
+    public <T> ListenableFuture<T> submit(Runnable task, T result) {
+        throw new IllegalAccessError("not supported");
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException {
+        return executorServiceDelegate.invokeAll(tasks);
+    }
+
+    @Override
+    public <T> List<Future<T>> invokeAll(
+            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+            throws InterruptedException {
+        return executorServiceDelegate.invokeAll(tasks, timeout, unit);
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        throw new IllegalAccessError("not supported");
+    }
+
+    @Override
+    public List<Runnable> shutdownNow() {
+        return executorServiceDelegate.shutdownNow();
+    }
+
+    @Override
+    public boolean isShutdown() {
+        return executorServiceDelegate.isShutdown();
+    }
+
+    @Override
+    public boolean isTerminated() {
+        return executorServiceDelegate.isTerminated();
+    }
+
+    @Override
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException {
+        return executorServiceDelegate.awaitTermination(timeout, unit);
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+            throws InterruptedException, ExecutionException {
+        throw new IllegalAccessError("not supported");
+    }
+
+    @Override
+    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
+            long timeout, TimeUnit unit) throws InterruptedException,
+            ExecutionException, TimeoutException {
+        throw new IllegalAccessError("not supported");
+    }
+    
+    protected static class NoDataContainerTask implements DataContainer {
+        @Override
+        public Class<? extends DataContainer> getImplementedInterface() {
+            return null;
+        }
+    }
+
+}
index 294a75a7caaaea0ac4ec957251a949cec1d8cdf2..50d3bee271044d4f7c4f0e4e2db329b093460020 100644 (file)
@@ -17,8 +17,7 @@ import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHan
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 
 /**
  * basic interconnecting piece between plugin and library 
@@ -29,7 +28,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
 
     private QueueKeeperLightImpl queueKeeper;
     private ErrorHandler errorHandler;
-    private MessageSpy<OfHeader, DataObject> messageSpy;
+    private MessageSpy<DataContainer> messageSpy;
     private int spyRate = 10;
 
     /**
@@ -71,7 +70,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
     /**
      * @param messageSpy the messageSpy to set
      */
-    public void setMessageSpy(MessageSpy<OfHeader, DataObject> messageSpy) {
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
         this.messageSpy = messageSpy;
     }
     
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java
new file mode 100644 (file)
index 0000000..448f5ec
--- /dev/null
@@ -0,0 +1,44 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * 
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ */
+public class ThreadPoolLoggingExecutor extends ThreadPoolExecutor {
+    
+    private static Logger LOG = LoggerFactory.getLogger(ThreadPoolLoggingExecutor.class);
+
+    /**
+     * @param corePoolSize
+     * @param maximumPoolSize
+     * @param keepAliveTime
+     * @param unit
+     * @param workQueue
+     */
+    public ThreadPoolLoggingExecutor(int corePoolSize, int maximumPoolSize,
+            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+    }
+
+    @Override
+    protected void afterExecute(Runnable r, Throwable t) {
+        super.afterExecute(r, t);
+        // in case of executing pure Runnable
+        if (t != null) {
+            LOG.warn("thread in pool stopped with error", t);
+        }
+    }
+}
index de5883e445534cee7fe440102eb0119dc4deb8ed..c2c8410051e0c0b3e51e9d889881c8bd69f08c46 100644 (file)
@@ -64,13 +64,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemovedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutputBuilder;
@@ -88,13 +86,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemovedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutputBuilder;
@@ -210,6 +206,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         rpcTaskContext.setMaxTimeout(maxTimeout);
         rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
         rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool());
+        rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager().getMessageSpy());
     }
 
     @Override
@@ -970,9 +967,8 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch {
         return Futures.immediateFuture(rpcResult);
     }
 
-    private TransactionId generateTransactionId(final Long xid) {
-        String stringXid = xid.toString();
-        BigInteger bigIntXid = new BigInteger(stringXid);
+    private static TransactionId generateTransactionId(final Long xid) {
+        BigInteger bigIntXid = BigInteger.valueOf(xid);
         return new TransactionId(bigIntXid);
 
     }
index e96da2bc26e8dc25e8d25ded505d3bb1a653c397..ed59fe922221a3274644a0737183d00eb9fe713d 100644 (file)
@@ -12,6 +12,8 @@ import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
 
@@ -26,6 +28,7 @@ public class OFRpcTaskContext {
     private long maxTimeout;
     private TimeUnit maxTimeoutUnit;
     private ListeningExecutorService rpcPool;
+    private MessageSpy<DataContainer> messageSpy;
     
     /**
      * @return the messageService
@@ -101,4 +104,17 @@ public class OFRpcTaskContext {
     public ListeningExecutorService getRpcPool() {
         return rpcPool;
     }
+    
+    /**
+     * @param messageSpy
+     */
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+        this.messageSpy = messageSpy;
+    }
+    /**
+     * @return the messageSpy
+     */
+    public MessageSpy<DataContainer> getMessageSpy() {
+        return messageSpy;
+    }
 }
index 921d49421b57f3673e9697ee0282761da8e65d59..ec94bdb547ae451231acbca329bce5809ffcf9c7 100644 (file)
@@ -85,8 +85,8 @@ public abstract class OFRpcTaskFactory {
                             getMessageService().flowMod(ofFlowModInput.build(), getCookie());
                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
                     
-                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService()
-                            createFlowAddedNotification(xId, getInput()));
+                    OFRpcTaskUtil.hookFutureNotification(this, result
+                            getRpcNotificationProviderService(), createFlowAddedNotification(xId, getInput()));
                 }
 
                 return result;
@@ -144,8 +144,8 @@ public abstract class OFRpcTaskFactory {
                             getMessageService().flowMod(ofFlowModInput.build(), getCookie());
                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
                     
-                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService()
-                            createFlowUpdatedNotification(xId, getInput()));
+                    OFRpcTaskUtil.hookFutureNotification(this, result
+                            getRpcNotificationProviderService(), createFlowUpdatedNotification(xId, getInput()));
                 }
                 return result;
             }
@@ -201,8 +201,8 @@ public abstract class OFRpcTaskFactory {
                             .groupMod(ofGroupModInput.build(), getCookie());
                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
                     
-                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService()
-                            createGroupAddedNotification(xId, getInput()));
+                    OFRpcTaskUtil.hookFutureNotification(this, result
+                            getRpcNotificationProviderService(), createGroupAddedNotification(xId, getInput()));
                 }
 
                 return result;
@@ -260,8 +260,8 @@ public abstract class OFRpcTaskFactory {
                             .meterMod(ofMeterModInput.build(), getCookie());
                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
                     
-                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService()
-                            createMeterAddedNotification(xId, getInput()));
+                    OFRpcTaskUtil.hookFutureNotification(this, result
+                            getRpcNotificationProviderService(), createMeterAddedNotification(xId, getInput()));
                 }
 
                 return result;
@@ -321,8 +321,8 @@ public abstract class OFRpcTaskFactory {
                             getMessageService().groupMod(ofGroupModInput.build(), getCookie());
                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
                     
-                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService()
-                            createGroupUpdatedNotification(xId, getInput()));
+                    OFRpcTaskUtil.hookFutureNotification(this, result
+                            getRpcNotificationProviderService(), createGroupUpdatedNotification(xId, getInput()));
                 }
                 return result;
             }
@@ -378,8 +378,8 @@ public abstract class OFRpcTaskFactory {
                             getMessageService().meterMod(ofMeterModInput.build(), getCookie());
                     result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
                     
-                    OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), 
-                            createMeterUpdatedNotification(xId, getInput()));
+                    OFRpcTaskUtil.hookFutureNotification(this, result,
+                            getRpcNotificationProviderService(), createMeterUpdatedNotification(xId, getInput()));
                 }
                 return result;
             }
index 03662fd36accb49e127022bb2ce890e0fd8e79c1..f91920287831b7df2cd5a6c3e04a77190fa4df27 100644 (file)
@@ -19,8 +19,10 @@ import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.Notification;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
@@ -95,11 +97,14 @@ public abstract class OFRpcTaskUtil {
     }
     
     /**
+     * @param task of rpc
      * @param originalResult
      * @param notificationProviderService
      * @param notificationComposer lazy notification composer
      */
-    public static <R, N extends Notification> void hookFutureNotification(ListenableFuture<R> originalResult, 
+    public static <R, N extends Notification, INPUT extends DataContainer> void hookFutureNotification(
+            final OFRpcTask<INPUT, R> task, 
+            ListenableFuture<R> originalResult, 
             final NotificationProviderService notificationProviderService, 
             final NotificationComposer<N> notificationComposer) {
         Futures.addCallback(originalResult, new FutureCallback<R>() {
@@ -108,14 +113,16 @@ public abstract class OFRpcTaskUtil {
                 if (null != notificationProviderService) {
                     notificationProviderService.publish(notificationComposer.compose());
                 }
+                task.getTaskContext().getMessageSpy().spyMessage(
+                        task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS);
             }
             
             @Override
             public void onFailure(Throwable t) {
-                //NOOP
+                task.getTaskContext().getMessageSpy().spyMessage(
+                        task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE);
             }
         });
-        
     }
 
 }
index 6c7de9d5adff50eaaef1029c6eb7765f399ad386..d16877bdd44003c965c0aa05a85361be811e09b2 100644 (file)
@@ -19,8 +19,7 @@ import org.opendaylight.openflowplugin.openflow.md.core.MDController;
 import org.opendaylight.openflowplugin.openflow.md.core.cmd.MessageCountCommandProvider;
 import org.opendaylight.openflowplugin.openflow.md.queue.MessageObservatory;
 import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpyCounterImpl;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.RpcService;
 import org.osgi.framework.BundleContext;
 import org.slf4j.Logger;
@@ -43,7 +42,7 @@ public class OpenflowPluginProvider implements BindingAwareProvider, AutoCloseab
     
     private MessageCountCommandProvider messageCountCommandProvider;
 
-    private MessageObservatory<OfHeader, DataObject> messageCountProvider;
+    private MessageObservatory<DataContainer> messageCountProvider;
     
     private SalRegistrationManager registrationManager;
     
index 6e16b6eb7d10caee2e4953dd0edcc1008096feeb..f5553cb891cd478e72b1f8d011deb3ce22b30737 100644 (file)
@@ -19,9 +19,11 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -115,12 +117,22 @@ public interface SessionManager extends AutoCloseable {
     void setPopListenerMapping(Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping);
 
     /**
-     * @param newFixedThreadPool
+     * @param rpcPoolDelegator
      */
-    void setRpcPool(ListeningExecutorService newFixedThreadPool);
+    void setRpcPool(ListeningExecutorService rpcPoolDelegator);
 
     /**
      * @return the rpcPool instance
      */
     ListeningExecutorService getRpcPool();
+
+    /**
+     * @param messageSpy
+     */
+    void setMessageSpy(MessageSpy<DataContainer> messageSpy);
+
+    /**
+     * @return the messageSpy
+     */
+    MessageSpy<DataContainer> getMessageSpy();
 }
index afda07fc38961ec1c24156b9a0082811d47cfd36..24ba19a676d1247cfec53b3a26197dd808f42024 100644 (file)
@@ -21,10 +21,12 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -198,6 +200,7 @@ public class SessionManagerOFImpl implements SessionManager {
             }
         }
     };
+    private MessageSpy<DataContainer> messageSpy;
     
 
     @Override
@@ -261,4 +264,14 @@ public class SessionManagerOFImpl implements SessionManager {
     public ListeningExecutorService getRpcPool() {
         return rpcPool;
     }
+    
+    @Override
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+        this.messageSpy = messageSpy;
+    }
+    
+    @Override
+    public MessageSpy<DataContainer> getMessageSpy() {
+        return messageSpy;
+    }
 }
index 254dedce807cddcc0edcdbe134ee04802dc7cddf..d5fe8580221e3c144612002348fb1191ff5da4e6 100644 (file)
@@ -9,11 +9,10 @@
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
 /**
- * @param <MSG_IN> 
- * @param <MSG_OUT> 
+ * @param <MSG_TYPE> 
  * 
  */
-public interface MessageObservatory<MSG_IN, MSG_OUT> extends MessageSpy<MSG_IN, MSG_OUT>, MessageCountDumper {
+public interface MessageObservatory<MSG_TYPE> extends MessageSpy<MSG_TYPE>, MessageCountDumper {
 
     // just unifying iface
     
index e09b879d29c37d1f6485dc61f14893eb5ac7affa..40a271757bb83b95143bcec11165c8876836e996 100644 (file)
@@ -8,23 +8,56 @@
 
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
-import java.util.List;
 
 /**
  * ticket spy - aimed on collecting intel about tickets 
- * @param <MSG_IN> type of incoming message
- * @param <MSG_OUT> type of outcoming message
+ * @param <MSG_TYPE> type of watched message
  */
-public interface MessageSpy<MSG_IN, MSG_OUT> extends Runnable {
+public interface MessageSpy<MSG_TYPE> extends Runnable {
 
     /**
      * @param message content of ticket
      */
-    void spyIn(MSG_IN message);
+    void spyIn(MSG_TYPE message);
 
     /**
      * @param message content of ticket
      */
-    void spyOut(List<MSG_OUT> message);
-
-}
+    void spyOut(MSG_TYPE message);
+    
+    
+    // TODO: temporary solution, should be refactored and moved to managed bean
+    
+    /**
+     * statistic groups overall in OFPlugin
+     */
+    enum STATISTIC_GROUP {
+        /** message from switch, enqueued for processing */
+        FROM_SWITCH_ENQUEUED,
+        /** message from switch translated successfully - source */
+        FROM_SWITCH_TRANSLATE_IN_SUCCESS,
+        /** message from switch translated successfully - target */
+        FROM_SWITCH_TRANSLATE_OUT_SUCCESS,
+        /** message from switch where translation failed - source */
+        FROM_SWITCH_TRANSLATE_SRC_FAILURE,
+        /** message from switch finally published into MD-SAL */
+        FROM_SWITCH_PUBLISHED_SUCCESS,
+        /** message from switch - publishing into MD-SAL failed */
+        FROM_SWITCH_PUBLISHED_FAILURE,
+        
+        /** message from MD-SAL to switch via RPC enqueued */
+        TO_SWITCH_ENQUEUED_SUCCESS,
+        /** message from MD-SAL to switch via RPC NOT enqueued */
+        TO_SWITCH_ENQUEUED_FAILED,
+        /** message from MD-SAL to switch - sent to OFJava successfully */
+        TO_SWITCH_SUBMITTED_SUCCESS,
+        /** message from MD-SAL to switch - sent to OFJava but failed*/
+        TO_SWITCH_SUBMITTED_FAILURE
+    }
+    
+    /**
+     * @param message from switch or to switch - depends on statGroup
+     * @param statGroup 
+     */
+    void spyMessage(MSG_TYPE message, STATISTIC_GROUP statGroup);
+    }
index c1f82948d8e715a8bab472ab770578a105e2bb8f..6af42b9065b52fd2c670903fa8ec23ef6237de2d 100644 (file)
@@ -9,46 +9,72 @@
 package org.opendaylight.openflowplugin.openflow.md.queue;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
-import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * message counter (by type)
  */
-public class MessageSpyCounterImpl implements MessageObservatory<OfHeader, DataObject> {
+public class MessageSpyCounterImpl implements MessageObservatory<DataContainer> {
     
     private static final Logger LOG = LoggerFactory
             .getLogger(MessageSpyCounterImpl.class);
     
-    private Map<Class<? extends DataContainer>, AtomicLong[]> inputStats = new ConcurrentHashMap<>();
+    private Map<STATISTIC_GROUP, Map<Class<? extends DataContainer>, AtomicLong[]>> inputStats = new ConcurrentHashMap<>();
     
     @Override
-    public void spyIn(OfHeader message) {
+    public void spyIn(DataContainer message) {
+        AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_IN_SUCCESS);
+        counters[0].incrementAndGet();  
+    }
+
+    /**
+     * @param message
+     * @param statGroup TODO
+     * @return
+     */
+    private AtomicLong[] getCounters(DataContainer message, STATISTIC_GROUP statGroup) {
         Class<? extends DataContainer> msgType = message.getImplementedInterface();
-        AtomicLong counter;
-        synchronized(msgType) {
-            AtomicLong[] counters = inputStats.get(msgType);
+        Map<Class<? extends DataContainer>, AtomicLong[]> groupData = getOrCreateGroupData(statGroup);
+        AtomicLong[] counters = getOrCreateCountersPair(msgType, groupData);
+        return counters;
+    }
+    
+    private static AtomicLong[] getOrCreateCountersPair(Class<? extends DataContainer> msgType, Map<Class<? extends DataContainer>, AtomicLong[]> groupData) {
+        AtomicLong[] counters = groupData.get(msgType);
+        synchronized(groupData) {
             if (counters == null) {
                 counters = new AtomicLong[] {new AtomicLong(), new AtomicLong()};
-                inputStats.put(msgType, counters);
+                groupData.put(msgType, counters);
             } 
-            counter = counters[0];
         }
-        counter.incrementAndGet();
+        return counters;
+    }
+
+    private Map<Class<? extends DataContainer>, AtomicLong[]> getOrCreateGroupData(STATISTIC_GROUP statGroup) {
+        Map<Class<? extends DataContainer>, AtomicLong[]> groupData = null;
+        synchronized(inputStats) {
+            groupData = inputStats.get(statGroup);
+            if (groupData == null) {
+                groupData = new HashMap<>();
+                inputStats.put(statGroup, groupData);
+            }
+        }
+        return groupData;
     }
 
     @Override
-    public void spyOut(List<DataObject> message) {
-        // NOOP   
+    public void spyOut(DataContainer message) {
+        AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+        counters[0].incrementAndGet();  
     }
 
     @Override
@@ -64,12 +90,27 @@ public class MessageSpyCounterImpl implements MessageObservatory<OfHeader, DataO
     @Override
     public List<String> dumpMessageCounts() {
         List<String> dump = new ArrayList<>();
-        for (Entry<Class<? extends DataContainer>, AtomicLong[]> statEntry : inputStats.entrySet()) {
-            long amountPerInterval = statEntry.getValue()[0].getAndSet(0);
-            long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval);
-            dump.add(String.format("MSG[%s] -> +%d | %d", 
-                    statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount));
+        for (STATISTIC_GROUP statGroup : STATISTIC_GROUP.values()) {
+            Map<Class<? extends DataContainer>, AtomicLong[]> groupData = inputStats.get(statGroup);
+            if (groupData != null) {
+                for (Entry<Class<? extends DataContainer>, AtomicLong[]> statEntry : groupData.entrySet()) {
+                    long amountPerInterval = statEntry.getValue()[0].getAndSet(0);
+                    long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval);
+                    dump.add(String.format("%s: MSG[%s] -> +%d | %d",
+                            statGroup,
+                            statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount));
+                }
+                
+            } else {
+                dump.add(String.format("%s: no activity detected", statGroup));
+            }
         }
         return dump;
     }
+
+    @Override
+    public void spyMessage(DataContainer message, STATISTIC_GROUP statGroup) {
+        AtomicLong[] counters = getCounters(message, statGroup);
+        counters[0].incrementAndGet();
+    }
 }
index 78682d3f3907b86f3a41db70e3761479277cfd3e..99c7c016cf2670d08adf96426e6a3a8bff790cad 100644 (file)
@@ -19,7 +19,9 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +40,7 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
     private int poolSize = 10;
     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
     private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
-    private MessageSpy<OfHeader, DataObject> messageSpy;
+    private MessageSpy<DataContainer> messageSpy;
 
     private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
         @Override
@@ -99,6 +101,7 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
 
     @Override
     public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+        messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
         if(queueType == QueueKeeper.QueueType.DEFAULT) {
             TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
             ticket.setConductor(conductor);
@@ -147,7 +150,7 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
     /**
      * @param messageSpy the messageSpy to set
      */
-    public void setMessageSpy(MessageSpy<OfHeader, DataObject> messageSpy) {
+    public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
         this.messageSpy = messageSpy;
     }
 
@@ -180,15 +183,22 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
             }
             if (messageSpy != null) {
                 messageSpy.spyIn(message);
-                messageSpy.spyOut(result);
+                for (DataObject outMsg : result) {
+                    messageSpy.spyOut(outMsg);
+                }
             }
         } else {
             LOG.warn("No translators for this message Type: {}", messageType);
+            messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
         }
         return result;
     }
 
-    private void pop(List<DataObject> processedMessages,ConnectionConductor conductor) {
+    /**
+     * @param processedMessages
+     * @param conductor
+     */
+    private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
         for (DataObject msg : processedMessages) {
             Class<? extends Object> registeredType =
                     registeredOutTypeExtractor.extractRegisteredType(msg);
index e7c8ca8622581a9bc49c8c8cbb138f7e4e025d57..deeb4d938215527cbe9d30c9ccee40946d562e0d 100644 (file)
@@ -11,7 +11,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
index c72134fffd3aea08aee6d1d77762d180daef4d23..ace82888294444b66b399db8f9a5c01bd7dbfbc7 100644 (file)
@@ -16,6 +16,8 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,7 +25,7 @@ import org.slf4j.LoggerFactory;
  * @param <IN>
  * @param <OUT>
  */
-public class TicketProcessorFactory<IN, OUT> {
+public class TicketProcessorFactory<IN extends DataObject, OUT extends DataObject> {
 
     protected static final Logger LOG = LoggerFactory
             .getLogger(TicketProcessorFactory.class);
@@ -31,7 +33,7 @@ public class TicketProcessorFactory<IN, OUT> {
     protected VersionExtractor<IN> versionExtractor;
     protected RegisteredTypeExtractor<IN> registeredTypeExtractor;
     protected Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping;
-    protected MessageSpy<IN, OUT> spy;
+    protected MessageSpy<DataContainer> spy;
 
     /**
      * @param versionExtractor the versionExtractor to set
@@ -59,7 +61,7 @@ public class TicketProcessorFactory<IN, OUT> {
     /**
      * @param spy the spy to set
      */
-    public void setSpy(MessageSpy<IN, OUT> spy) {
+    public void setSpy(MessageSpy<DataContainer> spy) {
         this.spy = spy;
     }
 
@@ -82,7 +84,9 @@ public class TicketProcessorFactory<IN, OUT> {
                     // spying on result
                     if (spy != null) {
                         spy.spyIn(ticket.getMessage());
-                        spy.spyOut(ticket.getResult().get());
+                        for (OUT outMessage : ticket.getResult().get()) {
+                            spy.spyOut(outMessage);
+                        }
                     }
                 } catch (Exception e) {
                     LOG.error("translation problem: {}", e.getMessage());
index 73d5740a26e99f42a8d3dae1027f7508081a1559..6f2e3a6b507d937b37230f3dca6bb3efacc3be51 100644 (file)
@@ -33,6 +33,7 @@ import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterSt
 import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
@@ -55,6 +56,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessageBuilder;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -93,6 +95,8 @@ public class ConnectionConductorImplTest {
     private ErrorHandlerSimpleImpl errorHandler;
 
     private int expectedErrors = 0;
+    @Mock
+    private MessageSpy<DataContainer> messageSpy;
 
     public void incrExperimenterMessageCounter() {
         this.experimenterMessageCounter++;
@@ -133,6 +137,7 @@ public class ConnectionConductorImplTest {
         popListener = new PopListenerCountingImpl<>();
 
         queueKeeper = new QueueKeeperLightImpl();
+        queueKeeper.setMessageSpy(messageSpy);
 
         connectionConductor = new ConnectionConductorImpl(adapter);
         connectionConductor.setQueueKeeper(queueKeeper);