added message counter 37/3837/1
authorMichal Rehak <mirehak@cisco.com>
Mon, 16 Dec 2013 15:13:00 +0000 (16:13 +0100)
committerMichal Rehak <mirehak@cisco.com>
Wed, 18 Dec 2013 18:18:39 +0000 (19:18 +0100)
fix flow switch-case values (typo)
changed sysout to log in test-provider
extended to cumulated and per interval counting

Change-Id: Ia8ff8424811ba45646b6623375e133e88c11ea3b
Signed-off-by: Michal Rehak <mirehak@cisco.com>
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpy.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpyCounterImpl.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java
test-provider/src/main/java/org/opendaylight/openflowplugin/test/OpenflowpluginTestCommandProvider.java

index 956d391dba323993b135699966e8c77640b5857d..f571e0094b3a05a0ec8f6b69bbdfde86386529a3 100644 (file)
@@ -9,32 +9,49 @@
 package org.opendaylight.openflowplugin.openflow.md.core;
 
 import java.net.InetAddress;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpyCounterImpl;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataObject;
 
 /**
  * @author mirehak
  *
  */
 public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
+    
+    private ScheduledThreadPoolExecutor spyPool; 
 
     private QueueKeeperLightImpl queueKeeper;
     private ErrorHandler errorHandler;
+    private MessageSpy<OfHeader, DataObject> messageSpy;
+    private int spyRate = 10;
 
     /**
      *
      */
     public SwitchConnectionHandlerImpl() {
+        messageSpy = new MessageSpyCounterImpl();
         queueKeeper = new QueueKeeperLightImpl();
         queueKeeper.setTranslatorMapping(OFSessionUtil.getTranslatorMap());
         queueKeeper.setPopListenersMapping(OFSessionUtil.getPopListenerMapping());
+        queueKeeper.setMessageSpy(messageSpy);
+        
         queueKeeper.init();
 
         errorHandler = new ErrorHandlerQueueImpl();
         new Thread(errorHandler).start();
+        
+        //TODO: implement shutdown invocation upon service stop event
+        spyPool = new ScheduledThreadPoolExecutor(1);
+        spyPool.scheduleAtFixedRate(messageSpy, spyRate, spyRate, TimeUnit.SECONDS);
     }
 
     @Override
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpy.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpy.java
new file mode 100644 (file)
index 0000000..e09b879
--- /dev/null
@@ -0,0 +1,30 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.List;
+
+/**
+ * ticket spy - aimed on collecting intel about tickets 
+ * @param <MSG_IN> type of incoming message
+ * @param <MSG_OUT> type of outcoming message
+ */
+public interface MessageSpy<MSG_IN, MSG_OUT> extends Runnable {
+
+    /**
+     * @param message content of ticket
+     */
+    void spyIn(MSG_IN message);
+
+    /**
+     * @param message content of ticket
+     */
+    void spyOut(List<MSG_OUT> message);
+
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpyCounterImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpyCounterImpl.java
new file mode 100644 (file)
index 0000000..fe32b8c
--- /dev/null
@@ -0,0 +1,64 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * message counter (by type)
+ */
+public class MessageSpyCounterImpl implements MessageSpy<OfHeader, DataObject> {
+    
+    private static final Logger LOG = LoggerFactory
+            .getLogger(MessageSpyCounterImpl.class);
+    
+    private Map<Class<? extends DataContainer>, AtomicLong[]> inputStats = new ConcurrentHashMap<>();
+    
+    @Override
+    public void spyIn(OfHeader message) {
+        Class<? extends DataContainer> msgType = message.getImplementedInterface();
+        AtomicLong counter;
+        synchronized(msgType) {
+            AtomicLong[] counters = inputStats.get(msgType);
+            if (counters == null) {
+                counters = new AtomicLong[] {new AtomicLong(), new AtomicLong()};
+                inputStats.put(msgType, counters);
+            } 
+            counter = counters[0];
+        }
+        counter.incrementAndGet();
+    }
+
+    @Override
+    public void spyOut(List<DataObject> message) {
+        // NOOP   
+    }
+
+    @Override
+    public void run() {
+        // log current counters and cleans it
+        if (LOG.isDebugEnabled()) {
+            for (Entry<Class<? extends DataContainer>, AtomicLong[]> statEntry : inputStats.entrySet()) {
+                long amountPerInterval = statEntry.getValue()[0].getAndSet(0);
+                long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval);
+                LOG.debug("MSG[{}] -> +{} | {}", statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount);
+            }
+        }
+    }
+}
index 6a2c8080ef377194346ac0917da3e2973789260f..c482df8818d55d43f6460f29b0f278f8e7416349 100644 (file)
@@ -37,6 +37,8 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
     private ScheduledThreadPoolExecutor pool;
     private int poolSize = 10;
     private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+    private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+    private MessageSpy<OfHeader, DataObject> messageSpy;
 
     private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
         @Override
