--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+/**
+ * Created by mirehak on 6/2/15.
+ */
+public interface StatisticsGatherer {
+ Future<RpcResult<List<MultipartReply>>> getStatisticsOfType(EventIdentifier eventIdentifier, MultipartType type);
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.services;
+
+import com.google.common.util.concurrent.FutureCallback;
+import java.util.List;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+
+public abstract class AbstractMultipartOnTheFlyService<I> extends AbstractService<I, List<MultipartReply>> {
+ protected AbstractMultipartOnTheFlyService(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
+ super(requestContextStack, deviceContext);
+ }
+
+ @Override
+ protected final FutureCallback<OfHeader> createCallback(final RequestContext<List<MultipartReply>> context, final Class<?> requestType) {
+ return new MultipartRequestOnTheFlyCallback(context, requestType, getDeviceContext(), getEventIdentifier());
+ }
+
+
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.impl.services;
+
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import java.util.List;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.impl.statistics.SinglePurposeMultipartReplyTranslator;
+import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils;
+import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.DataObject;
+import org.opendaylight.yangtools.yang.common.RpcError;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class MultipartRequestOnTheFlyCallback extends AbstractRequestCallback<List<MultipartReply>> {
+ private static final Logger LOG = LoggerFactory.getLogger(MultipartRequestOnTheFlyCallback.class);
+ private final DeviceContext deviceContext;
+ private static final SinglePurposeMultipartReplyTranslator MULTIPART_REPLY_TRANSLATOR = new SinglePurposeMultipartReplyTranslator();
+ private boolean virgin = true;
+ private boolean finished = false;
+ private final EventIdentifier doneEventIdentifier;
+
+
+
+ public MultipartRequestOnTheFlyCallback(final RequestContext<List<MultipartReply>> context,
+ final Class<?> requestType,
+ final DeviceContext deviceContext,
+ final EventIdentifier eventIdentifier) {
+ super(context, requestType, deviceContext.getMessageSpy(), eventIdentifier);
+ this.deviceContext = deviceContext;
+ //TODO: this is focused on flow stats only - need more general approach if used for more than flow stats
+ doneEventIdentifier = new EventIdentifier(MultipartType.OFPMPFLOW.name(), deviceContext.getPrimaryConnectionContext().getNodeId().toString());
+ }
+
+ public EventIdentifier getDoneEventIdentifier() {
+ return doneEventIdentifier;
+ }
+
+ @Override
+ public void onSuccess(final OfHeader result) {
+ if (result == null) {
+ LOG.info("Ofheader was null.");
+ if (!finished) {
+ endCollecting();
+ return;
+ }
+ } else if (finished) {
+ LOG.debug("Unexpected multipart response received: xid={}, {}", result.getXid(), result.getImplementedInterface());
+ return;
+ }
+
+ if (!(result instanceof MultipartReply)) {
+ LOG.info("Unexpected response type received {}.", result.getClass());
+ final RpcResultBuilder<List<MultipartReply>> rpcResultBuilder =
+ RpcResultBuilder.<List<MultipartReply>>failed().withError(RpcError.ErrorType.APPLICATION,
+ String.format("Unexpected response type received %s.", result.getClass()));
+ setResult(rpcResultBuilder.build());
+ endCollecting();
+ } else {
+ MultipartReply multipartReply = (MultipartReply) result;
+
+ Iterable<? extends DataObject> allMultipartData = Collections.emptyList();
+ final MultipartReply singleReply = multipartReply;
+ final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
+ allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
+
+ //TODO: following part is focused on flow stats only - need more general approach if used for more than flow stats
+ if (virgin) {
+ StatisticsGatheringUtils.deleteAllKnownFlows(deviceContext, deviceContext.getDeviceState().getNodeInstanceIdentifier());
+ virgin = false;
+ }
+ StatisticsGatheringUtils.writeFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
+ // ^^^^
+
+ if (!multipartReply.getFlags().isOFPMPFREQMORE()) {
+ endCollecting();
+ }
+ }
+ }
+
+ private void endCollecting() {
+ EventsTimeCounter.markEnd(getDoneEventIdentifier());
+ EventsTimeCounter.markEnd(getEventIdentifier());
+ final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder.success(Collections.<MultipartReply>emptyList()).build();
+ spyMessage(MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+ setResult(rpcResult);
+ deviceContext.submitTransaction();
+ finished = true;
+ }
+}
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
import org.opendaylight.openflowplugin.impl.services.RequestContextUtil;
+import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringOnTheFlyService;
import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.slf4j.Logger;
private final List<MultipartType> collectingStatType;
private final StatisticsGatheringService statisticsGatheringService;
+ private final StatisticsGatheringOnTheFlyService statisticsGatheringOnTheFlyService;
public StatisticsContextImpl(@CheckForNull final DeviceContext deviceContext) {
this.deviceContext = Preconditions.checkNotNull(deviceContext);
devState = Preconditions.checkNotNull(deviceContext.getDeviceState());
emptyFuture = Futures.immediateFuture(new Boolean(false));
statisticsGatheringService = new StatisticsGatheringService(this, deviceContext);
+ statisticsGatheringOnTheFlyService = new StatisticsGatheringOnTheFlyService(this, deviceContext);
+
final List<MultipartType> statListForCollecting = new ArrayList<>();
if (devState.isTableStatisticsAvailable()) {
statListForCollecting.add(MultipartType.OFPMPTABLE);
return settableStatResultFuture;
}
- private ListenableFuture<Boolean> choiseStat(final MultipartType multipartType) {
+ private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType) {
switch (multipartType) {
case OFPMPFLOW:
return collectFlowStatistics(multipartType);
resultFuture.set(Boolean.TRUE);
return;
}
- final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = choiseStat(iterator.next());
+ final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(iterator.next());
Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(final Boolean result) {
private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType) {
return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
- statisticsGatheringService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture;
+ statisticsGatheringOnTheFlyService, deviceContext, /*MultipartType.OFPMPFLOW*/ multipartType) : emptyFuture;
}
private ListenableFuture<Boolean> collectTableStatistics(final MultipartType multipartType) {
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
-import org.opendaylight.openflowplugin.impl.statistics.services.dedicated.StatisticsGatheringService;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowId;
}
- public static ListenableFuture<Boolean> gatherStatistics(final StatisticsGatheringService statisticsGatheringService,
+ public static ListenableFuture<Boolean> gatherStatistics(final StatisticsGatherer statisticsGatheringService,
final DeviceContext deviceContext,
final MultipartType type) {
//FIXME : anytype listener must not be send as parameter, it has to be extracted from device context inside service
}
EventIdentifier ofpQueuToRequestContextEventIdentifier = new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceId);
final ListenableFuture<RpcResult<List<MultipartReply>>> statisticsDataInFuture =
- JdkFutureAdapters.listenInPoolThread(statisticsGatheringService.getStatisticsOfType(ofpQueuToRequestContextEventIdentifier, type));
+ JdkFutureAdapters.listenInPoolThread(statisticsGatheringService.getStatisticsOfType(
+ ofpQueuToRequestContextEventIdentifier, type));
return transformAndStoreStatisticsData(statisticsDataInFuture, deviceContext, wholeProcessEventIdentifier);
}
public Boolean apply(final RpcResult<List<MultipartReply>> rpcResult) {
if (rpcResult.isSuccessful()) {
boolean isMultipartProcessed = Boolean.TRUE;
- Iterable<? extends DataObject> allMultipartData = Collections.emptyList();
- DataObject multipartData = null;
- for (final MultipartReply singleReply : rpcResult.getResult()) {
- final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
- multipartData = multipartDataList.get(0);
- allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
- }
- if (multipartData instanceof GroupStatisticsUpdated) {
- processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceContext);
- } else if (multipartData instanceof MeterStatisticsUpdated) {
- processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceContext);
- } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
- processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceContext);
- } else if (multipartData instanceof FlowTableStatisticsUpdate) {
- processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceContext);
- } else if (multipartData instanceof QueueStatisticsUpdate) {
- processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
- } else if (multipartData instanceof FlowsStatisticsUpdate) {
- processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
- EventsTimeCounter.markEnd(eventIdentifier);
- } else if (multipartData instanceof GroupDescStatsUpdated) {
- processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
- } else if (multipartData instanceof MeterConfigStatsUpdated) {
- processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceContext);
- } else {
- isMultipartProcessed = Boolean.FALSE;
+ // TODO: in case the result value is null then multipart data probably got processed on the fly -
+ // TODO: this contract should by clearly stated and enforced - now simple true value is returned
+ if (null != rpcResult.getResult()) {
+ Iterable<? extends DataObject> allMultipartData = Collections.emptyList();
+ DataObject multipartData = null;
+ for (final MultipartReply singleReply : rpcResult.getResult()) {
+ final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(deviceContext, singleReply);
+ multipartData = multipartDataList.get(0);
+ allMultipartData = Iterables.concat(allMultipartData, multipartDataList);
+ }
+
+ if (multipartData instanceof GroupStatisticsUpdated) {
+ processGroupStatistics((Iterable<GroupStatisticsUpdated>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof MeterStatisticsUpdated) {
+ processMetersStatistics((Iterable<MeterStatisticsUpdated>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof NodeConnectorStatisticsUpdate) {
+ processNodeConnectorStatistics((Iterable<NodeConnectorStatisticsUpdate>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof FlowTableStatisticsUpdate) {
+ processFlowTableStatistics((Iterable<FlowTableStatisticsUpdate>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof QueueStatisticsUpdate) {
+ processQueueStatistics((Iterable<QueueStatisticsUpdate>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof FlowsStatisticsUpdate) {
+ processFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData, deviceContext);
+ EventsTimeCounter.markEnd(eventIdentifier);
+ } else if (multipartData instanceof GroupDescStatsUpdated) {
+ processGroupDescStats((Iterable<GroupDescStatsUpdated>) allMultipartData, deviceContext);
+ } else if (multipartData instanceof MeterConfigStatsUpdated) {
+ processMeterConfigStatsUpdated((Iterable<MeterConfigStatsUpdated>) allMultipartData, deviceContext);
+ } else {
+ isMultipartProcessed = Boolean.FALSE;
+ }
+ //TODO : implement experimenter
}
- //TODO : implement experimenter
return isMultipartProcessed;
}
}
private static void processFlowStatistics(final Iterable<FlowsStatisticsUpdate> data, final DeviceContext deviceContext) {
- final NodeId nodeId = deviceContext.getDeviceState().getNodeId();
- final InstanceIdentifier<Node> nodeIdent = InstanceIdentifier.create(Nodes.class)
- .child(Node.class, new NodeKey(nodeId));
- deleteAllKnownFlows(deviceContext, nodeIdent);
+ deleteAllKnownFlows(deviceContext, deviceContext.getDeviceState().getNodeInstanceIdentifier());
+ writeFlowStatistics(data, deviceContext);
+ deviceContext.submitTransaction();
+ }
+
+ public static void writeFlowStatistics(Iterable<FlowsStatisticsUpdate> data, DeviceContext deviceContext) {
for (final FlowsStatisticsUpdate flowsStatistics : data) {
for (final FlowAndStatisticsMapList flowStat : flowsStatistics.getFlowAndStatisticsMapList()) {
final FlowBuilder flowBuilder = new FlowBuilder(flowStat);
deviceContext.writeToTransaction(LogicalDatastoreType.OPERATIONAL, flowIdent, flowBuilder.build());
}
}
- deviceContext.submitTransaction();
}
- private static void deleteAllKnownFlows(final DeviceContext deviceContext, final InstanceIdentifier<Node> nodeIdent) {
+ public static void deleteAllKnownFlows(final DeviceContext deviceContext, final InstanceIdentifier<Node> nodeIdent) {
if (deviceContext.getDeviceState().deviceSynchronized()) {
final Short numOfTablesOnDevice = deviceContext.getDeviceState().getFeatures().getTables();
for (short i = 0; i < numOfTablesOnDevice; i++) {
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.openflowplugin.impl.statistics.services.dedicated;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
+import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
+import org.opendaylight.openflowplugin.api.openflow.device.Xid;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
+import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
+import org.opendaylight.openflowplugin.impl.services.AbstractMultipartOnTheFlyService;
+import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+/**
+ * collects statistics and processes them on the fly
+ */
+public class StatisticsGatheringOnTheFlyService extends AbstractMultipartOnTheFlyService<MultipartType> implements StatisticsGatherer {
+
+ public StatisticsGatheringOnTheFlyService(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
+ super(requestContextStack, deviceContext);
+ }
+
+ @Override
+ public Future<RpcResult<List<MultipartReply>>> getStatisticsOfType(final EventIdentifier eventIdentifier, final MultipartType type) {
+ EventsTimeCounter.markStart(eventIdentifier);
+ setEventIdentifier(eventIdentifier);
+ return handleServiceCall(type);
+ }
+
+ @Override
+ protected OfHeader buildRequest(final Xid xid, final MultipartType input) {
+ return MultipartRequestInputFactory.makeMultipartRequestInput(xid.getValue(), getVersion(), input);
+ }
+}
import org.opendaylight.openflowplugin.api.openflow.device.RequestContextStack;
import org.opendaylight.openflowplugin.api.openflow.device.Xid;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
+import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
import org.opendaylight.openflowplugin.impl.common.MultipartRequestInputFactory;
import org.opendaylight.openflowplugin.impl.services.AbstractMultipartService;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
/**
* Created by Martin Bobak <mbobak@cisco.com> on 4.4.2015.
*/
-public class StatisticsGatheringService extends AbstractMultipartService<MultipartType> {
+public class StatisticsGatheringService extends AbstractMultipartService<MultipartType> implements StatisticsGatherer {
public StatisticsGatheringService(final RequestContextStack requestContextStack, final DeviceContext deviceContext) {
super(requestContextStack, deviceContext);
}
+ @Override
public Future<RpcResult<List<MultipartReply>>> getStatisticsOfType(final EventIdentifier eventIdentifier, final MultipartType type) {
EventsTimeCounter.markStart(eventIdentifier);
setEventIdentifier(eventIdentifier);