X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflowplugin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fimpl%2Fservices%2FMultipartRequestOnTheFlyCallback.java;h=da8d74275dd45956d9b493e303366984211b7320;hb=d1af0fd5a4053a10917f631bae42970c1960fd20;hp=dcb445b11e03db4717fcc91d1ab4f5b865730695;hpb=82cd91207e0dfbea3bba59537e7ff9dcaf75d895;p=openflowplugin.git diff --git a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/MultipartRequestOnTheFlyCallback.java b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/MultipartRequestOnTheFlyCallback.java index dcb445b11e..da8d74275d 100644 --- a/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/MultipartRequestOnTheFlyCallback.java +++ b/openflowplugin-impl/src/main/java/org/opendaylight/openflowplugin/impl/services/MultipartRequestOnTheFlyCallback.java @@ -7,11 +7,7 @@ */ package org.opendaylight.openflowplugin.impl.services; -import com.google.common.base.Function; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import java.util.Collections; -import java.util.List; +import com.google.common.base.Optional; import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo; import org.opendaylight.openflowplugin.api.openflow.device.RequestContext; import org.opendaylight.openflowplugin.api.openflow.device.TxFacade; @@ -21,6 +17,8 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.Messa import org.opendaylight.openflowplugin.impl.statistics.SinglePurposeMultipartReplyTranslator; import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils; import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter; +import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor; +import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.FlowsStatisticsUpdate; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply; @@ -32,16 +30,21 @@ import org.opendaylight.yangtools.yang.common.RpcResultBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + final class MultipartRequestOnTheFlyCallback extends AbstractRequestCallback> { private static final Logger LOG = LoggerFactory.getLogger(MultipartRequestOnTheFlyCallback.class); - private static final SinglePurposeMultipartReplyTranslator MULTIPART_REPLY_TRANSLATOR = new SinglePurposeMultipartReplyTranslator(); + private final SinglePurposeMultipartReplyTranslator multipartReplyTranslator; private final DeviceInfo deviceInfo; private final DeviceFlowRegistry registry; - private boolean virgin = true; - private boolean finished = false; private final EventIdentifier doneEventIdentifier; private final TxFacade txFacade; + private Optional fcNodeOpt; + private AtomicBoolean virgin = new AtomicBoolean(true); + private AtomicBoolean finished = new AtomicBoolean(false); public MultipartRequestOnTheFlyCallback(final RequestContext> context, final Class requestType, @@ -49,13 +52,16 @@ final class MultipartRequestOnTheFlyCallback extends AbstractRequestCallback> rpcResultBuilder = - RpcResultBuilder.>failed().withError(RpcError.ErrorType.APPLICATION, - String.format("Unexpected response type received %s.", result.getClass())); - setResult(rpcResultBuilder.build()); - endCollecting(); + if(!finished.getAndSet(true)) { + LOG.info("Unexpected response type received {}.", result.getClass()); + final RpcResultBuilder> rpcResultBuilder = + RpcResultBuilder.>failed().withError(RpcError.ErrorType.APPLICATION, + String.format("Unexpected response type received %s.", result.getClass())); + setResult(rpcResultBuilder.build()); + endCollecting(); + } } else { final MultipartReply multipartReply = (MultipartReply) result; + if (virgin.get()) { + synchronized (this) { + if (virgin.get()) { + fcNodeOpt = StatisticsGatheringUtils.deleteAllKnownFlows(deviceInfo, txFacade); + virgin.set(false); + } + } + } final MultipartReply singleReply = multipartReply; - final List multipartDataList = MULTIPART_REPLY_TRANSLATOR.translate( + final List multipartDataList = multipartReplyTranslator.translate( deviceInfo.getDatapathId(), deviceInfo.getVersion(), singleReply); - final Iterable allMultipartData = multipartDataList; + final Iterable allMultipartData = (Iterable) multipartDataList; - //TODO: following part is focused on flow stats only - need more general approach if used for more than flow stats - ListenableFuture future; - if (virgin) { - future = StatisticsGatheringUtils.deleteAllKnownFlows(deviceInfo, registry, txFacade); - virgin = false; - } else { - future = Futures.immediateFuture(null); + StatisticsGatheringUtils.writeFlowStatistics(allMultipartData, deviceInfo, registry, txFacade); + if (!multipartReply.getFlags().isOFPMPFREQMORE()) { + endCollecting(); } - - Futures.transform(future, new Function() { - - @Override - public Void apply(final Void input) { - StatisticsGatheringUtils.writeFlowStatistics((Iterable) allMultipartData, - deviceInfo, registry, txFacade); - - if (!multipartReply.getFlags().isOFPMPFREQMORE()) { - endCollecting(); - } - return input; - } - }); } } private void endCollecting() { + finished.set(true); EventsTimeCounter.markEnd(getDoneEventIdentifier()); EventsTimeCounter.markEnd(getEventIdentifier()); final RpcResult> rpcResult = RpcResultBuilder.success(Collections.emptyList()).build(); spyMessage(MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS); - txFacade.submitTransaction(); setResult(rpcResult); - finished = true; + txFacade.submitTransaction(); } }