@@ -71,6 +73,13 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
     public void init() {
         processQueue = new LinkedBlockingQueue<>(100);
         pool = new ScheduledThreadPoolExecutor(poolSize);
+        
+        ticketProcessorFactory = new TicketProcessorFactory<>();
+        ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
+        ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+        ticketProcessorFactory.setVersionExtractor(versionExtractor);
+        ticketProcessorFactory.setSpy(messageSpy);
+        
         TicketFinisher<DataObject> finisher = new TicketFinisher<>(
                 processQueue, popListenersMapping, registeredOutTypeExtractor);
         new Thread(finisher).start();
@@ -109,8 +118,8 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
      * @param ticket
      */
     private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
-        pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor,
-                registeredSrcTypeExtractor, translatorMapping));
+        Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+        pool.execute(ticketProcessor);
     }
 
     /**
@@ -132,6 +141,13 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
         this.popListenersMapping = popListenersMapping;
     }
     
+    /**
+     * @param messageSpy the messageSpy to set
+     */
+    public void setMessageSpy(MessageSpy<OfHeader, DataObject> messageSpy) {
+        this.messageSpy = messageSpy;
+    }
+    
     private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
         List<DataObject> result = new ArrayList<>();
         Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
@@ -159,6 +175,10 @@ public class QueueKeeperLightImpl implements QueueKeeper<OfHeader, DataObject> {
                     result.addAll(translator.translate(cookie, conductor.getSessionContext(), message));
                 }
             }
+            if (messageSpy != null) {
+                messageSpy.spyIn(message);
+                messageSpy.spyOut(result);
+            }
         } else {
             LOG.warn("No translators for this message Type: {}", messageType);
         }
index b0ec921d0ce7817861be0990957d453d62b96faa..f9af9f0d68916b2221141b14d862d2da409ab7e5 100644 (file)
@@ -17,8 +17,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author mirehak
- * @param <IN> source type
  * @param <OUT> result type
  *
  */
index e3518dc0a024e645585c1a8cc14c288a5c85ed35..c4ff3e2b3fe4430b24f6e4788342430d2ba461d3 100644 (file)
@@ -20,27 +20,57 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * @author mirehak
- *
+ * @param <IN>
+ * @param <OUT>
  */
