*/
package org.opendaylight.openflowplugin.impl.services;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Service;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
+import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.multipart.types.rev170112.multipart.reply.MultipartReplyBody;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class AbstractMultipartRequestOnTheFlyCallback<T extends OfHeader> extends AbstractMultipartRequestCallback<T> {
+public abstract class AbstractMultipartRequestOnTheFlyCallback<T extends OfHeader>
+ extends AbstractMultipartRequestCallback<T> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractMultipartRequestOnTheFlyCallback.class);
private final DeviceInfo deviceInfo;
- private boolean finished = false;
private final EventIdentifier doneEventIdentifier;
private final TxFacade txFacade;
private final MultipartWriterProvider statisticsWriterProvider;
+ private final DeviceRegistry deviceRegistry;
+ private volatile Service.State gatheringState = Service.State.NEW;
+ private final ConvertorExecutor convertorExecutor;
public AbstractMultipartRequestOnTheFlyCallback(final RequestContext<List<T>> context, Class<?> requestType,
final DeviceContext deviceContext,
final EventIdentifier eventIdentifier,
- final MultipartWriterProvider statisticsWriterProvider) {
+ final MultipartWriterProvider statisticsWriterProvider,
+ final ConvertorExecutor convertorExecutor) {
super(context, requestType, deviceContext, eventIdentifier);
deviceInfo = deviceContext.getDeviceInfo();
- doneEventIdentifier = new EventIdentifier(getMultipartType().name(), deviceContext.getDeviceInfo().getNodeId().toString());
+ doneEventIdentifier =
+ new EventIdentifier(getMultipartType().name(), deviceContext.getDeviceInfo().getNodeId().toString());
txFacade = deviceContext;
+ deviceRegistry = deviceContext;
this.statisticsWriterProvider = statisticsWriterProvider;
+ this.convertorExecutor = convertorExecutor;
}
@Override
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({"unchecked", "checkstyle:IllegalCatch"})
public void onSuccess(final OfHeader result) {
- if (Objects.isNull(result)) {
- LOG.info("OfHeader was null.");
- if (!finished) {
- endCollecting();
- return;
+ if (result == null) {
+ LOG.warn("Response received was null.");
+
+ if (!Service.State.TERMINATED.equals(gatheringState)) {
+ endCollecting(true);
}
- } else if (finished) {
- LOG.debug("Unexpected multipart response received: xid={}, {}", result.getXid(), result.getImplementedInterface());
+
+ return;
+ } else if (Service.State.TERMINATED.equals(gatheringState)) {
+ LOG.warn("Unexpected response received: xid={}, {}", result.getXid(), result.getImplementedInterface());
return;
}
if (!isMultipart(result)) {
- LOG.info("Unexpected response type received {}.", result.getClass());
+ LOG.warn("Unexpected response type received: {}.", result.getClass());
setResult(RpcResultBuilder.<List<T>>failed().withError(RpcError.ErrorType.APPLICATION,
- String.format("Unexpected response type received %s.", result.getClass())).build());
- endCollecting();
+ String.format("Unexpected response type received: %s.", result.getClass())).build());
+ endCollecting(false);
} else {
final T resultCast = (T) result;
- Futures.transform(processStatistics(resultCast), (Function<Optional<? extends MultipartReplyBody>, Void>) input -> {
- input.ifPresent(reply -> {
- try {
- statisticsWriterProvider
- .lookup(getMultipartType())
- .ifPresent(writer -> writer.write(reply, false));
- } catch (final Exception ex) {
- LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step",
- getMultipartType(), deviceInfo.getLOGValue(), ex);
- }
- });
-
- if (!isReqMore(resultCast)) {
- endCollecting();
- }
-
- return null;
- });
+ if (Service.State.NEW.equals(gatheringState)) {
+ startCollecting();
+ }
+
+ try {
+ MultipartReplyTranslatorUtil
+ .translate(resultCast, deviceInfo, convertorExecutor, null)
+ .ifPresent(reply -> {
+ try {
+ statisticsWriterProvider
+ .lookup(getMultipartType())
+ .ifPresent(writer -> writer.write(reply, false));
+ } catch (final Exception ex) {
+ LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step",
+ getMultipartType(), deviceInfo, ex);
+ }
+ });
+ } catch (final Exception ex) {
+ LOG.warn("Unexpected exception occurred while translating response: {}.", result.getClass(), ex);
+ setResult(RpcResultBuilder.<List<T>>failed().withError(RpcError.ErrorType.APPLICATION,
+ String.format("Unexpected exception occurred while translating response: %s. %s",
+ result.getClass(),
+ ex)).build());
+ endCollecting(false);
+ return;
+ }
+
+ if (!isReqMore(resultCast)) {
+ endCollecting(true);
+ }
}
}
/**
- * Get tx facade
+ * Get tx facade.
* @return tx facade
*/
protected TxFacade getTxFacade() {
}
/**
- * Ends collecting of multipart data
+ * Starts collecting of multipart data.
*/
- private void endCollecting() {
- EventsTimeCounter.markEnd(doneEventIdentifier);
- EventsTimeCounter.markEnd(getEventIdentifier());
- spyMessage(MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
- txFacade.submitTransaction();
- setResult(RpcResultBuilder.success(Collections.<T>emptyList()).build());
- finished = true;
+ private synchronized void startCollecting() {
+ EventsTimeCounter.markStart(doneEventIdentifier);
+ gatheringState = Service.State.RUNNING;
+
+ final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
+ .getNodeInstanceIdentifier()
+ .augmentation(FlowCapableNode.class);
+
+ switch (getMultipartType()) {
+ case OFPMPFLOW:
+ StatisticsGatheringUtils.deleteAllKnownFlows(
+ getTxFacade(),
+ instanceIdentifier,
+ deviceRegistry.getDeviceFlowRegistry());
+ deviceRegistry.getDeviceFlowRegistry().processMarks();
+ break;
+ case OFPMPMETERCONFIG:
+ StatisticsGatheringUtils.deleteAllKnownMeters(
+ getTxFacade(),
+ instanceIdentifier,
+ deviceRegistry.getDeviceMeterRegistry());
+ deviceRegistry.getDeviceMeterRegistry().processMarks();
+ break;
+ case OFPMPGROUPDESC:
+ StatisticsGatheringUtils.deleteAllKnownGroups(
+ getTxFacade(),
+ instanceIdentifier,
+ deviceRegistry.getDeviceGroupRegistry());
+ deviceRegistry.getDeviceGroupRegistry().processMarks();
+ break;
+ default:
+ // no operation
+ }
}
/**
- * Process statistics.
- *
- * @param result result
+ * Ends collecting of multipart data.
+ * @param setResult set empty success result
*/
- protected abstract ListenableFuture<Optional<? extends MultipartReplyBody>> processStatistics(final T result);
+ private void endCollecting(final boolean setResult) {
+ gatheringState = Service.State.TERMINATED;
+ EventsTimeCounter.markEnd(doneEventIdentifier);
+ EventsTimeCounter.markEnd(getEventIdentifier());
+ spyMessage(MessageSpy.StatisticsGroup.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+
+ if (setResult) {
+ setResult(RpcResultBuilder.success(Collections.<T>emptyList()).build());
+ }
+
+ txFacade.submitTransaction();
+
+ switch (getMultipartType()) {
+ case OFPMPFLOW:
+ deviceRegistry.getDeviceFlowRegistry().processMarks();
+ break;
+ case OFPMPMETERCONFIG:
+ deviceRegistry.getDeviceMeterRegistry().processMarks();
+ break;
+ case OFPMPGROUPDESC:
+ deviceRegistry.getDeviceGroupRegistry().processMarks();
+ break;
+ default:
+ // no operation
+ }
+ }
/**
- * Get multipart type
+ * Get multipart type.
* @return multipart type
*/
protected abstract MultipartType getMultipartType();
-
-
}