import org.opendaylight.openflowjava.protocol.api.connection.ConnectionConfiguration;
/**
- * @author mirehak
+ * @deprecated use configSubsystem
*/
+@Deprecated
public abstract class ConnectionConfigurationFactory {
/** OF default listening port */
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.yang.gen.v1.urn.opendaylight.port.statistics.rev131214.NodeConnectorStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.queue.statistics.rev131216.QueueStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.TableUpdated;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
private ConcurrentMap<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> messageTranslators;
private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListeners;
- private MessageSpy<OfHeader, DataObject> messageSpyCounter;
+ private MessageSpy<DataContainer> messageSpyCounter;
final private int OF10 = OFConstants.OFP_VERSION_1_0;
final private int OF13 = OFConstants.OFP_VERSION_1_3;
addMessageTranslator(MultipartReplyMessage.class,OF13,new MultipartReplyTableFeaturesToTableUpdatedTranslator());
addMessageTranslator(GetFeaturesOutput.class,OF10, new FeaturesV10ToNodeConnectorUpdatedTranslator());
- //TODO: move registration to factory
NotificationPopListener<DataObject> notificationPopListener = new NotificationPopListener<DataObject>();
+ notificationPopListener.setNotificationProviderService(
+ OFSessionUtil.getSessionManager().getNotificationProviderService());
+ notificationPopListener.setMessageSpy(messageSpyCounter);
+
+ //TODO: move registration to factory
addMessagePopListener(NodeErrorNotification.class, notificationPopListener);
addMessagePopListener(BadActionErrorNotification.class, notificationPopListener);
addMessagePopListener(BadInstructionErrorNotification.class, notificationPopListener);
// Push the updated Listeners to Session Manager which will be then picked up by ConnectionConductor eventually
OFSessionUtil.getSessionManager().setTranslatorMapping(messageTranslators);
OFSessionUtil.getSessionManager().setPopListenerMapping(popListeners);
+ OFSessionUtil.getSessionManager().setMessageSpy(messageSpyCounter);
// prepare worker pool for rpc
// TODO: get size from configSubsystem
- OFSessionUtil.getSessionManager().setRpcPool(
- MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10)));
+ int rpcThreadLimit = 10;
+ ListeningExecutorService rpcPoolDelegator = createRpcPoolSpyDecorated(rpcThreadLimit, messageSpyCounter);
+ OFSessionUtil.getSessionManager().setRpcPool(rpcPoolDelegator);
}
+ /**
+ * @param rpcThreadLimit
+ * @param messageSpy
+ * @return
+ */
+ private static ListeningExecutorService createRpcPoolSpyDecorated(int rpcThreadLimit, MessageSpy<DataContainer> messageSpy) {
+ ThreadPoolLoggingExecutor rpcPool = new ThreadPoolLoggingExecutor(rpcThreadLimit, rpcThreadLimit, 0L,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+ ListeningExecutorService listeningRpcPool = MoreExecutors.listeningDecorator(rpcPool);
+ RpcListeningExecutorService rpcPoolDecorated = new RpcListeningExecutorService(listeningRpcPool);
+ rpcPoolDecorated.setMessageSpy(messageSpy);
+ return rpcPoolDecorated;
+ }
+
/**
* @param switchConnectionProviders
* the switchConnectionProviders to set
/**
* @return wished connections configurations
+ * @deprecated use configSubsystem
*/
+ @Deprecated
private static Collection<ConnectionConfiguration> getConnectionConfiguration() {
// TODO:: get config from state manager
ConnectionConfiguration configuration = ConnectionConfigurationFactory.getDefault();
* @param messageSpyCounter the messageSpyCounter to set
*/
public void setMessageSpyCounter(
- MessageSpy<OfHeader, DataObject> messageSpyCounter) {
+ MessageSpy<DataContainer> messageSpyCounter) {
this.messageSpyCounter = messageSpyCounter;
}
package org.opendaylight.openflowplugin.openflow.md.core;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
import org.opendaylight.yangtools.yang.binding.Notification;
+/**
+ * general publisher to MD-SAL
+ *
+ * @param <T> type of supported notification
+ */
public class NotificationPopListener<T> implements PopListener<T> {
+
+ private MessageSpy<? super T> messageSpy;
+ private NotificationProviderService notificationProviderService;
+
+ /**
+ * @param messageSpy the messageSpy to set
+ */
+ public void setMessageSpy(MessageSpy<? super T> messageSpy) {
+ this.messageSpy = messageSpy;
+ }
+ /**
+ * @param notificationProviderService the notificationProviderService to set
+ */
+ public void setNotificationProviderService(
+ NotificationProviderService notificationProviderService) {
+ this.notificationProviderService = notificationProviderService;
+ }
@Override
public void onPop(T processedMessage) {
+ boolean published = false;
if(processedMessage instanceof Notification) {
- //TODO: create via factory, inject service
- NotificationProviderService notificationProviderService = OFSessionUtil.getSessionManager().getNotificationProviderService();
- notificationProviderService.publish((Notification) processedMessage);
+ if (notificationProviderService != null) {
+ notificationProviderService.publish((Notification) processedMessage);
+ messageSpy.spyMessage(processedMessage, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_SUCCESS);
+ published = true;
+ }
+ }
+
+ if (! published) {
+ messageSpy.spyMessage(processedMessage, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_PUBLISHED_FAILURE);
}
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.opendaylight.openflowplugin.openflow.md.core.sal.OFRpcTask;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+/**
+ *
+ */
+public class RpcListeningExecutorService implements ListeningExecutorService {
+
+ private MessageSpy<DataContainer> messageSpy;
+ private ListeningExecutorService executorServiceDelegate;
+ private DataContainer notSupportedTask = new NoDataContainerTask();
+
+ /**
+ * @param executorService
+ */
+ public RpcListeningExecutorService(ListeningExecutorService executorService) {
+ this.executorServiceDelegate = executorService;
+ }
+
+ /**
+ * @param messageSpy the messageSpy to set
+ */
+ public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+ this.messageSpy = messageSpy;
+ }
+
+ @Override
+ public void shutdown() {
+ executorServiceDelegate.shutdown();
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ ListenableFuture<T> resultFuture = executorServiceDelegate.submit(task);
+
+ boolean covered = false;
+ if (task instanceof OFRpcTask<?, ?>) {
+ if (((OFRpcTask<?, ?>) task).getInput() instanceof DataContainer) {
+ messageSpy.spyMessage((DataContainer) ((OFRpcTask<?, ?>) task).getInput(),
+ MessageSpy.STATISTIC_GROUP.TO_SWITCH_ENQUEUED_SUCCESS);
+ covered = true;
+ }
+ }
+
+ if (! covered) {
+ messageSpy.spyMessage(notSupportedTask, MessageSpy.STATISTIC_GROUP.TO_SWITCH_ENQUEUED_FAILED);
+ }
+
+ return resultFuture;
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ throw new IllegalAccessError("not supported");
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ throw new IllegalAccessError("not supported");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ return executorServiceDelegate.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return executorServiceDelegate.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ throw new IllegalAccessError("not supported");
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return executorServiceDelegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return executorServiceDelegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return executorServiceDelegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return executorServiceDelegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ throw new IllegalAccessError("not supported");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
+ long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException, TimeoutException {
+ throw new IllegalAccessError("not supported");
+ }
+
+ protected static class NoDataContainerTask implements DataContainer {
+ @Override
+ public Class<? extends DataContainer> getImplementedInterface() {
+ return null;
+ }
+ }
+
+}
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
/**
* basic interconnecting piece between plugin and library
private QueueKeeperLightImpl queueKeeper;
private ErrorHandler errorHandler;
- private MessageSpy<OfHeader, DataObject> messageSpy;
+ private MessageSpy<DataContainer> messageSpy;
private int spyRate = 10;
/**
/**
* @param messageSpy the messageSpy to set
*/
- public void setMessageSpy(MessageSpy<OfHeader, DataObject> messageSpy) {
+ public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
this.messageSpy = messageSpy;
}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class ThreadPoolLoggingExecutor extends ThreadPoolExecutor {
+
+ private static Logger LOG = LoggerFactory.getLogger(ThreadPoolLoggingExecutor.class);
+
+ /**
+ * @param corePoolSize
+ * @param maximumPoolSize
+ * @param keepAliveTime
+ * @param unit
+ * @param workQueue
+ */
+ public ThreadPoolLoggingExecutor(int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ // in case of executing pure Runnable
+ if (t != null) {
+ LOG.warn("thread in pool stopped with error", t);
+ }
+ }
+}
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupRemovedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.GroupUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.RemoveGroupOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.UpdateGroupOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.statistics.rev131111.GetAllGroupStatisticsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.AddMeterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterRemovedBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.MeterUpdatedBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.RemoveMeterOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.UpdateMeterOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.statistics.rev131111.GetAllMeterConfigStatisticsOutputBuilder;
rpcTaskContext.setMaxTimeout(maxTimeout);
rpcTaskContext.setMaxTimeoutUnit(maxTimeoutUnit);
rpcTaskContext.setRpcPool(OFSessionUtil.getSessionManager().getRpcPool());
+ rpcTaskContext.setMessageSpy(OFSessionUtil.getSessionManager().getMessageSpy());
}
@Override
return Futures.immediateFuture(rpcResult);
}
- private TransactionId generateTransactionId(final Long xid) {
- String stringXid = xid.toString();
- BigInteger bigIntXid = new BigInteger(stringXid);
+ private static TransactionId generateTransactionId(final Long xid) {
+ BigInteger bigIntXid = BigInteger.valueOf(xid);
return new TransactionId(bigIntXid);
}
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import com.google.common.util.concurrent.ListeningExecutorService;
private long maxTimeout;
private TimeUnit maxTimeoutUnit;
private ListeningExecutorService rpcPool;
+ private MessageSpy<DataContainer> messageSpy;
/**
* @return the messageService
public ListeningExecutorService getRpcPool() {
return rpcPool;
}
+
+ /**
+ * @param messageSpy
+ */
+ public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+ this.messageSpy = messageSpy;
+ }
+ /**
+ * @return the messageSpy
+ */
+ public MessageSpy<DataContainer> getMessageSpy() {
+ return messageSpy;
+ }
}
getMessageService().flowMod(ofFlowModInput.build(), getCookie());
result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
- OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
- createFlowAddedNotification(xId, getInput()));
+ OFRpcTaskUtil.hookFutureNotification(this, result,
+ getRpcNotificationProviderService(), createFlowAddedNotification(xId, getInput()));
}
return result;
getMessageService().flowMod(ofFlowModInput.build(), getCookie());
result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
- OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
- createFlowUpdatedNotification(xId, getInput()));
+ OFRpcTaskUtil.hookFutureNotification(this, result,
+ getRpcNotificationProviderService(), createFlowUpdatedNotification(xId, getInput()));
}
return result;
}
.groupMod(ofGroupModInput.build(), getCookie());
result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
- OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
- createGroupAddedNotification(xId, getInput()));
+ OFRpcTaskUtil.hookFutureNotification(this, result,
+ getRpcNotificationProviderService(), createGroupAddedNotification(xId, getInput()));
}
return result;
.meterMod(ofMeterModInput.build(), getCookie());
result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
- OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
- createMeterAddedNotification(xId, getInput()));
+ OFRpcTaskUtil.hookFutureNotification(this, result,
+ getRpcNotificationProviderService(), createMeterAddedNotification(xId, getInput()));
}
return result;
getMessageService().groupMod(ofGroupModInput.build(), getCookie());
result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
- OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
- createGroupUpdatedNotification(xId, getInput()));
+ OFRpcTaskUtil.hookFutureNotification(this, result,
+ getRpcNotificationProviderService(), createGroupUpdatedNotification(xId, getInput()));
}
return result;
}
getMessageService().meterMod(ofMeterModInput.build(), getCookie());
result = JdkFutureAdapters.listenInPoolThread(resultFromOFLib);
- OFRpcTaskUtil.hookFutureNotification(result, getRpcNotificationProviderService(),
- createMeterUpdatedNotification(xId, getInput()));
+ OFRpcTaskUtil.hookFutureNotification(this, result,
+ getRpcNotificationProviderService(), createMeterUpdatedNotification(xId, getInput()));
}
return result;
}
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.session.IMessageDispatchService;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.Notification;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorSeverity;
}
/**
+ * @param task of rpc
* @param originalResult
* @param notificationProviderService
* @param notificationComposer lazy notification composer
*/
- public static <R, N extends Notification> void hookFutureNotification(ListenableFuture<R> originalResult,
+ public static <R, N extends Notification, INPUT extends DataContainer> void hookFutureNotification(
+ final OFRpcTask<INPUT, R> task,
+ ListenableFuture<R> originalResult,
final NotificationProviderService notificationProviderService,
final NotificationComposer<N> notificationComposer) {
Futures.addCallback(originalResult, new FutureCallback<R>() {
if (null != notificationProviderService) {
notificationProviderService.publish(notificationComposer.compose());
}
+ task.getTaskContext().getMessageSpy().spyMessage(
+ task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_SUCCESS);
}
@Override
public void onFailure(Throwable t) {
- //NOOP
+ task.getTaskContext().getMessageSpy().spyMessage(
+ task.getInput(), MessageSpy.STATISTIC_GROUP.TO_SWITCH_SUBMITTED_FAILURE);
}
});
-
}
}
import org.opendaylight.openflowplugin.openflow.md.core.cmd.MessageCountCommandProvider;
import org.opendaylight.openflowplugin.openflow.md.queue.MessageObservatory;
import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpyCounterImpl;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
-import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
private MessageCountCommandProvider messageCountCommandProvider;
- private MessageObservatory<OfHeader, DataObject> messageCountProvider;
+ private MessageObservatory<DataContainer> messageCountProvider;
private SalRegistrationManager registrationManager;
import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import com.google.common.util.concurrent.ListeningExecutorService;
void setPopListenerMapping(Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> popListenerMapping);
/**
- * @param newFixedThreadPool
+ * @param rpcPoolDelegator
*/
- void setRpcPool(ListeningExecutorService newFixedThreadPool);
+ void setRpcPool(ListeningExecutorService rpcPoolDelegator);
/**
* @return the rpcPool instance
*/
ListeningExecutorService getRpcPool();
+
+ /**
+ * @param messageSpy
+ */
+ void setMessageSpy(MessageSpy<DataContainer> messageSpy);
+
+ /**
+ * @return the messageSpy
+ */
+ MessageSpy<DataContainer> getMessageSpy();
}
import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
}
};
+ private MessageSpy<DataContainer> messageSpy;
@Override
public ListeningExecutorService getRpcPool() {
return rpcPool;
}
+
+ @Override
+ public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
+ this.messageSpy = messageSpy;
+ }
+
+ @Override
+ public MessageSpy<DataContainer> getMessageSpy() {
+ return messageSpy;
+ }
}
package org.opendaylight.openflowplugin.openflow.md.queue;
/**
- * @param <MSG_IN>
- * @param <MSG_OUT>
+ * @param <MSG_TYPE>
*
*/
-public interface MessageObservatory<MSG_IN, MSG_OUT> extends MessageSpy<MSG_IN, MSG_OUT>, MessageCountDumper {
+public interface MessageObservatory<MSG_TYPE> extends MessageSpy<MSG_TYPE>, MessageCountDumper {
// just unifying iface
package org.opendaylight.openflowplugin.openflow.md.queue;
-import java.util.List;
/**
* ticket spy - aimed on collecting intel about tickets
- * @param <MSG_IN> type of incoming message
- * @param <MSG_OUT> type of outcoming message
+ * @param <MSG_TYPE> type of watched message
*/
-public interface MessageSpy<MSG_IN, MSG_OUT> extends Runnable {
+public interface MessageSpy<MSG_TYPE> extends Runnable {
/**
* @param message content of ticket
*/
- void spyIn(MSG_IN message);
+ void spyIn(MSG_TYPE message);
/**
* @param message content of ticket
*/
- void spyOut(List<MSG_OUT> message);
-
-}
+ void spyOut(MSG_TYPE message);
+
+
+ // TODO: temporary solution, should be refactored and moved to managed bean
+
+ /**
+ * statistic groups overall in OFPlugin
+ */
+ enum STATISTIC_GROUP {
+ /** message from switch, enqueued for processing */
+ FROM_SWITCH_ENQUEUED,
+ /** message from switch translated successfully - source */
+ FROM_SWITCH_TRANSLATE_IN_SUCCESS,
+ /** message from switch translated successfully - target */
+ FROM_SWITCH_TRANSLATE_OUT_SUCCESS,
+ /** message from switch where translation failed - source */
+ FROM_SWITCH_TRANSLATE_SRC_FAILURE,
+ /** message from switch finally published into MD-SAL */
+ FROM_SWITCH_PUBLISHED_SUCCESS,
+ /** message from switch - publishing into MD-SAL failed */
+ FROM_SWITCH_PUBLISHED_FAILURE,
+
+ /** message from MD-SAL to switch via RPC enqueued */
+ TO_SWITCH_ENQUEUED_SUCCESS,
+ /** message from MD-SAL to switch via RPC NOT enqueued */
+ TO_SWITCH_ENQUEUED_FAILED,
+ /** message from MD-SAL to switch - sent to OFJava successfully */
+ TO_SWITCH_SUBMITTED_SUCCESS,
+ /** message from MD-SAL to switch - sent to OFJava but failed*/
+ TO_SWITCH_SUBMITTED_FAILURE
+ }
+
+ /**
+ * @param message from switch or to switch - depends on statGroup
+ * @param statGroup
+ */
+ void spyMessage(MSG_TYPE message, STATISTIC_GROUP statGroup);
+ }
package org.opendaylight.openflowplugin.openflow.md.queue;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.opendaylight.yangtools.yang.binding.DataContainer;
-import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* message counter (by type)
*/
-public class MessageSpyCounterImpl implements MessageObservatory<OfHeader, DataObject> {
+public class MessageSpyCounterImpl implements MessageObservatory<DataContainer> {
private static final Logger LOG = LoggerFactory
.getLogger(MessageSpyCounterImpl.class);
- private Map<Class<? extends DataContainer>, AtomicLong[]> inputStats = new ConcurrentHashMap<>();
+ private Map<STATISTIC_GROUP, Map<Class<? extends DataContainer>, AtomicLong[]>> inputStats = new ConcurrentHashMap<>();
@Override
- public void spyIn(OfHeader message) {
+ public void spyIn(DataContainer message) {
+ AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_IN_SUCCESS);
+ counters[0].incrementAndGet();
+ }
+
+ /**
+ * @param message
+ * @param statGroup TODO
+ * @return
+ */
+ private AtomicLong[] getCounters(DataContainer message, STATISTIC_GROUP statGroup) {
Class<? extends DataContainer> msgType = message.getImplementedInterface();
- AtomicLong counter;
- synchronized(msgType) {
- AtomicLong[] counters = inputStats.get(msgType);
+ Map<Class<? extends DataContainer>, AtomicLong[]> groupData = getOrCreateGroupData(statGroup);
+ AtomicLong[] counters = getOrCreateCountersPair(msgType, groupData);
+ return counters;
+ }
+
+ private static AtomicLong[] getOrCreateCountersPair(Class<? extends DataContainer> msgType, Map<Class<? extends DataContainer>, AtomicLong[]> groupData) {
+ AtomicLong[] counters = groupData.get(msgType);
+ synchronized(groupData) {
if (counters == null) {
counters = new AtomicLong[] {new AtomicLong(), new AtomicLong()};
- inputStats.put(msgType, counters);
+ groupData.put(msgType, counters);
}
- counter = counters[0];
}
- counter.incrementAndGet();
+ return counters;
+ }
+
+ private Map<Class<? extends DataContainer>, AtomicLong[]> getOrCreateGroupData(STATISTIC_GROUP statGroup) {
+ Map<Class<? extends DataContainer>, AtomicLong[]> groupData = null;
+ synchronized(inputStats) {
+ groupData = inputStats.get(statGroup);
+ if (groupData == null) {
+ groupData = new HashMap<>();
+ inputStats.put(statGroup, groupData);
+ }
+ }
+ return groupData;
}
@Override
- public void spyOut(List<DataObject> message) {
- // NOOP
+ public void spyOut(DataContainer message) {
+ AtomicLong[] counters = getCounters(message, STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+ counters[0].incrementAndGet();
}
@Override
@Override
public List<String> dumpMessageCounts() {
List<String> dump = new ArrayList<>();
- for (Entry<Class<? extends DataContainer>, AtomicLong[]> statEntry : inputStats.entrySet()) {
- long amountPerInterval = statEntry.getValue()[0].getAndSet(0);
- long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval);
- dump.add(String.format("MSG[%s] -> +%d | %d",
- statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount));
+ for (STATISTIC_GROUP statGroup : STATISTIC_GROUP.values()) {
+ Map<Class<? extends DataContainer>, AtomicLong[]> groupData = inputStats.get(statGroup);
+ if (groupData != null) {
+ for (Entry<Class<? extends DataContainer>, AtomicLong[]> statEntry : groupData.entrySet()) {
+ long amountPerInterval = statEntry.getValue()[0].getAndSet(0);
+ long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval);
+ dump.add(String.format("%s: MSG[%s] -> +%d | %d",
+ statGroup,
+ statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount));
+ }
+
+ } else {
+ dump.add(String.format("%s: no activity detected", statGroup));
+ }
}
return dump;
}
+
+ @Override
+ public void spyMessage(DataContainer message, STATISTIC_GROUP statGroup) {
+ AtomicLong[] counters = getCounters(message, statGroup);
+ counters[0].incrementAndGet();
+ }
}
import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy.STATISTIC_GROUP;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private int poolSize = 10;
private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
- private MessageSpy<OfHeader, DataObject> messageSpy;
+ private MessageSpy<DataContainer> messageSpy;
private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
@Override
@Override
public void push(OfHeader message, ConnectionConductor conductor, QueueType queueType) {
+ messageSpy.spyMessage(message, STATISTIC_GROUP.FROM_SWITCH_ENQUEUED);
if(queueType == QueueKeeper.QueueType.DEFAULT) {
TicketImpl<OfHeader, DataObject> ticket = new TicketImpl<>();
ticket.setConductor(conductor);
/**
* @param messageSpy the messageSpy to set
*/
- public void setMessageSpy(MessageSpy<OfHeader, DataObject> messageSpy) {
+ public void setMessageSpy(MessageSpy<DataContainer> messageSpy) {
this.messageSpy = messageSpy;
}
}
if (messageSpy != null) {
messageSpy.spyIn(message);
- messageSpy.spyOut(result);
+ for (DataObject outMsg : result) {
+ messageSpy.spyOut(outMsg);
+ }
}
} else {
LOG.warn("No translators for this message Type: {}", messageType);
+ messageSpy.spyMessage(message, MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_SRC_FAILURE);
}
return result;
}
- private void pop(List<DataObject> processedMessages,ConnectionConductor conductor) {
+ /**
+ * @param processedMessages
+ * @param conductor
+ */
+ private void pop(List<DataObject> processedMessages, ConnectionConductor conductor) {
for (DataObject msg : processedMessages) {
Class<? extends Object> registeredType =
registeredOutTypeExtractor.extractRegisteredType(msg);
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.opendaylight.openflowplugin.openflow.md.core.IMDMessageTranslator;
import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
import org.opendaylight.openflowplugin.openflow.md.core.TranslatorKey;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* @param <IN>
* @param <OUT>
*/
-public class TicketProcessorFactory<IN, OUT> {
+public class TicketProcessorFactory<IN extends DataObject, OUT extends DataObject> {
protected static final Logger LOG = LoggerFactory
.getLogger(TicketProcessorFactory.class);
protected VersionExtractor<IN> versionExtractor;
protected RegisteredTypeExtractor<IN> registeredTypeExtractor;
protected Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping;
- protected MessageSpy<IN, OUT> spy;
+ protected MessageSpy<DataContainer> spy;
/**
* @param versionExtractor the versionExtractor to set
/**
* @param spy the spy to set
*/
- public void setSpy(MessageSpy<IN, OUT> spy) {
+ public void setSpy(MessageSpy<DataContainer> spy) {
this.spy = spy;
}
// spying on result
if (spy != null) {
spy.spyIn(ticket.getMessage());
- spy.spyOut(ticket.getResult().get());
+ for (OUT outMessage : ticket.getResult().get()) {
+ spy.spyOut(outMessage);
+ }
}
} catch (Exception e) {
LOG.error("translation problem: {}", e.getMessage());
import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessageBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessageBuilder;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private ErrorHandlerSimpleImpl errorHandler;
private int expectedErrors = 0;
+ @Mock
+ private MessageSpy<DataContainer> messageSpy;
public void incrExperimenterMessageCounter() {
this.experimenterMessageCounter++;
popListener = new PopListenerCountingImpl<>();
queueKeeper = new QueueKeeperLightImpl();
+ queueKeeper.setMessageSpy(messageSpy);
connectionConductor = new ConnectionConductorImpl(adapter);
connectionConductor.setQueueKeeper(queueKeeper);