* @return readOnlyTransaction - Don't forget to close it after finish reading
*/
ReadOnlyTransaction getReadTransaction();
+
+ /**
+ * Method returns true if transaction chain manager is enabled
+ * @return is transaction chain manager enabled
+ */
+ boolean isTransactionsEnabled();
}
*/
ListenableFuture<Boolean> gatherDynamicData();
- /**
- * Initial data gathering
- * @return true if gathering was successful
- */
- ListenableFuture<Boolean> initialGatherDynamicData();
-
/**
* Method has to be called from DeviceInitialization Method, otherwise
* we are not able to poll anything. Statistics Context normally initialize
private final DeviceInitializerProvider deviceInitializerProvider;
private final boolean useSingleLayerSerialization;
private boolean hasState;
+ private boolean isInitialTransactionSubmitted;
DeviceContextImpl(
@Nonnull final ConnectionContext primaryConnectionContext,
@Override
public boolean initialSubmitTransaction() {
- return (initialized && transactionChainManager.initialSubmitWriteTransaction());
+ return (initialized &&(isInitialTransactionSubmitted =
+ transactionChainManager.initialSubmitWriteTransaction()));
}
@Override
return dataBroker.newReadOnlyTransaction();
}
+ @Override
+ public boolean isTransactionsEnabled() {
+ return isInitialTransactionSubmitted;
+ }
+
@Override
public <T extends DataObject> void writeToTransaction(final LogicalDatastoreType store,
final InstanceIdentifier<T> path,
}
if (!isMultipart(result)) {
- LOG.info("Unexpected response type received {}.", result.getClass());
+ LOG.info("Unexpected response type received: {}.", result.getClass());
setResult(RpcResultBuilder
.<List<T>>failed()
.withError(RpcError.ErrorType.APPLICATION,
- String.format("Unexpected response type received %s.", result.getClass()))
+ String.format("Unexpected response type received: %s.", result.getClass()))
.build());
} else {
final T resultCast = (T) result;
*/
package org.opendaylight.openflowplugin.impl.services;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.Optional;
+import org.opendaylight.openflowplugin.api.openflow.OFPContext.CONTEXT_STATE;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
+import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
+import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils;
import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.multipart.types.rev170112.multipart.reply.MultipartReplyBody;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
+import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
public abstract class AbstractMultipartRequestOnTheFlyCallback<T extends OfHeader> extends AbstractMultipartRequestCallback<T> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractMultipartRequestOnTheFlyCallback.class);
private final DeviceInfo deviceInfo;
- private boolean finished = false;
private final EventIdentifier doneEventIdentifier;
private final TxFacade txFacade;
private final MultipartWriterProvider statisticsWriterProvider;
+ private final DeviceRegistry deviceRegistry;
+ private volatile CONTEXT_STATE gatheringState = CONTEXT_STATE.INITIALIZATION;
+ private ConvertorExecutor convertorExecutor;
public AbstractMultipartRequestOnTheFlyCallback(final RequestContext<List<T>> context, Class<?> requestType,
final DeviceContext deviceContext,
final EventIdentifier eventIdentifier,
- final MultipartWriterProvider statisticsWriterProvider) {
+ final MultipartWriterProvider statisticsWriterProvider,
+ final ConvertorExecutor convertorExecutor) {
super(context, requestType, deviceContext, eventIdentifier);
deviceInfo = deviceContext.getDeviceInfo();
doneEventIdentifier = new EventIdentifier(getMultipartType().name(), deviceContext.getDeviceInfo().getNodeId().toString());
txFacade = deviceContext;
+ deviceRegistry = deviceContext;
this.statisticsWriterProvider = statisticsWriterProvider;
+ this.convertorExecutor = convertorExecutor;
}
@Override
@SuppressWarnings("unchecked")
public void onSuccess(final OfHeader result) {
if (Objects.isNull(result)) {
- LOG.info("OfHeader was null.");
- if (!finished) {
- endCollecting();
- return;
+ LOG.warn("Response received was null.");
+
+ if (!CONTEXT_STATE.TERMINATION.equals(gatheringState)) {
+ endCollecting(true);
}
- } else if (finished) {
- LOG.debug("Unexpected multipart response received: xid={}, {}", result.getXid(), result.getImplementedInterface());
+
+ return;
+ } else if (CONTEXT_STATE.TERMINATION.equals(gatheringState)) {
+ LOG.warn("Unexpected response received: xid={}, {}", result.getXid(), result.getImplementedInterface());
return;
}
if (!isMultipart(result)) {
- LOG.info("Unexpected response type received {}.", result.getClass());
+ LOG.warn("Unexpected response type received: {}.", result.getClass());
setResult(RpcResultBuilder.<List<T>>failed().withError(RpcError.ErrorType.APPLICATION,
- String.format("Unexpected response type received %s.", result.getClass())).build());
- endCollecting();
+ String.format("Unexpected response type received: %s.", result.getClass())).build());
+ endCollecting(false);
} else {
final T resultCast = (T) result;
- Futures.transform(processStatistics(resultCast), (Function<Optional<? extends MultipartReplyBody>, Void>) input -> {
- Optional.ofNullable(input).flatMap(i -> i).ifPresent(reply -> {
- try {
- statisticsWriterProvider
- .lookup(getMultipartType())
- .ifPresent(writer -> writer.write(reply, false));
- } catch (final Exception ex) {
- LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step",
- getMultipartType(), deviceInfo.getLOGValue(), ex);
- }
- });
-
- if (!isReqMore(resultCast)) {
- endCollecting();
- onFinishedCollecting();
- }
-
- return null;
- });
+ if (CONTEXT_STATE.INITIALIZATION.equals(gatheringState)) {
+ startCollecting();
+ }
+
+ try {
+ MultipartReplyTranslatorUtil
+ .translate(resultCast, deviceInfo, convertorExecutor, null)
+ .ifPresent(reply -> {
+ try {
+ statisticsWriterProvider
+ .lookup(getMultipartType())
+ .ifPresent(writer -> writer.write(reply, false));
+ } catch (final Exception ex) {
+ LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step",
+ getMultipartType(), deviceInfo.getLOGValue(), ex);
+ }
+ });
+ } catch (final Exception ex) {
+ LOG.warn("Unexpected exception occurred while translating response: {}.", result.getClass(), ex);
+ setResult(RpcResultBuilder.<List<T>>failed().withError(RpcError.ErrorType.APPLICATION,
+ String.format("Unexpected exception occurred while translating response: %s. %s", result.getClass(), ex)).build());
+ endCollecting(false);
+ return;
+ }
+
+ if (!isReqMore(resultCast)) {
+ endCollecting(true);
+ }
}
}
return txFacade;
}
+ /**
+ * Starts collecting of multipart data
+ */
+ private synchronized void startCollecting() {
+ EventsTimeCounter.markStart(doneEventIdentifier);
+ gatheringState = CONTEXT_STATE.WORKING;
+
+ final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
+ .getNodeInstanceIdentifier()
+ .augmentation(FlowCapableNode.class);
+
+ switch (getMultipartType()) {
+ case OFPMPFLOW:
+ StatisticsGatheringUtils.deleteAllKnownFlows(
+ getTxFacade(),
+ instanceIdentifier,
+ deviceRegistry.getDeviceFlowRegistry());
+ break;
+ case OFPMPMETERCONFIG:
+ StatisticsGatheringUtils.deleteAllKnownMeters(
+ getTxFacade(),
+ instanceIdentifier,
+ deviceRegistry.getDeviceMeterRegistry());
+ break;
+ case OFPMPGROUPDESC:
+ StatisticsGatheringUtils.deleteAllKnownGroups(
+ getTxFacade(),
+ instanceIdentifier,
+ deviceRegistry.getDeviceGroupRegistry());
+ break;
+ }
+ }
+
/**
* Ends collecting of multipart data
+ * @param setResult set empty success result
*/
- private void endCollecting() {
+ private void endCollecting(final boolean setResult) {
+ gatheringState = CONTEXT_STATE.TERMINATION;
EventsTimeCounter.markEnd(doneEventIdentifier);
EventsTimeCounter.markEnd(getEventIdentifier());
spyMessage(MessageSpy.STATISTIC_GROUP.FROM_SWITCH_TRANSLATE_OUT_SUCCESS);
+
+ if (setResult) {
+ setResult(RpcResultBuilder.success(Collections.<T>emptyList()).build());
+ }
+
txFacade.submitTransaction();
- setResult(RpcResultBuilder.success(Collections.<T>emptyList()).build());
- finished = true;
- }
- /**
- * Process statistics.
- *
- * @param result result
- */
- protected abstract ListenableFuture<Optional<? extends MultipartReplyBody>> processStatistics(final T result);
+ switch (getMultipartType()) {
+ case OFPMPFLOW:
+ deviceRegistry.getDeviceFlowRegistry().processMarks();
+ break;
+ case OFPMPMETERCONFIG:
+ deviceRegistry.getDeviceMeterRegistry().processMarks();
+ break;
+ case OFPMPGROUPDESC:
+ deviceRegistry.getDeviceGroupRegistry().processMarks();
+ break;
+ }
+ }
/**
* Get multipart type
*/
protected abstract MultipartType getMultipartType();
- /**
- * On finished collection event
- */
- protected abstract void onFinishedCollecting();
-
-
}
private final MessageSpy spy;
private EventIdentifier eventIdentifier;
- protected AbstractRequestCallback(final RequestContext<T> context,
- final Class<?> requestType,
- final MessageSpy spy,
- final EventIdentifier eventIdentifier) {
+ AbstractRequestCallback(final RequestContext<T> context,
+ final Class<?> requestType,
+ final MessageSpy spy,
+ final EventIdentifier eventIdentifier) {
this.context = Preconditions.checkNotNull(context);
this.requestType = Preconditions.checkNotNull(requestType);
this.spy = Preconditions.checkNotNull(spy);
package org.opendaylight.openflowplugin.impl.services.multilayer;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-import java.util.Optional;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
-import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
import org.opendaylight.openflowplugin.impl.services.AbstractMultipartRequestOnTheFlyCallback;
-import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.multipart.types.rev170112.multipart.reply.MultipartReplyBody;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReply;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
public class MultiLayerFlowMultipartRequestOnTheFlyCallback<T extends OfHeader> extends AbstractMultipartRequestOnTheFlyCallback<T> {
- private final ConvertorExecutor convertorExecutor;
- private final DeviceInfo deviceInfo;
- private final DeviceFlowRegistry deviceFlowRegistry;
- private boolean virgin = true;
-
public MultiLayerFlowMultipartRequestOnTheFlyCallback(final RequestContext<List<T>> context,
final Class<?> requestType,
final DeviceContext deviceContext,
final EventIdentifier eventIdentifier,
final MultipartWriterProvider statisticsWriterProvider,
final ConvertorExecutor convertorExecutor) {
- super(context, requestType, deviceContext, eventIdentifier, statisticsWriterProvider);
- this.convertorExecutor = convertorExecutor;
- deviceInfo = deviceContext.getDeviceInfo();
- deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
+ super(context, requestType, deviceContext, eventIdentifier, statisticsWriterProvider, convertorExecutor);
}
@Override
return MultipartType.OFPMPFLOW;
}
- @Override
- protected void onFinishedCollecting() {
- deviceFlowRegistry.processMarks();
- }
-
- @Override
- protected ListenableFuture<Optional<? extends MultipartReplyBody>> processStatistics(T result) {
- final ListenableFuture<Optional<? extends MultipartReplyBody>> future = Futures.transform(
- StatisticsGatheringUtils.deleteAllKnownFlows(
- getTxFacade(),
- deviceInfo
- .getNodeInstanceIdentifier()
- .augmentation(FlowCapableNode.class),
- !virgin),
- (Function<Void, Optional<? extends MultipartReplyBody>>) input -> MultipartReplyTranslatorUtil
- .translate(result, deviceInfo, convertorExecutor, null));
-
- if (virgin) {
- virgin = false;
- }
-
- return future;
- }
-
}
@VisibleForTesting
private static KeyedInstanceIdentifier<Flow, FlowKey> createFlowPath(FlowDescriptor flowDescriptor,
- KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
+ KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
return nodePath.augmentation(FlowCapableNode.class)
.child(Table.class, flowDescriptor.getTableKey())
.child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
final FlowDescriptor updatedFlowDescriptor;
if (Objects.nonNull(input.getFlowRef())) {
- updatedFlowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), input.getFlowRef().getValue().firstKeyOf(Flow.class).getId());
+ updatedFlowDescriptor = FlowDescriptorFactory.create(updated.getTableId(), input.getFlowRef().getValue().firstKeyOf(Flow.class).getId());
} else {
if (isUpdate) {
updatedFlowDescriptor = origFlowDescriptor;
package org.opendaylight.openflowplugin.impl.services.singlelayer;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
import java.util.List;
-import java.util.Optional;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceContext;
-import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.RequestContext;
-import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
-import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
import org.opendaylight.openflowplugin.impl.services.AbstractMultipartRequestOnTheFlyCallback;
-import org.opendaylight.openflowplugin.impl.statistics.StatisticsGatheringUtils;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.statistics.rev130819.multipart.reply.multipart.reply.body.MultipartReplyFlowStats;
import org.opendaylight.yang.gen.v1.urn.opendaylight.multipart.types.rev170112.MultipartReply;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.multipart.types.rev170112.multipart.reply.MultipartReplyBody;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.MultipartType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
public class SingleLayerFlowMultipartRequestOnTheFlyCallback<T extends OfHeader> extends AbstractMultipartRequestOnTheFlyCallback<T> {
- private final DeviceInfo deviceInfo;
- private final DeviceFlowRegistry deviceFlowRegistry;
- private boolean virgin = true;
-
- public SingleLayerFlowMultipartRequestOnTheFlyCallback(final RequestContext<List<T>> context, Class<?> requestType,
+ public SingleLayerFlowMultipartRequestOnTheFlyCallback(final RequestContext<List<T>> context,
+ final Class<?> requestType,
final DeviceContext deviceContext,
final EventIdentifier eventIdentifier,
final MultipartWriterProvider statisticsWriterProvider) {
- super(context, requestType, deviceContext, eventIdentifier, statisticsWriterProvider);
- deviceInfo = deviceContext.getDeviceInfo();
- deviceFlowRegistry = deviceContext.getDeviceFlowRegistry();
+ super(context, requestType, deviceContext, eventIdentifier, statisticsWriterProvider, null);
}
@Override
return MultipartType.OFPMPFLOW;
}
- @Override
- protected void onFinishedCollecting() {
- deviceFlowRegistry.processMarks();
- }
-
- @Override
- protected ListenableFuture<Optional<? extends MultipartReplyBody>> processStatistics(T result) {
- final ListenableFuture<Optional<? extends MultipartReplyBody>> future = Futures.transform(
- StatisticsGatheringUtils.deleteAllKnownFlows(
- getTxFacade(),
- deviceInfo
- .getNodeInstanceIdentifier()
- .augmentation(FlowCapableNode.class),
- !virgin),
- (Function<Void, Optional<? extends MultipartReplyBody>>) input -> MultipartReplyTranslatorUtil
- .translate(result, deviceInfo, null, null));
-
- if (virgin) {
- virgin = false;
- }
-
- return future;
- }
-
}
}
}
-
- @Override
- public ListenableFuture<Boolean> initialGatherDynamicData() {
- return gatherDynamicData(true);
- }
-
@Override
public ListenableFuture<Boolean> gatherDynamicData(){
- return gatherDynamicData(false);
- }
-
- private ListenableFuture<Boolean> gatherDynamicData(final boolean initial) {
this.lastDataGathering = null;
if (!isStatisticsPollingOn) {
LOG.debug("Statistics for device {} is not enabled.", getDeviceInfo().getNodeId().getValue());
// write start timestamp to state snapshot container
StatisticsGatheringUtils.markDeviceStateSnapshotStart(deviceContext);
- statChainFuture(statIterator, settableStatResultFuture, initial);
+ statChainFuture(statIterator, settableStatResultFuture);
// write end timestamp to state snapshot container
Futures.addCallback(settableStatResultFuture, new FutureCallback<Boolean>() {
}
}
- private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType, final boolean initial){
+ private ListenableFuture<Boolean> chooseStat(final MultipartType multipartType){
ListenableFuture<Boolean> result = Futures.immediateCheckedFuture(Boolean.TRUE);
switch (multipartType) {
case OFPMPFLOW:
- result = collectFlowStatistics(multipartType, initial);
+ result = collectFlowStatistics(multipartType);
break;
case OFPMPTABLE:
result = collectTableStatistics(multipartType);
return Optional.ofNullable(pollTimeout);
}
- private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture, final boolean initial) {
+ private void statChainFuture(final Iterator<MultipartType> iterator, final SettableFuture<Boolean> resultFuture) {
if (ConnectionContext.CONNECTION_STATE.RIP.equals(deviceContext.getPrimaryConnectionContext().getConnectionState())) {
final String errMsg = String.format("Device connection is closed for Node : %s.",
getDeviceInfo().getNodeId());
final MultipartType nextType = iterator.next();
LOG.debug("Stats iterating to next type for node {} of type {}", getDeviceInfo().getLOGValue(), nextType);
- final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType, initial);
+ final ListenableFuture<Boolean> deviceStatisticsCollectionFuture = chooseStat(nextType);
Futures.addCallback(deviceStatisticsCollectionFuture, new FutureCallback<Boolean>() {
@Override
public void onSuccess(final Boolean result) {
- statChainFuture(iterator, resultFuture, initial);
+ statChainFuture(iterator, resultFuture);
}
@Override
public void onFailure(@Nonnull final Throwable t) {
}
//TODO: Refactor twice sending deviceContext into gatheringStatistics
- private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType, final boolean initial) {
+ private ListenableFuture<Boolean> collectFlowStatistics(final MultipartType multipartType) {
return devState.isFlowStatisticsAvailable() ? StatisticsGatheringUtils.gatherStatistics(
statisticsGatheringOnTheFlyService,
getDeviceInfo(),
/*MultipartType.OFPMPFLOW*/ multipartType,
deviceContext,
deviceContext,
- initial,
convertorExecutor,
statisticsWriterProvider) : emptyFuture;
}
/*MultipartType.OFPMPTABLE*/ multipartType,
deviceContext,
deviceContext,
- false,
convertorExecutor,
statisticsWriterProvider) : emptyFuture;
}
/*MultipartType.OFPMPPORTSTATS*/ multipartType,
deviceContext,
deviceContext,
- false,
convertorExecutor,
statisticsWriterProvider) : emptyFuture;
}
/*MultipartType.OFPMPQUEUE*/ multipartType,
deviceContext,
deviceContext,
- false,
convertorExecutor,
statisticsWriterProvider);
}
/*MultipartType.OFPMPGROUPDESC*/ multipartType,
deviceContext,
deviceContext,
- false,
convertorExecutor,
statisticsWriterProvider) : emptyFuture;
}
/*MultipartType.OFPMPGROUP*/ multipartType,
deviceContext,
deviceContext,
- false,
convertorExecutor,
statisticsWriterProvider) : emptyFuture;
}
/*MultipartType.OFPMPMETERCONFIG*/ multipartType,
deviceContext,
deviceContext,
- false,
convertorExecutor,
statisticsWriterProvider) : emptyFuture;
}
/*MultipartType.OFPMPMETER*/ multipartType,
deviceContext,
deviceContext,
- false,
convertorExecutor,
statisticsWriterProvider) : emptyFuture;
}
LOG.info("Starting statistics context cluster services for node {}", deviceInfo.getLOGValue());
this.statListForCollectingInitialization();
- Futures.addCallback(this.initialGatherDynamicData(), new FutureCallback<Boolean>() {
+
+ Futures.addCallback(this.gatherDynamicData(), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {
import java.util.Date;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceInfo;
import org.opendaylight.openflowplugin.api.openflow.device.DeviceRegistry;
import org.opendaylight.openflowplugin.api.openflow.device.TxFacade;
+import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.EventIdentifier;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.StatisticsGatherer;
import org.opendaylight.openflowplugin.impl.common.MultipartReplyTranslatorUtil;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
-import org.opendaylight.openflowplugin.impl.statistics.ofpspecific.EventsTimeCounter;
import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorExecutor;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.yang.types.rev130715.DateAndTime;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
final MultipartType type,
final TxFacade txFacade,
final DeviceRegistry registry,
- final Boolean initial,
final ConvertorExecutor convertorExecutor,
final MultipartWriterProvider statisticsWriterProvider) {
-
- final EventIdentifier eventIdentifier;
- if (MultipartType.OFPMPFLOW.equals(type)) {
- eventIdentifier = new EventIdentifier(type.toString(), deviceInfo.getNodeId().getValue());
- EventsTimeCounter.markStart(eventIdentifier);
- } else {
- eventIdentifier = null;
- }
-
return Futures.transform(
statisticsGatheringService.getStatisticsOfType(
new EventIdentifier(QUEUE2_REQCTX + type.toString(), deviceInfo.getNodeId().toString()),
}
try {
- return processStatistics(type, allMultipartData, txFacade, registry, deviceInfo,
- statisticsWriterProvider,
- eventIdentifier, initial);
+ return Futures.immediateFuture(processStatistics(
+ type,
+ allMultipartData,
+ txFacade,
+ registry,
+ deviceInfo,
+ statisticsWriterProvider));
} catch (final Exception e) {
LOG.warn("Stats processing of type {} for node {} failed during processing step",
type, deviceInfo.getNodeId(), e);
});
}
- private static ListenableFuture<Boolean> processStatistics(final MultipartType type,
- final List<? extends DataContainer> statistics,
- final TxFacade txFacade,
- final DeviceRegistry deviceRegistry,
- final DeviceInfo deviceInfo,
- final MultipartWriterProvider statisticsWriterProvider,
- final EventIdentifier eventIdentifier,
- final boolean initial) {
-
- ListenableFuture<Void> future = Futures.immediateFuture(null);
-
+ private static boolean processStatistics(final MultipartType type,
+ final List<? extends DataContainer> statistics,
+ final TxFacade txFacade,
+ final DeviceRegistry deviceRegistry,
+ final DeviceInfo deviceInfo,
+ final MultipartWriterProvider statisticsWriterProvider) {
final InstanceIdentifier<FlowCapableNode> instanceIdentifier = deviceInfo
.getNodeInstanceIdentifier()
.augmentation(FlowCapableNode.class);
switch (type) {
case OFPMPFLOW:
- future = deleteAllKnownFlows(txFacade, instanceIdentifier, initial);
+ deleteAllKnownFlows(txFacade, instanceIdentifier, deviceRegistry.getDeviceFlowRegistry());
break;
case OFPMPMETERCONFIG:
deleteAllKnownMeters(txFacade, instanceIdentifier, deviceRegistry.getDeviceMeterRegistry());
break;
}
- return Futures.transform(future, (Function<Void, Boolean>) input -> {
- if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
- txFacade.submitTransaction();
+ if (writeStatistics(type, statistics, deviceInfo, statisticsWriterProvider)) {
+ txFacade.submitTransaction();
- if (MultipartType.OFPMPFLOW.equals(type)) {
- EventsTimeCounter.markEnd(eventIdentifier);
+ switch (type) {
+ case OFPMPFLOW:
deviceRegistry.getDeviceFlowRegistry().processMarks();
- }
-
- LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
- return Boolean.TRUE;
+ break;
+ case OFPMPMETERCONFIG:
+ deviceRegistry.getDeviceMeterRegistry().processMarks();
+ break;
+ case OFPMPGROUPDESC:
+ deviceRegistry.getDeviceGroupRegistry().processMarks();
+ break;
}
- LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue());
- return Boolean.FALSE;
- });
+ LOG.debug("Stats reply added to transaction for node {} of type {}", deviceInfo.getNodeId(), type);
+ return true;
+ }
+
+ LOG.warn("Stats processing of type {} for node {} failed during write-to-tx step", type, deviceInfo.getLOGValue());
+ return false;
}
private static boolean writeStatistics(final MultipartType type,
return result.get();
}
- public static ListenableFuture<Void> deleteAllKnownFlows(final TxFacade txFacade,
- final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
- final boolean initial) {
- if (initial) {
- return Futures.immediateFuture(null);
+ public static void deleteAllKnownFlows(final TxFacade txFacade,
+ final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
+ final DeviceFlowRegistry deviceFlowRegistry) {
+ if (!txFacade.isTransactionsEnabled()) {
+ return;
}
final ReadOnlyTransaction readTx = txFacade.getReadTransaction();
- return Futures.transform(Futures
- .withFallback(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier), t -> {
- // we wish to close readTx for fallBack
- readTx.close();
- return Futures.immediateFailedFuture(t);
- }), (Function<Optional<FlowCapableNode>, Void>)
- flowCapNodeOpt -> {
- // we have to read actual tables with all information before we set empty Flow list, merge is expensive and
- // not applicable for lists
- if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
- for (final Table tableData : flowCapNodeOpt.get().getTable()) {
- final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
- final InstanceIdentifier<Table> iiToTable = instanceIdentifier.child(Table.class, tableData.getKey());
- txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+
+ try {
+ Futures.transform(Futures
+ .withFallback(readTx.read(LogicalDatastoreType.OPERATIONAL, instanceIdentifier), t -> {
+ // we wish to close readTx for fallBack
+ readTx.close();
+ return Futures.immediateFailedFuture(t);
+ }), (Function<Optional<FlowCapableNode>, Void>)
+ flowCapNodeOpt -> {
+ // we have to read actual tables with all information before we set empty Flow list, merge is expensive and
+ // not applicable for lists
+ if (flowCapNodeOpt != null && flowCapNodeOpt.isPresent()) {
+ for (final Table tableData : flowCapNodeOpt.get().getTable()) {
+ final Table table = new TableBuilder(tableData).setFlow(Collections.emptyList()).build();
+ final InstanceIdentifier<Table> iiToTable = instanceIdentifier.child(Table.class, tableData.getKey());
+ txFacade.writeToTransaction(LogicalDatastoreType.OPERATIONAL, iiToTable, table);
+ }
}
- }
- readTx.close();
- return null;
- });
+ readTx.close();
+ return null;
+ }).get();
+ } catch (InterruptedException | ExecutionException ex) {
+ LOG.debug("Failed to delete {} flows, exception: {}", deviceFlowRegistry.size(), ex);
+ }
}
- private static void deleteAllKnownMeters(final TxFacade txFacade,
- final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
- final DeviceMeterRegistry meterRegistry) {
+ public static void deleteAllKnownMeters(final TxFacade txFacade,
+ final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
+ final DeviceMeterRegistry meterRegistry) {
meterRegistry.forEach(meterId -> txFacade
.addDeleteToTxChain(
LogicalDatastoreType.OPERATIONAL,
instanceIdentifier.child(Meter.class, new MeterKey(meterId))));
-
- meterRegistry.processMarks();
}
- private static void deleteAllKnownGroups(final TxFacade txFacade,
- final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
- final DeviceGroupRegistry groupRegistry) {
+ public static void deleteAllKnownGroups(final TxFacade txFacade,
+ final InstanceIdentifier<FlowCapableNode> instanceIdentifier,
+ final DeviceGroupRegistry groupRegistry) {
groupRegistry.forEach(groupId -> txFacade
.addDeleteToTxChain(
LogicalDatastoreType.OPERATIONAL,
instanceIdentifier.child(Group.class, new GroupKey(groupId))));
-
- groupRegistry.processMarks();
}
/**
.thenReturn(Futures.immediateFuture(null));
Mockito.when(rpcContext.stopClusterServices()).thenReturn(Futures.immediateFuture(null));
Mockito.when(statisticsContext.stopClusterServices()).thenReturn(Futures.immediateFuture(null));
- Mockito.when(statisticsContext.initialGatherDynamicData()).thenReturn(Futures.immediateFuture(null));
+ Mockito.when(statisticsContext.gatherDynamicData()).thenReturn(Futures.immediateFuture(null));
Mockito.when(connectionContext.getDeviceInfo()).thenReturn(deviceInfo);
contextChain = new ContextChainImpl(connectionContext);
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.TableKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
final RpcResult<List<MultipartReply>> expectedRpcResult =
RpcResultBuilder.<List<MultipartReply>>failed().withError(RpcError.ErrorType.APPLICATION,
- String.format("Unexpected response type received %s.", mockedHelloMessage.getClass())).build();
+ String.format("Unexpected response type received: %s.", mockedHelloMessage.getClass())).build();
final RpcResult<List<MultipartReply>> actualResult = dummyRequestContext.getFuture().get();
assertNotNull(actualResult.getErrors());
assertEquals(1, actualResult.getErrors().size());
final RpcError actualError = actualResult.getErrors().iterator().next();
- assertEquals(actualError.getMessage(), String.format("Unexpected response type received %s.", mockedHelloMessage.getClass()));
+ assertEquals(actualError.getMessage(), String.format("Unexpected response type received: %s.", mockedHelloMessage.getClass()));
assertEquals(actualError.getErrorType(),RpcError.ErrorType.APPLICATION);
assertEquals(expectedRpcResult.getResult(), actualResult.getResult());
assertEquals(expectedRpcResult.isSuccessful(), actualResult.isSuccessful());
when(mockedDeviceContext.getReadTransaction()).thenReturn(mockedReadOnlyTx);
multipartRequestOnTheFlyCallback.onSuccess(mpReplyMessage.build());
- final InstanceIdentifier<Table> tableIdent = nodePath.child(Table.class, new TableKey(tableId));
- verify(mockedReadOnlyTx, times(1)).read(LogicalDatastoreType.OPERATIONAL, nodePath);
- verify(mockedReadOnlyTx, times(1)).close();
+ verify(mockedReadOnlyTx, times(0)).read(LogicalDatastoreType.OPERATIONAL, nodePath);
+ verify(mockedReadOnlyTx, times(0)).close();
verify(mockedDeviceContext, times(1)).writeToTransaction(eq(LogicalDatastoreType.OPERATIONAL),
- eq(tableIdent), Matchers.<Table> any());
- /*
- * One call for Table one call for Flow
- * we are not able to create Flow InstanceIdentifier because we are missing FlowId
- */
- verify(mockedDeviceContext, times(2)).writeToTransaction(eq(LogicalDatastoreType.OPERATIONAL),
- Matchers.<InstanceIdentifier> any(), Matchers.<DataObject> any());
+ Matchers.any(), Matchers.any());
}
/**
private void removeFlowFailCallback(short version) throws InterruptedException, ExecutionException {
RemoveFlowInput mockedRemoveFlowInput = new RemoveFlowInputBuilder()
+ .setTableId((short)1)
.setMatch(match)
.build();
when(deviceContext.getReadTransaction()).thenReturn(readTx);
when(deviceContext.getReadTransaction()).thenReturn(readTx);
when(deviceContext.getPrimaryConnectionContext()).thenReturn(connectionAdapter);
+ when(deviceContext.isTransactionsEnabled()).thenReturn(Boolean.TRUE);
when(connectionAdapter.getNodeId()).thenReturn(DUMMY_NODE_ID);
when(connectionAdapter.getFeatures()).thenReturn(features);
when(features.getDatapathId()).thenReturn(BigInteger.ONE);
verify(deviceContext, Mockito.never()).addDeleteToTxChain(Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.<InstanceIdentifier<?>>any());
final InOrder inOrder = Mockito.inOrder(deviceContext);
- inOrder.verify(deviceContext).writeToTransaction(Matchers.eq(LogicalDatastoreType.OPERATIONAL), Matchers.eq(tablePath), Matchers.any(Table.class));
+ inOrder.verify(deviceContext).writeToTransaction(Matchers.eq(LogicalDatastoreType.OPERATIONAL),Matchers.any(), Matchers.any());
}
@Test
type,
deviceContext,
deviceContext,
- false,
ConvertorManagerFactory.createDefaultManager(),
provider);
final Optional<FlowCapableNode> flowNodeOpt = Optional.of(flowNodeBuilder.build());
final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> flowNodeFuture = Futures.immediateCheckedFuture(flowNodeOpt);
when(readTx.read(LogicalDatastoreType.OPERATIONAL, nodePath)).thenReturn(flowNodeFuture);
- final KeyedInstanceIdentifier<Table, TableKey> tablePath = deviceInfo.getNodeInstanceIdentifier()
- .augmentation(FlowCapableNode.class).child(Table.class, new TableKey(tableId));
-
StatisticsGatheringUtils.deleteAllKnownFlows(deviceContext, deviceInfo.getNodeInstanceIdentifier()
- .augmentation(FlowCapableNode.class), false);
+ .augmentation(FlowCapableNode.class), deviceFlowRegistry);
- verify(deviceContext).writeToTransaction(
- LogicalDatastoreType.OPERATIONAL,
- tablePath,
- tableDataBld.setFlow(Collections.<Flow>emptyList()).build());
+ verify(deviceContext).isTransactionsEnabled();
+ verify(deviceContext).getReadTransaction();
+ verify(deviceContext).writeToTransaction(Mockito.eq(LogicalDatastoreType.OPERATIONAL), Mockito.any(), Mockito.any());
}
}