*/
package org.opendaylight.openflowplugin.impl.services;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.base.Optional;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
import org.opendaylight.openflowplugin.impl.statistics.SinglePurposeMultipartReplyTranslator;
import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
final class MultipartRequestOnTheFlyCallback extends AbstractRequestCallback<List<MultipartReply>> {
private static final Logger LOG = LoggerFactory.getLogger(MultipartRequestOnTheFlyCallback.class);
- private static final SinglePurposeMultipartReplyTranslator MULTIPART_REPLY_TRANSLATOR = new SinglePurposeMultipartReplyTranslator();
+ private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator;
private final DeviceInfo deviceInfo;
private final DeviceFlowRegistry registry;
- private boolean virgin = true;
- private boolean finished = false;
private final EventIdentifier doneEventIdentifier;
private final TxFacade txFacade;
+ private Optional<FlowCapableNode> fcNodeOpt;
+ private AtomicBoolean virgin = new AtomicBoolean(true);
+ private AtomicBoolean finished = new AtomicBoolean(false);
public MultipartRequestOnTheFlyCallback(final RequestContext<List<MultipartReply>> context,
final Class<?> requestType,
final EventIdentifier eventIdentifier,
final DeviceInfo deviceInfo,
final DeviceFlowRegistry registry,
- final TxFacade txFacade) {
+ final TxFacade txFacade,
+ final ConvertorExecutor convertorExecutor) {
super(context, requestType, messageSpy, eventIdentifier);
this.deviceInfo = deviceInfo;
this.registry = registry;
this.txFacade = txFacade;
+ multipartReplyTranslator = new SinglePurposeMultipartReplyTranslator(convertorExecutor);
+
//TODO: this is focused on flow stats only - need more general approach if used for more than flow stats
doneEventIdentifier = new EventIdentifier(MultipartType.OFPMPFLOW.name(), deviceInfo.getNodeId().toString());
}
@Override
public void onSuccess(final OfHeader result) {
+
if (result == null) {
LOG.info("Ofheader was null.");
- if (!finished) {
+ if (!finished.getAndSet(true)) {
endCollecting();
return;
}
- } else if (finished) {
+ } else if (finished.get()) {
LOG.debug("Unexpected multipart response received: xid={}, {}", result.getXid(), result.getImplementedInterface());
return;
}
if (!(result instanceof MultipartReply)) {
- LOG.info("Unexpected response type received {}.", result.getClass());
- final RpcResultBuilder<List<MultipartReply>> rpcResultBuilder =
- RpcResultBuilder.<List<MultipartReply>>failed().withError(RpcError.ErrorType.APPLICATION,
- String.format("Unexpected response type received %s.", result.getClass()));
- setResult(rpcResultBuilder.build());
- endCollecting();
+ if(!finished.getAndSet(true)) {
+ LOG.info("Unexpected response type received {}.", result.getClass());
+ final RpcResultBuilder<List<MultipartReply>> rpcResultBuilder =
+ RpcResultBuilder.<List<MultipartReply>>failed().withError(RpcError.ErrorType.APPLICATION,
+ String.format("Unexpected response type received %s.", result.getClass()));
+ setResult(rpcResultBuilder.build());
+ endCollecting();
+ }
} else {
final MultipartReply multipartReply = (MultipartReply) result;
+ if (virgin.get()) {
+ synchronized (this) {
+ if (virgin.get()) {
+ fcNodeOpt = StatisticsGatheringUtils.deleteAllKnownFlows(deviceInfo, txFacade);
+ virgin.set(false);
+ }
+ }
+ }
final MultipartReply singleReply = multipartReply;
- final List<? extends DataObject> multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate(
+ final List<? extends DataObject> multipartDataList = multipartReplyTranslator.translate(
deviceInfo.getDatapathId(), deviceInfo.getVersion(), singleReply);
- final Iterable<? extends DataObject> allMultipartData = multipartDataList;
+ final Iterable<FlowsStatisticsUpdate> allMultipartData = (Iterable<FlowsStatisticsUpdate>) multipartDataList;
- //TODO: following part is focused on flow stats only - need more general approach if used for more than flow stats
- ListenableFuture<Void> future;
- if (virgin) {
- future = StatisticsGatheringUtils.deleteAllKnownFlows(deviceInfo, registry, txFacade);
- virgin = false;
- } else {
- future = Futures.immediateFuture(null);
+ StatisticsGatheringUtils.writeFlowStatistics(allMultipartData, deviceInfo, registry, txFacade);
+ if (!multipartReply.getFlags().isOFPMPFREQMORE()) {
+ endCollecting();
}
-
- Futures.transform(future, new Function<Void, Void>() {
-
- @Override
- public Void apply(final Void input) {
- StatisticsGatheringUtils.writeFlowStatistics((Iterable<FlowsStatisticsUpdate>) allMultipartData,
- deviceInfo, registry, txFacade);
-
- if (!multipartReply.getFlags().isOFPMPFREQMORE()) {
- endCollecting();
- }
- return input;
- }
- });
}
}
private void endCollecting() {
+ finished.set(true);
EventsTimeCounter.markEnd(getDoneEventIdentifier());
EventsTimeCounter.markEnd(getEventIdentifier());
final RpcResult<List<MultipartReply>> rpcResult = RpcResultBuilder.success(Collections.<MultipartReply>emptyList()).build();
spyMessage(MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
- txFacade.submitTransaction();
setResult(rpcResult);
- finished = true;
+ txFacade.submitTransaction();
}
}