import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import io.netty.util.HashedWheelTimer;
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ContextChainState;
import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
import org.opendaylight.openflowplugin.api.openflow.md.util.OpenflowVersion;
-import org.opendaylight.openflowplugin.api.openflow.registry.ItemLifeCycleRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
-import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowDescriptor;
-import org.opendaylight.openflowplugin.api.openflow.registry.flow.FlowRegistryKey;
import org.opendaylight.openflowplugin.api.openflow.registry.group.DeviceGroupRegistry;
import org.opendaylight.openflowplugin.api.openflow.registry.meter.DeviceMeterRegistry;
-import org.opendaylight.openflowplugin.api.openflow.rpc.ItemLifeCycleKeeper;
-import org.opendaylight.openflowplugin.api.openflow.rpc.listener.ItemLifecycleListener;
import org.opendaylight.openflowplugin.api.openflow.statistics.ofpspecific.MessageSpy;
+import org.opendaylight.openflowplugin.common.txchain.TransactionChainManager;
import org.opendaylight.openflowplugin.extension.api.ConvertorMessageFromOFJava;
import org.opendaylight.openflowplugin.extension.api.ExtensionConverterProviderKeeper;
import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionConverterProvider;
import org.opendaylight.openflowplugin.extension.api.exception.ConversionException;
import org.opendaylight.openflowplugin.extension.api.path.MessagePath;
-import org.opendaylight.openflowplugin.impl.common.ItemLifeCycleSourceImpl;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProvider;
import org.opendaylight.openflowplugin.impl.datastore.MultipartWriterProviderFactory;
import org.opendaylight.openflowplugin.impl.device.initialization.AbstractDeviceInitializer;
import org.opendaylight.openflowplugin.impl.device.initialization.DeviceInitializerProvider;
import org.opendaylight.openflowplugin.impl.device.listener.MultiMsgCollectorImpl;
import org.opendaylight.openflowplugin.impl.registry.flow.DeviceFlowRegistryImpl;
-import org.opendaylight.openflowplugin.impl.registry.flow.FlowRegistryKeyFactory;
import org.opendaylight.openflowplugin.impl.registry.group.DeviceGroupRegistryImpl;
import org.opendaylight.openflowplugin.impl.registry.meter.DeviceMeterRegistryImpl;
import org.opendaylight.openflowplugin.impl.rpc.AbstractRequestContext;
import org.opendaylight.yang.gen.v1.urn.opendaylight.experimenter.message.service.rev151020.ExperimenterMessageFromDevBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNodeConnector;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.Table;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.FlowKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeConnectorRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.node.NodeConnector;
private final DataBroker dataBroker;
private final Collection<RequestContext<?>> requestContexts = new HashSet<>();
private final MessageSpy messageSpy;
- private final ItemLifeCycleKeeper flowLifeCycleKeeper;
private final MessageTranslator<PortGrouping, FlowCapableNodeConnector> portStatusTranslator;
private final MessageTranslator<PacketInMessage, PacketReceived> packetInTranslator;
private final MessageTranslator<FlowRemoved, org.opendaylight.yang.gen.v1.urn.opendaylight
.flow.service.rev130819.FlowRemoved> flowRemovedTranslator;
private final TranslatorLibrary translatorLibrary;
- private final ItemLifeCycleRegistry itemLifeCycleSourceRegistry;
private final ConvertorExecutor convertorExecutor;
private final DeviceInitializerProvider deviceInitializerProvider;
private final PacketInRateLimiter packetInLimiter;
this.flowRemovedTranslator = translatorLibrary.lookupTranslator(
new TranslatorKey(deviceInfo.getVersion(), FlowRemoved.class.getName()));
- this.itemLifeCycleSourceRegistry = new ItemLifeCycleRegistryImpl();
- this.flowLifeCycleKeeper = new ItemLifeCycleSourceImpl();
- this.itemLifeCycleSourceRegistry.registerLifeCycleSource(flowLifeCycleKeeper);
this.convertorExecutor = convertorExecutor;
this.skipTableFeatures = skipTableFeatures;
this.useSingleLayerSerialization = useSingleLayerSerialization;
public <T extends DataObject> void addDeleteToTxChain(final LogicalDatastoreType store,
final InstanceIdentifier<T> path) {
if (initialized.get()) {
- transactionChainManager.addDeleteOperationTotTxChain(store, path);
+ transactionChainManager.addDeleteOperationToTxChain(store, path);
}
}
@Override
public boolean submitTransaction() {
- return initialized.get() && transactionChainManager.submitWriteTransaction();
+ return initialized.get() && transactionChainManager.submitTransaction();
}
@Override
public void processReply(final OfHeader ofHeader) {
messageSpy.spyMessage(
ofHeader.getImplementedInterface(),
- (ofHeader instanceof Error)
+ ofHeader instanceof Error
? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
: MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS);
}
public void processReply(final Xid xid, final List<? extends OfHeader> ofHeaderList) {
ofHeaderList.forEach(header -> messageSpy.spyMessage(
header.getImplementedInterface(),
- (header instanceof Error)
+ header instanceof Error
? MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_FAILURE
: MessageSpy.StatisticsGroup.FROM_SWITCH_PUBLISHED_SUCCESS));
}
// Trigger off a notification
notificationPublishService.offerNotification(flowRemovedNotification);
}
-
- final ItemLifecycleListener itemLifecycleListener = flowLifeCycleKeeper.getItemLifecycleListener();
- if (itemLifecycleListener != null) {
- //2. create registry key
- final FlowRegistryKey flowRegKey = FlowRegistryKeyFactory.create(getDeviceInfo().getVersion(),
- flowRemovedNotification);
- //3. lookup flowId
- final FlowDescriptor flowDescriptor = deviceFlowRegistry.retrieveDescriptor(flowRegKey);
- //4. if flowId present:
- if (flowDescriptor != null) {
- // a) construct flow path
- final KeyedInstanceIdentifier<Flow, FlowKey> flowPath = getDeviceInfo().getNodeInstanceIdentifier()
- .augmentation(FlowCapableNode.class)
- .child(Table.class, flowDescriptor.getTableKey())
- .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
- // b) notify listener
- itemLifecycleListener.onRemoved(flowPath);
- } else {
- LOG.debug("flow id not found: nodeId={} tableId={}, priority={}",
- getDeviceInfo().getNodeId(), flowRegKey.getTableId(), flowRemovedNotification.getPriority());
- }
- }
}
@Override
submitTransaction();
} catch (final Exception e) {
LOG.warn("Error processing port status message for port {} on device {}",
- portStatus.getPortNo(), getDeviceInfo().getLOGValue(), e);
+ portStatus.getPortNo(), getDeviceInfo(), e);
}
} else if (!hasState.get()) {
primaryConnectionContext.handlePortStatusMessage(portStatus);
LOG.trace("notification offer failed..", throwable);
packetInLimiter.releasePermit();
}
- });
+ }, MoreExecutors.directExecutor());
}
@Override
(int) (HIGH_WATERMARK_FACTOR * upperBound));
}
- @Override
- public ItemLifeCycleRegistry getItemLifeCycleSourceRegistry() {
- return itemLifeCycleSourceRegistry;
- }
-
@Override
public void setExtensionConverterProvider(final ExtensionConverterProvider extensionConverterProvider) {
this.extensionConverterProvider = extensionConverterProvider;
}
@Override
- public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher contextChainMastershipWatcher) {
- this.contextChainMastershipWatcher = contextChainMastershipWatcher;
+ public void registerMastershipWatcher(@Nonnull final ContextChainMastershipWatcher newWatcher) {
+ this.contextChainMastershipWatcher = newWatcher;
}
@Nonnull
transactionChainManager.close();
transactionChainManager = null;
}
- });
+ }, MoreExecutors.directExecutor());
}
requestContexts.forEach(requestContext -> RequestContextUtil
return useSingleLayerSerialization && getDeviceInfo().getVersion() >= OFConstants.OFP_VERSION_1_3;
}
+ // TODO: exception handling should be fixed by using custom checked exception, never RuntimeExceptions
@Override
- @SuppressWarnings("checkstyle:IllegalCatch")
+ @SuppressWarnings({"checkstyle:IllegalCatch"})
public void instantiateServiceInstance() {
lazyTransactionManagerInitialization();
submitTransaction();
} catch (final Exception ex) {
throw new RuntimeException(String.format("Error processing port status messages from device %s: %s",
- deviceInfo.toString(),
- ex.toString()));
+ deviceInfo.toString(), ex.toString()), ex);
}
final Optional<AbstractDeviceInitializer> initializer = deviceInitializerProvider
} catch (TimeoutException ex) {
initialize.cancel(true);
throw new RuntimeException(String.format("Failed to initialize device %s in %ss: %s",
- deviceInfo.toString(),
- String.valueOf(DEVICE_INIT_TIMEOUT / 1000),
- ex.toString()));
+ deviceInfo.toString(), String.valueOf(DEVICE_INIT_TIMEOUT / 1000), ex.toString()), ex);
} catch (ExecutionException | InterruptedException ex) {
- throw new RuntimeException(String.format("Device %s cannot be initialized: %s",
- deviceInfo.toString(),
- ex.toString()));
+ throw new RuntimeException(
+ String.format("Device %s cannot be initialized: %s", deviceInfo.toString(), ex.toString()), ex);
}
} else {
throw new RuntimeException(String.format("Unsupported version %s for device %s",
final ListenableFuture<List<com.google.common.base.Optional<FlowCapableNode>>> deviceFlowRegistryFill =
getDeviceFlowRegistry().fill();
Futures.addCallback(deviceFlowRegistryFill,
- new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher));
+ new DeviceFlowRegistryCallback(deviceFlowRegistryFill, contextChainMastershipWatcher),
+ MoreExecutors.directExecutor());
}
@VisibleForTesting
void lazyTransactionManagerInitialization() {
if (!this.initialized.get()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Transaction chain manager for node {} created", deviceInfo.getLOGValue());
+ LOG.debug("Transaction chain manager for node {} created", deviceInfo);
}
- this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo);
- this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker, deviceInfo
- .getNodeInstanceIdentifier());
+ this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo.getNodeId().getValue());
+ this.deviceFlowRegistry = new DeviceFlowRegistryImpl(deviceInfo.getVersion(), dataBroker,
+ deviceInfo.getNodeInstanceIdentifier());
this.deviceGroupRegistry = new DeviceGroupRegistryImpl();
this.deviceMeterRegistry = new DeviceMeterRegistryImpl();
}
.filter(Objects::nonNull)
.count();
- LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo
- .getLOGValue());
+ LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo);
}
this.contextChainMastershipWatcher.onMasterRoleAcquired(deviceInfo, ContextChainMastershipState
.INITIAL_FLOW_REGISTRY_FILL);
public void onFailure(Throwable throwable) {
if (deviceFlowRegistryFill.isCancelled()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getLOGValue());
+ LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo);
}
} else {
- LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo
- .getLOGValue(), throwable);
+ LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo,
+ throwable);
}
contextChainMastershipWatcher.onNotAbleToStartMastership(
deviceInfo,