From: Michal Rehak Date: Fri, 16 May 2014 19:11:08 +0000 (+0200) Subject: BUG-542 - adding overall statictics X-Git-Tag: release/helium~185 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=30ad89975725c6177254f580ff842dbd3ab33dff;p=openflowplugin.git BUG-542 - adding overall statictics - temporary extended messageSpy code - added logging to rpc threadPool - afterExecution - cleaned imports - deprecated hard coded configurations for OFJava SwitchConnectionProvider - simplified signature of MessageSpy - added messageSpy and rpcPool into SessionManager Change-Id: If41e33c885dab0907fdf642ad1f483d93f74b19c Signed-off-by: Michal Rehak --- diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConfigurationFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConfigurationFactory.java index 12341d7573..4436bbcb01 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConfigurationFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConfigurationFactory.java @@ -13,8 +13,9 @@ import java.net.InetAddress; import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration; /** - * @author mirehak + * @deprecated use configSubsystem */ +@Deprecated public abstract class ConnectionConfigurationFactory { /** OF default listening port */ diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java index 147bb60ed6..0dc084e258 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MDController.java @@ -18,6 +18,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -81,6 +82,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.packet.service.rev130709.Tr import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.TableUpdated; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,6 +90,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; /** @@ -101,7 +104,7 @@ public class MDController implements IMDController, AutoCloseable { private ConcurrentMap>>> messageTranslators; private Map, Collection>> popListeners; - private MessageSpy messageSpyCounter; + private MessageSpy messageSpyCounter; final private int OF10 = OFConstants.OFP_VERSION_1_0; final private int OF13 = OFConstants.OFP_VERSION_1_3; @@ -140,8 +143,12 @@ public class MDController implements IMDController, AutoCloseable { addMessageTranslator(MultipartReplyMessage.class,OF13,new MultipartReplyTableFeaturesToTableUpdatedTranslator()); addMessageTranslator(GetFeaturesOutput.class,OF10, new FeaturesV10ToNodeConnectorUpdatedTranslator()); - //TODO: move registration to factory NotificationPopListener notificationPopListener = new NotificationPopListener(); + notificationPopListener.setNotificationProviderService( + OFSessionUtil.getSessionManager().getNotificationProviderService()); + notificationPopListener.setMessageSpy(messageSpyCounter); + + //TODO: move registration to factory addMessagePopListener(NodeErrorNotification.class, notificationPopListener); addMessagePopListener(BadActionErrorNotification.class, notificationPopListener); addMessagePopListener(BadInstructionErrorNotification.class, notificationPopListener); @@ -196,14 +203,30 @@ public class MDController implements IMDController, AutoCloseable { // Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators); OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners); + OFSessionUtil.getSessionManager().setMessageSpy(messageSpyCounter); // prepare worker pool for rpc // TODO: get size from configSubsystem - OFSessionUtil.getSessionManager().setRpcPool( - MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10))); + int rpcThreadLimit = 10; + ListeningExecutorService rpcPoolDelegator = createRpcPoolSpyDecorated(rpcThreadLimit, messageSpyCounter); + OFSessionUtil.getSessionManager().setRpcPool(rpcPoolDelegator); } + /** + * @param rpcThreadLimit + * @param messageSpy + * @return + */ + private static ListeningExecutorService createRpcPoolSpyDecorated(int rpcThreadLimit, MessageSpy messageSpy) { + ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + ListeningExecutorService listeningRpcPool = MoreExecutors.listeningDecorator(rpcPool); + RpcListeningExecutorService rpcPoolDecorated = new RpcListeningExecutorService(listeningRpcPool); + rpcPoolDecorated.setMessageSpy(messageSpy); + return rpcPoolDecorated; + } + /** * @param switchConnectionProviders * the switchConnectionProviders to set @@ -241,7 +264,9 @@ public class MDController implements IMDController, AutoCloseable { /** * @return wished connections configurations + * @deprecated use configSubsystem */ + @Deprecated private static Collection getConnectionConfiguration() { // TODO:: get config from state manager ConnectionConfiguration configuration = ConnectionConfigurationFactory.getDefault(); @@ -333,7 +358,7 @@ public class MDController implements IMDController, AutoCloseable { * @param messageSpyCounter the messageSpyCounter to set */ public void setMessageSpyCounter( - MessageSpy messageSpyCounter) { + MessageSpy messageSpyCounter) { this.messageSpyCounter = messageSpyCounter; } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationPopListener.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationPopListener.java index 6d07186889..bd7462d994 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationPopListener.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/NotificationPopListener.java @@ -8,19 +8,48 @@ package org.opendaylight.openflowplugin.openflow.md.core; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; -import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; import org.opendaylight.openflowplugin.openflow.md.queue.PopListener; import org.opendaylight.yangtools.yang.binding.Notification; +/** + * general publisher to MD-SAL + * + * @param type of supported notification + */ public class NotificationPopListener implements PopListener { + + private MessageSpy messageSpy; + private NotificationProviderService notificationProviderService; + + /** + * @param messageSpy the messageSpy to set + */ + public void setMessageSpy(MessageSpy messageSpy) { + this.messageSpy = messageSpy; + } + /** + * @param notificationProviderService the notificationProviderService to set + */ + public void setNotificationProviderService( + NotificationProviderService notificationProviderService) { + this.notificationProviderService = notificationProviderService; + } @Override public void onPop(T processedMessage) { + boolean published = false; if(processedMessage instanceof Notification) { - //TODO: create via factory, inject service - NotificationProviderService notificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService(); - notificationProviderService.publish((Notification) processedMessage); + if (notificationProviderService != null) { + notificationProviderService.publish((Notification) processedMessage); + messageSpy.spyMessage(processedMessage, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS); + published = true; + } + } + + if (! published) { + messageSpy.spyMessage(processedMessage, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE); } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/RpcListeningExecutorService.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/RpcListeningExecutorService.java new file mode 100644 index 0000000000..23c561c23d --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/RpcListeningExecutorService.java @@ -0,0 +1,142 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.core; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTask; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; +import org.opendaylight.yangtools.yang.binding.DataContainer; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; + +/** + * + */ +public class RpcListeningExecutorService implements ListeningExecutorService { + + private MessageSpy messageSpy; + private ListeningExecutorService executorServiceDelegate; + private DataContainer notSupportedTask = new NoDataContainerTask(); + + /** + * @param executorService + */ + public RpcListeningExecutorService(ListeningExecutorService executorService) { + this.executorServiceDelegate = executorService; + } + + /** + * @param messageSpy the messageSpy to set + */ + public void setMessageSpy(MessageSpy messageSpy) { + this.messageSpy = messageSpy; + } + + @Override + public void shutdown() { + executorServiceDelegate.shutdown(); + } + + @Override + public ListenableFuture submit(Callable task) { + ListenableFuture resultFuture = executorServiceDelegate.submit(task); + + boolean covered = false; + if (task instanceof OFRpcTask) { + if (((OFRpcTask) task).getInput() instanceof DataContainer) { + messageSpy.spyMessage((DataContainer) ((OFRpcTask) task).getInput(), + MessageSpy.STATISTIC_GROUP.TO_SWITCH_ENQUEUED_SUCCESS); + covered = true; + } + } + + if (! covered) { + messageSpy.spyMessage(notSupportedTask, MessageSpy.STATISTIC_GROUP.TO_SWITCH_ENQUEUED_FAILED); + } + + return resultFuture; + } + + @Override + public ListenableFuture submit(Runnable task) { + throw new IllegalAccessError("not supported"); + } + + @Override + public ListenableFuture submit(Runnable task, T result) { + throw new IllegalAccessError("not supported"); + } + + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return executorServiceDelegate.invokeAll(tasks); + } + + @Override + public List> invokeAll( + Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + return executorServiceDelegate.invokeAll(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + throw new IllegalAccessError("not supported"); + } + + @Override + public List shutdownNow() { + return executorServiceDelegate.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return executorServiceDelegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executorServiceDelegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) + throws InterruptedException { + return executorServiceDelegate.awaitTermination(timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + throw new IllegalAccessError("not supported"); + } + + @Override + public T invokeAny(Collection> tasks, + long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, TimeoutException { + throw new IllegalAccessError("not supported"); + } + + protected static class NoDataContainerTask implements DataContainer { + @Override + public Class getImplementedInterface() { + return null; + } + } + +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java index 294a75a7ca..50d3bee271 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/SwitchConnectionHandlerImpl.java @@ -17,8 +17,7 @@ import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHan import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil; import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; -import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.DataContainer; /** * basic interconnecting piece between plugin and library @@ -29,7 +28,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler { private QueueKeeperLightImpl queueKeeper; private ErrorHandler errorHandler; - private MessageSpy messageSpy; + private MessageSpy messageSpy; private int spyRate = 10; /** @@ -71,7 +70,7 @@ public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler { /** * @param messageSpy the messageSpy to set */ - public void setMessageSpy(MessageSpy messageSpy) { + public void setMessageSpy(MessageSpy messageSpy) { this.messageSpy = messageSpy; } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java new file mode 100644 index 0000000000..448f5ece92 --- /dev/null +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ThreadPoolLoggingExecutor.java @@ -0,0 +1,44 @@ +/** + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.openflowplugin.openflow.md.core; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class ThreadPoolLoggingExecutor extends ThreadPoolExecutor { + + private static Logger LOG = LoggerFactory.getLogger(ThreadPoolLoggingExecutor.class); + + /** + * @param corePoolSize + * @param maximumPoolSize + * @param keepAliveTime + * @param unit + * @param workQueue + */ + public ThreadPoolLoggingExecutor(int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + // in case of executing pure Runnable + if (t != null) { + LOG.warn("thread in pool stopped with error", t); + } + } +} diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java index de5883e445..c2c8410051 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ModelDrivenSwitchImpl.java @@ -64,13 +64,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.Flow; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemovedBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutputBuilder; @@ -88,13 +86,11 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemovedBuilder; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput; -import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput; import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutputBuilder; @@ -210,6 +206,7 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { rpcTaskContext.setMaxTimeout(maxTimeout); rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit); rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool()); + rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager().getMessageSpy()); } @Override @@ -970,9 +967,8 @@ public class ModelDrivenSwitchImpl extends AbstractModelDrivenSwitch { return Futures.immediateFuture(rpcResult); } - private TransactionId generateTransactionId(final Long xid) { - String stringXid = xid.toString(); - BigInteger bigIntXid = new BigInteger(stringXid); + private static TransactionId generateTransactionId(final Long xid) { + BigInteger bigIntXid = BigInteger.valueOf(xid); return new TransactionId(bigIntXid); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java index e96da2bc26..ed59fe9222 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskContext.java @@ -12,6 +12,8 @@ import java.util.concurrent.TimeUnit; import org.opendaylight.controller.sal.binding.api.NotificationProviderService; import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; +import org.opendaylight.yangtools.yang.binding.DataContainer; import com.google.common.util.concurrent.ListeningExecutorService; @@ -26,6 +28,7 @@ public class OFRpcTaskContext { private long maxTimeout; private TimeUnit maxTimeoutUnit; private ListeningExecutorService rpcPool; + private MessageSpy messageSpy; /** * @return the messageService @@ -101,4 +104,17 @@ public class OFRpcTaskContext { public ListeningExecutorService getRpcPool() { return rpcPool; } + + /** + * @param messageSpy + */ + public void setMessageSpy(MessageSpy messageSpy) { + this.messageSpy = messageSpy; + } + /** + * @return the messageSpy + */ + public MessageSpy getMessageSpy() { + return messageSpy; + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java index 921d49421b..ec94bdb547 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskFactory.java @@ -85,8 +85,8 @@ public abstract class OFRpcTaskFactory { getMessageService().flowMod(ofFlowModInput.build(), getCookie()); result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), - createFlowAddedNotification(xId, getInput())); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createFlowAddedNotification(xId, getInput())); } return result; @@ -144,8 +144,8 @@ public abstract class OFRpcTaskFactory { getMessageService().flowMod(ofFlowModInput.build(), getCookie()); result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), - createFlowUpdatedNotification(xId, getInput())); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createFlowUpdatedNotification(xId, getInput())); } return result; } @@ -201,8 +201,8 @@ public abstract class OFRpcTaskFactory { .groupMod(ofGroupModInput.build(), getCookie()); result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), - createGroupAddedNotification(xId, getInput())); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createGroupAddedNotification(xId, getInput())); } return result; @@ -260,8 +260,8 @@ public abstract class OFRpcTaskFactory { .meterMod(ofMeterModInput.build(), getCookie()); result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), - createMeterAddedNotification(xId, getInput())); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createMeterAddedNotification(xId, getInput())); } return result; @@ -321,8 +321,8 @@ public abstract class OFRpcTaskFactory { getMessageService().groupMod(ofGroupModInput.build(), getCookie()); result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), - createGroupUpdatedNotification(xId, getInput())); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createGroupUpdatedNotification(xId, getInput())); } return result; } @@ -378,8 +378,8 @@ public abstract class OFRpcTaskFactory { getMessageService().meterMod(ofMeterModInput.build(), getCookie()); result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib); - OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(), - createMeterUpdatedNotification(xId, getInput())); + OFRpcTaskUtil.hookFutureNotification(this, result, + getRpcNotificationProviderService(), createMeterUpdatedNotification(xId, getInput())); } return result; } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java index 03662fd36a..f919202878 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OFRpcTaskUtil.java @@ -19,8 +19,10 @@ import org.opendaylight.openflowplugin.openflow.md.core.MessageFactory; import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.Notification; import org.opendaylight.yangtools.yang.common.RpcError; import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity; @@ -95,11 +97,14 @@ public abstract class OFRpcTaskUtil { } /** + * @param task of rpc * @param originalResult * @param notificationProviderService * @param notificationComposer lazy notification composer */ - public static void hookFutureNotification(ListenableFuture originalResult, + public static void hookFutureNotification( + final OFRpcTask task, + ListenableFuture originalResult, final NotificationProviderService notificationProviderService, final NotificationComposer notificationComposer) { Futures.addCallback(originalResult, new FutureCallback() { @@ -108,14 +113,16 @@ public abstract class OFRpcTaskUtil { if (null != notificationProviderService) { notificationProviderService.publish(notificationComposer.compose()); } + task.getTaskContext().getMessageSpy().spyMessage( + task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS); } @Override public void onFailure(Throwable t) { - //NOOP + task.getTaskContext().getMessageSpy().spyMessage( + task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE); } }); - } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OpenflowPluginProvider.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OpenflowPluginProvider.java index 6c7de9d5ad..d16877bdd4 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OpenflowPluginProvider.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/OpenflowPluginProvider.java @@ -19,8 +19,7 @@ import org.opendaylight.openflowplugin.openflow.md.core.MDController; import org.opendaylight.openflowplugin.openflow.md.core.cmd.MessageCountCommandProvider; import org.opendaylight.openflowplugin.openflow.md.queue.MessageObservatory; import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpyCounterImpl; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; -import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.RpcService; import org.osgi.framework.BundleContext; import org.slf4j.Logger; @@ -43,7 +42,7 @@ public class OpenflowPluginProvider implements BindingAwareProvider, AutoCloseab private MessageCountCommandProvider messageCountCommandProvider; - private MessageObservatory messageCountProvider; + private MessageObservatory messageCountProvider; private SalRegistrationManager registrationManager; diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java index 6e16b6eb7d..f5553cb891 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManager.java @@ -19,9 +19,11 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; import org.opendaylight.openflowplugin.openflow.md.queue.PopListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import com.google.common.util.concurrent.ListeningExecutorService; @@ -115,12 +117,22 @@ public interface SessionManager extends AutoCloseable { void setPopListenerMapping(Map, Collection>> popListenerMapping); /** - * @param newFixedThreadPool + * @param rpcPoolDelegator */ - void setRpcPool(ListeningExecutorService newFixedThreadPool); + void setRpcPool(ListeningExecutorService rpcPoolDelegator); /** * @return the rpcPool instance */ ListeningExecutorService getRpcPool(); + + /** + * @param messageSpy + */ + void setMessageSpy(MessageSpy messageSpy); + + /** + * @return the messageSpy + */ + MessageSpy getMessageSpy(); } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java index afda07fc38..24ba19a676 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java @@ -21,10 +21,12 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; import org.opendaylight.openflowplugin.openflow.md.queue.PopListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.concepts.util.ListenerRegistry; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -198,6 +200,7 @@ public class SessionManagerOFImpl implements SessionManager { } } }; + private MessageSpy messageSpy; @Override @@ -261,4 +264,14 @@ public class SessionManagerOFImpl implements SessionManager { public ListeningExecutorService getRpcPool() { return rpcPool; } + + @Override + public void setMessageSpy(MessageSpy messageSpy) { + this.messageSpy = messageSpy; + } + + @Override + public MessageSpy getMessageSpy() { + return messageSpy; + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageObservatory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageObservatory.java index 254dedce80..d5fe858022 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageObservatory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageObservatory.java @@ -9,11 +9,10 @@ package org.opendaylight.openflowplugin.openflow.md.queue; /** - * @param - * @param + * @param * */ -public interface MessageObservatory extends MessageSpy, MessageCountDumper { +public interface MessageObservatory extends MessageSpy, MessageCountDumper { // just unifying iface diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpy.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpy.java index e09b879d29..40a271757b 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpy.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpy.java @@ -8,23 +8,56 @@ package org.opendaylight.openflowplugin.openflow.md.queue; -import java.util.List; /** * ticket spy - aimed on collecting intel about tickets - * @param type of incoming message - * @param type of outcoming message + * @param type of watched message */ -public interface MessageSpy extends Runnable { +public interface MessageSpy extends Runnable { /** * @param message content of ticket */ - void spyIn(MSG_IN message); + void spyIn(MSG_TYPE message); /** * @param message content of ticket */ - void spyOut(List message); - -} + void spyOut(MSG_TYPE message); + + + // TODO: temporary solution, should be refactored and moved to managed bean + + /** + * statistic groups overall in OFPlugin + */ + enum STATISTIC_GROUP { + /** message from switch, enqueued for processing */ + FROM_SWITCH_ENQUEUED, + /** message from switch translated successfully - source */ + FROM_SWITCH_TRANSLATE_IN_SUCCESS, + /** message from switch translated successfully - target */ + FROM_SWITCH_TRANSLATE_OUT_SUCCESS, + /** message from switch where translation failed - source */ + FROM_SWITCH_TRANSLATE_SRC_FAILURE, + /** message from switch finally published into MD-SAL */ + FROM_SWITCH_PUBLISHED_SUCCESS, + /** message from switch - publishing into MD-SAL failed */ + FROM_SWITCH_PUBLISHED_FAILURE, + + /** message from MD-SAL to switch via RPC enqueued */ + TO_SWITCH_ENQUEUED_SUCCESS, + /** message from MD-SAL to switch via RPC NOT enqueued */ + TO_SWITCH_ENQUEUED_FAILED, + /** message from MD-SAL to switch - sent to OFJava successfully */ + TO_SWITCH_SUBMITTED_SUCCESS, + /** message from MD-SAL to switch - sent to OFJava but failed*/ + TO_SWITCH_SUBMITTED_FAILURE + } + + /** + * @param message from switch or to switch - depends on statGroup + * @param statGroup + */ + void spyMessage(MSG_TYPE message, STATISTIC_GROUP statGroup); + } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpyCounterImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpyCounterImpl.java index c1f82948d8..6af42b9065 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpyCounterImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/MessageSpyCounterImpl.java @@ -9,46 +9,72 @@ package org.opendaylight.openflowplugin.openflow.md.queue; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; import org.opendaylight.yangtools.yang.binding.DataContainer; -import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * message counter (by type) */ -public class MessageSpyCounterImpl implements MessageObservatory { +public class MessageSpyCounterImpl implements MessageObservatory { private static final Logger LOG = LoggerFactory .getLogger(MessageSpyCounterImpl.class); - private Map, AtomicLong[]> inputStats = new ConcurrentHashMap<>(); + private Map, AtomicLong[]>> inputStats = new ConcurrentHashMap<>(); @Override - public void spyIn(OfHeader message) { + public void spyIn(DataContainer message) { + AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_IN_SUCCESS); + counters[0].incrementAndGet(); + } + + /** + * @param message + * @param statGroup TODO + * @return + */ + private AtomicLong[] getCounters(DataContainer message, STATISTIC_GROUP statGroup) { Class msgType = message.getImplementedInterface(); - AtomicLong counter; - synchronized(msgType) { - AtomicLong[] counters = inputStats.get(msgType); + Map, AtomicLong[]> groupData = getOrCreateGroupData(statGroup); + AtomicLong[] counters = getOrCreateCountersPair(msgType, groupData); + return counters; + } + + private static AtomicLong[] getOrCreateCountersPair(Class msgType, Map, AtomicLong[]> groupData) { + AtomicLong[] counters = groupData.get(msgType); + synchronized(groupData) { if (counters == null) { counters = new AtomicLong[] {new AtomicLong(), new AtomicLong()}; - inputStats.put(msgType, counters); + groupData.put(msgType, counters); } - counter = counters[0]; } - counter.incrementAndGet(); + return counters; + } + + private Map, AtomicLong[]> getOrCreateGroupData(STATISTIC_GROUP statGroup) { + Map, AtomicLong[]> groupData = null; + synchronized(inputStats) { + groupData = inputStats.get(statGroup); + if (groupData == null) { + groupData = new HashMap<>(); + inputStats.put(statGroup, groupData); + } + } + return groupData; } @Override - public void spyOut(List message) { - // NOOP + public void spyOut(DataContainer message) { + AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS); + counters[0].incrementAndGet(); } @Override @@ -64,12 +90,27 @@ public class MessageSpyCounterImpl implements MessageObservatory dumpMessageCounts() { List dump = new ArrayList<>(); - for (Entry, AtomicLong[]> statEntry : inputStats.entrySet()) { - long amountPerInterval = statEntry.getValue()[0].getAndSet(0); - long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval); - dump.add(String.format("MSG[%s] -> +%d | %d", - statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount)); + for (STATISTIC_GROUP statGroup : STATISTIC_GROUP.values()) { + Map, AtomicLong[]> groupData = inputStats.get(statGroup); + if (groupData != null) { + for (Entry, AtomicLong[]> statEntry : groupData.entrySet()) { + long amountPerInterval = statEntry.getValue()[0].getAndSet(0); + long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval); + dump.add(String.format("%s: MSG[%s] -> +%d | %d", + statGroup, + statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount)); + } + + } else { + dump.add(String.format("%s: no activity detected", statGroup)); + } } return dump; } + + @Override + public void spyMessage(DataContainer message, STATISTIC_GROUP statGroup) { + AtomicLong[] counters = getCounters(message, statGroup); + counters[0].incrementAndGet(); + } } diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java index 78682d3f39..99c7c016cf 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/QueueKeeperLightImpl.java @@ -19,7 +19,9 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +40,7 @@ public class QueueKeeperLightImpl implements QueueKeeper { private int poolSize = 10; private Map>>> translatorMapping; private TicketProcessorFactory ticketProcessorFactory; - private MessageSpy messageSpy; + private MessageSpy messageSpy; private VersionExtractor versionExtractor = new VersionExtractor() { @Override @@ -99,6 +101,7 @@ public class QueueKeeperLightImpl implements QueueKeeper { @Override public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) { + messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED); if(queueType == QueueKeeper.QueueType.DEFAULT) { TicketImpl ticket = new TicketImpl<>(); ticket.setConductor(conductor); @@ -147,7 +150,7 @@ public class QueueKeeperLightImpl implements QueueKeeper { /** * @param messageSpy the messageSpy to set */ - public void setMessageSpy(MessageSpy messageSpy) { + public void setMessageSpy(MessageSpy messageSpy) { this.messageSpy = messageSpy; } @@ -180,15 +183,22 @@ public class QueueKeeperLightImpl implements QueueKeeper { } if (messageSpy != null) { messageSpy.spyIn(message); - messageSpy.spyOut(result); + for (DataObject outMsg : result) { + messageSpy.spyOut(outMsg); + } } } else { LOG.warn("No translators for this message Type: {}", messageType); + messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE); } return result; } - private void pop(List processedMessages,ConnectionConductor conductor) { + /** + * @param processedMessages + * @param conductor + */ + private void pop(List processedMessages, ConnectionConductor conductor) { for (DataObject msg : processedMessages) { Class registeredType = registeredOutTypeExtractor.extractRegisteredType(msg); diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java index e7c8ca8622..deeb4d9382 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketFinisher.java @@ -11,7 +11,6 @@ import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java index c72134fffd..ace8288829 100644 --- a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java +++ b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/queue/TicketProcessorFactory.java @@ -16,6 +16,8 @@ import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor; import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator; import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher; import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey; +import org.opendaylight.yangtools.yang.binding.DataContainer; +import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,7 +25,7 @@ import org.slf4j.LoggerFactory; * @param * @param */ -public class TicketProcessorFactory { +public class TicketProcessorFactory { protected static final Logger LOG = LoggerFactory .getLogger(TicketProcessorFactory.class); @@ -31,7 +33,7 @@ public class TicketProcessorFactory { protected VersionExtractor versionExtractor; protected RegisteredTypeExtractor registeredTypeExtractor; protected Map>>> translatorMapping; - protected MessageSpy spy; + protected MessageSpy spy; /** * @param versionExtractor the versionExtractor to set @@ -59,7 +61,7 @@ public class TicketProcessorFactory { /** * @param spy the spy to set */ - public void setSpy(MessageSpy spy) { + public void setSpy(MessageSpy spy) { this.spy = spy; } @@ -82,7 +84,9 @@ public class TicketProcessorFactory { // spying on result if (spy != null) { spy.spyIn(ticket.getMessage()); - spy.spyOut(ticket.getResult().get()); + for (OUT outMessage : ticket.getResult().get()) { + spy.spyOut(outMessage); + } } } catch (Exception e) { LOG.error("translation problem: {}", e.getMessage()); diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java index 73d5740a26..6f2e3a6b50 100644 --- a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java +++ b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java @@ -33,6 +33,7 @@ import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterSt import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory; import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent; import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext; +import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy; import org.opendaylight.openflowplugin.openflow.md.queue.PopListener; import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities; @@ -55,6 +56,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessageBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessageBuilder; +import org.opendaylight.yangtools.yang.binding.DataContainer; import org.opendaylight.yangtools.yang.binding.DataObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,6 +95,8 @@ public class ConnectionConductorImplTest { private ErrorHandlerSimpleImpl errorHandler; private int expectedErrors = 0; + @Mock + private MessageSpy messageSpy; public void incrExperimenterMessageCounter() { this.experimenterMessageCounter++; @@ -133,6 +137,7 @@ public class ConnectionConductorImplTest { popListener = new PopListenerCountingImpl<>(); queueKeeper = new QueueKeeperLightImpl(); + queueKeeper.setMessageSpy(messageSpy); connectionConductor = new ConnectionConductorImpl(adapter); connectionConductor.setQueueKeeper(queueKeeper);