-public abstract class TicketProcessorFactory {
+public class TicketProcessorFactory<IN, OUT> {
 
     protected static final Logger LOG = LoggerFactory
             .getLogger(TicketProcessorFactory.class);
+    
+    protected VersionExtractor<IN> versionExtractor;
+    protected RegisteredTypeExtractor<IN> registeredTypeExtractor;
+    protected Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping;
+    protected MessageSpy<IN, OUT> spy;
+    
+    /**
+     * @param versionExtractor the versionExtractor to set
+     */
+    public void setVersionExtractor(VersionExtractor<IN> versionExtractor) {
+        this.versionExtractor = versionExtractor;
+    }
+
+    /**
+     * @param registeredTypeExtractor the registeredTypeExtractor to set
+     */
+    public void setRegisteredTypeExtractor(
+            RegisteredTypeExtractor<IN> registeredTypeExtractor) {
+        this.registeredTypeExtractor = registeredTypeExtractor;
+    }
+
+    /**
+     * @param translatorMapping the translatorMapping to set
+     */
+    public void setTranslatorMapping(
+            Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping) {
+        this.translatorMapping = translatorMapping;
+    }
+
+    /**
+     * @param spy the spy to set
+     */
+    public void setSpy(MessageSpy<IN, OUT> spy) {
+        this.spy = spy;
+    }
+
 
     /**
      * @param ticket
-     * @param versionExtractor
-     * @param registeredTypeExtractor
-     * @param translatorMapping
      * @return runnable ticket processor
      */
-    public static <IN, OUT> Runnable createProcessor(
-            final Ticket<IN, OUT> ticket,
-            final VersionExtractor<IN> versionExtractor,
-            final RegisteredTypeExtractor<IN> registeredTypeExtractor,
-            final Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping) {
-        return new Runnable() {
+    public Runnable createProcessor(final Ticket<IN, OUT> ticket) {
+        
+        Runnable ticketProcessor = new Runnable() {
             @Override
             public void run() {
                 LOG.debug("message received, type: {}", registeredTypeExtractor.extractRegisteredType(
@@ -49,6 +79,11 @@ public abstract class TicketProcessorFactory {
                 try {
                     translate = translate();
                     ticket.getResult().set(translate);
+                    // spying on result
+                    if (spy != null) {
+                        spy.spyIn(ticket.getMessage());
+                        spy.spyOut(ticket.getResult().get());
+                    }
                 } catch (Exception e) {
                     LOG.error("translation problem: {}", e.getMessage());
                     ticket.getResult().setException(e);
@@ -72,7 +107,7 @@ public abstract class TicketProcessorFactory {
 
                 Short version = versionExtractor.extractVersion(message);
                 if (version == null) {
-                   throw new IllegalArgumentException("version is NULL");
+                    throw new IllegalArgumentException("version is NULL");
                 }
                 TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
                 translators = translatorMapping.get(tKey);
@@ -97,5 +132,7 @@ public abstract class TicketProcessorFactory {
                 return result;
             }
         };
+
+        return ticketProcessor;
     }
 }
index 525c29acc0f258c8b89e67da022aef08a588302d..bdf270c67e5b40fcba2d1c6dd818f0d06be866cc 100644 (file)
@@ -131,7 +131,6 @@ public class ConnectionConductorImplTest {
         popListener = new PopListenerCountingImpl<>();
 
         queueKeeper = new QueueKeeperLightImpl();
-        queueKeeper.init();
 
         connectionConductor = new ConnectionConductorImpl(adapter);
         connectionConductor.setQueueKeeper(queueKeeper);
@@ -149,6 +148,7 @@ public class ConnectionConductorImplTest {
 
         controller.getMessageTranslators().putAll(assembleTranslatorMapping());
         queueKeeper.setPopListenersMapping(assemblePopListenerMapping());
+        queueKeeper.init();
     }
 
     /**
index 9612e94b57d2f111bdab8329a56645e3a2064426..c7731a6c36f52119601391296324b903cbecc6c0 100644 (file)
@@ -487,7 +487,7 @@ public class OpenflowpluginTestCommandProvider implements CommandProvider {
             flow.setInstructions(createDropInstructions().build());
             break;
         case "f54":
-            id += 51;
+            id += 54;
             flow.setMatch(new MatchBuilder().build());
             flow.setInstructions(createSentToControllerInstructions().build());
             break;
@@ -2023,11 +2023,9 @@ public class OpenflowpluginTestCommandProvider implements CommandProvider {
             ci.println("Status of Flow Data Loaded Transaction: " + status);
 
         } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            LOG.error(e.getMessage(), e);
         } catch (ExecutionException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            LOG.error(e.getMessage(), e);
         }
     }
 
@@ -2054,11 +2052,9 @@ public class OpenflowpluginTestCommandProvider implements CommandProvider {
             ci.println("Status of Flow Data Loaded Transaction: " + status);
 
         } catch (InterruptedException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            LOG.error(e.getMessage(), e);
         } catch (ExecutionException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+            LOG.error(e.getMessage(), e);
         }
     }