package org.opendaylight.openflowplugin.openflow.md.core;
import java.net.InetAddress;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowjava.protocol.api.connection.SwitchConnectionHandler;
import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpy;
+import org.opendaylight.openflowplugin.openflow.md.queue.MessageSpyCounterImpl;
import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataObject;
/**
* @author mirehak
*
*/
public class SwitchConnectionHandlerImpl implements SwitchConnectionHandler {
+
+ private ScheduledThreadPoolExecutor spyPool;
private QueueKeeperLightImpl queueKeeper;
private ErrorHandler errorHandler;
+ private MessageSpy<OfHeader, DataObject> messageSpy;
+ private int spyRate = 10;
/**
*
*/
public SwitchConnectionHandlerImpl() {
+ messageSpy = new MessageSpyCounterImpl();
queueKeeper = new QueueKeeperLightImpl();
queueKeeper.setTranslatorMapping(OFSessionUtil.getTranslatorMap());
queueKeeper.setPopListenersMapping(OFSessionUtil.getPopListenerMapping());
+ queueKeeper.setMessageSpy(messageSpy);
+
queueKeeper.init();
errorHandler = new ErrorHandlerQueueImpl();
new Thread(errorHandler).start();
+
+ //TODO: implement shutdown invocation upon service stop event
+ spyPool = new ScheduledThreadPoolExecutor(1);
+ spyPool.scheduleAtFixedRate(messageSpy, spyRate, spyRate, TimeUnit.SECONDS);
}
@Override
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.List;
+
+/**
+ * ticket spy - aimed on collecting intel about tickets
+ * @param <MSG_IN> type of incoming message
+ * @param <MSG_OUT> type of outcoming message
+ */
+public interface MessageSpy<MSG_IN, MSG_OUT> extends Runnable {
+
+ /**
+ * @param message content of ticket
+ */
+ void spyIn(MSG_IN message);
+
+ /**
+ * @param message content of ticket
+ */
+ void spyOut(List<MSG_OUT> message);
+
+}
--- /dev/null
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.openflow.md.queue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * message counter (by type)
+ */
+public class MessageSpyCounterImpl implements MessageSpy<OfHeader, DataObject> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(MessageSpyCounterImpl.class);
+
+ private Map<Class<? extends DataContainer>, AtomicLong[]> inputStats = new ConcurrentHashMap<>();
+
+ @Override
+ public void spyIn(OfHeader message) {
+ Class<? extends DataContainer> msgType = message.getImplementedInterface();
+ AtomicLong counter;
+ synchronized(msgType) {
+ AtomicLong[] counters = inputStats.get(msgType);
+ if (counters == null) {
+ counters = new AtomicLong[] {new AtomicLong(), new AtomicLong()};
+ inputStats.put(msgType, counters);
+ }
+ counter = counters[0];
+ }
+ counter.incrementAndGet();
+ }
+
+ @Override
+ public void spyOut(List<DataObject> message) {
+ // NOOP
+ }
+
+ @Override
+ public void run() {
+ // log current counters and cleans it
+ if (LOG.isDebugEnabled()) {
+ for (Entry<Class<? extends DataContainer>, AtomicLong[]> statEntry : inputStats.entrySet()) {
+ long amountPerInterval = statEntry.getValue()[0].getAndSet(0);
+ long cumulativeAmount = statEntry.getValue()[1].addAndGet(amountPerInterval);
+ LOG.debug("MSG[{}] -> +{} | {}", statEntry.getKey().getSimpleName(), amountPerInterval, cumulativeAmount);
+ }
+ }
+ }
+}
private ScheduledThreadPoolExecutor pool;
private int poolSize = 10;
private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping;
+ private TicketProcessorFactory<OfHeader, DataObject> ticketProcessorFactory;
+ private MessageSpy<OfHeader, DataObject> messageSpy;
private VersionExtractor<OfHeader> versionExtractor = new VersionExtractor<OfHeader>() {
@Override
public void init() {
processQueue = new LinkedBlockingQueue<>(100);
pool = new ScheduledThreadPoolExecutor(poolSize);
+
+ ticketProcessorFactory = new TicketProcessorFactory<>();
+ ticketProcessorFactory.setRegisteredTypeExtractor(registeredSrcTypeExtractor);
+ ticketProcessorFactory.setTranslatorMapping(translatorMapping);
+ ticketProcessorFactory.setVersionExtractor(versionExtractor);
+ ticketProcessorFactory.setSpy(messageSpy);
+
TicketFinisher<DataObject> finisher = new TicketFinisher<>(
processQueue, popListenersMapping, registeredOutTypeExtractor);
new Thread(finisher).start();
* @param ticket
*/
private void scheduleTicket(Ticket<OfHeader, DataObject> ticket) {
- pool.execute(TicketProcessorFactory.createProcessor(ticket, versionExtractor,
- registeredSrcTypeExtractor, translatorMapping));
+ Runnable ticketProcessor = ticketProcessorFactory.createProcessor(ticket);
+ pool.execute(ticketProcessor);
}
/**
this.popListenersMapping = popListenersMapping;
}
+ /**
+ * @param messageSpy the messageSpy to set
+ */
+ public void setMessageSpy(MessageSpy<OfHeader, DataObject> messageSpy) {
+ this.messageSpy = messageSpy;
+ }
+
private List<DataObject> translate(OfHeader message, ConnectionConductor conductor) {
List<DataObject> result = new ArrayList<>();
Class<? extends OfHeader> messageType = registeredSrcTypeExtractor.extractRegisteredType(message);
result.addAll(translator.translate(cookie, conductor.getSessionContext(), message));
}
}
+ if (messageSpy != null) {
+ messageSpy.spyIn(message);
+ messageSpy.spyOut(result);
+ }
} else {
LOG.warn("No translators for this message Type: {}", messageType);
}
import org.slf4j.LoggerFactory;
/**
- * @author mirehak
- * @param <IN> source type
* @param <OUT> result type
*
*/
import org.slf4j.LoggerFactory;
/**
- * @author mirehak
- *
+ * @param <IN>
+ * @param <OUT>
*/
-public abstract class TicketProcessorFactory {
+public class TicketProcessorFactory<IN, OUT> {
protected static final Logger LOG = LoggerFactory
.getLogger(TicketProcessorFactory.class);
+
+ protected VersionExtractor<IN> versionExtractor;
+ protected RegisteredTypeExtractor<IN> registeredTypeExtractor;
+ protected Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping;
+ protected MessageSpy<IN, OUT> spy;
+
+ /**
+ * @param versionExtractor the versionExtractor to set
+ */
+ public void setVersionExtractor(VersionExtractor<IN> versionExtractor) {
+ this.versionExtractor = versionExtractor;
+ }
+
+ /**
+ * @param registeredTypeExtractor the registeredTypeExtractor to set
+ */
+ public void setRegisteredTypeExtractor(
+ RegisteredTypeExtractor<IN> registeredTypeExtractor) {
+ this.registeredTypeExtractor = registeredTypeExtractor;
+ }
+
+ /**
+ * @param translatorMapping the translatorMapping to set
+ */
+ public void setTranslatorMapping(
+ Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping) {
+ this.translatorMapping = translatorMapping;
+ }
+
+ /**
+ * @param spy the spy to set
+ */
+ public void setSpy(MessageSpy<IN, OUT> spy) {
+ this.spy = spy;
+ }
+
/**
* @param ticket
- * @param versionExtractor
- * @param registeredTypeExtractor
- * @param translatorMapping
* @return runnable ticket processor
*/
- public static <IN, OUT> Runnable createProcessor(
- final Ticket<IN, OUT> ticket,
- final VersionExtractor<IN> versionExtractor,
- final RegisteredTypeExtractor<IN> registeredTypeExtractor,
- final Map<TranslatorKey, Collection<IMDMessageTranslator<IN, List<OUT>>>> translatorMapping) {
- return new Runnable() {
+ public Runnable createProcessor(final Ticket<IN, OUT> ticket) {
+
+ Runnable ticketProcessor = new Runnable() {
@Override
public void run() {
LOG.debug("message received, type: {}", registeredTypeExtractor.extractRegisteredType(
try {
translate = translate();
ticket.getResult().set(translate);
+ // spying on result
+ if (spy != null) {
+ spy.spyIn(ticket.getMessage());
+ spy.spyOut(ticket.getResult().get());
+ }
} catch (Exception e) {
LOG.error("translation problem: {}", e.getMessage());
ticket.getResult().setException(e);
Short version = versionExtractor.extractVersion(message);
if (version == null) {
- throw new IllegalArgumentException("version is NULL");
+ throw new IllegalArgumentException("version is NULL");
}
TranslatorKey tKey = new TranslatorKey(version, messageType.getName());
translators = translatorMapping.get(tKey);
return result;
}
};
+
+ return ticketProcessor;
}
}
popListener = new PopListenerCountingImpl<>();
queueKeeper = new QueueKeeperLightImpl();
- queueKeeper.init();
connectionConductor = new ConnectionConductorImpl(adapter);
connectionConductor.setQueueKeeper(queueKeeper);
controller.getMessageTranslators().putAll(assembleTranslatorMapping());
queueKeeper.setPopListenersMapping(assemblePopListenerMapping());
+ queueKeeper.init();
}
/**
flow.setInstructions(createDropInstructions().build());
break;
case "f54":
- id += 51;
+ id += 54;
flow.setMatch(new MatchBuilder().build());
flow.setInstructions(createSentToControllerInstructions().build());
break;
ci.println("Status of Flow Data Loaded Transaction: " + status);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
} catch (ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
}
ci.println("Status of Flow Data Loaded Transaction: " + status);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
} catch (ExecutionException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
}