*/
public interface DeviceFlowRegistry extends AutoCloseable {
- ListenableFuture<List<Optional<FlowCapableNode>>> fill(KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier);
+ ListenableFuture<List<Optional<FlowCapableNode>>> fill();
FlowDescriptor retrieveIdForFlow(FlowRegistryKey flowRegistryKey);
import org.opendaylight.openflowplugin.api.openflow.lifecycle.LifecycleConductor;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.RoleChangeListener;
import org.opendaylight.openflowplugin.api.openflow.lifecycle.ServiceChangeListener;
+import org.opendaylight.openflowplugin.api.openflow.registry.flow.DeviceFlowRegistry;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcContext;
import org.opendaylight.openflowplugin.api.openflow.rpc.RpcManager;
import org.opendaylight.openflowplugin.api.openflow.statistics.StatisticsContext;
if (OfpRole.BECOMEMASTER.equals(newRole)) {
logText = "Start";
-
- // Fill device flow registry with flows from datastore
- final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill =
- deviceContext.getDeviceFlowRegistry().fill(deviceInfo.getNodeInstanceIdentifier());
-
- // Start statistics scheduling only after we finished initializing device flow registry
- Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
- @Override
- public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
- if (LOG.isDebugEnabled()) {
- // Count all flows we read from datastore for debugging purposes.
- // This number do not always represent how many flows were actually added
- // to DeviceFlowRegistry, because of possible duplicates.
- long flowCount = Optional.fromNullable(result).asSet().stream()
- .flatMap(Collection::stream)
- .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
- .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
- .flatMap(table -> table.getFlow().stream())
- .count();
-
- LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId());
- }
-
- statisticsManager.startScheduling(deviceInfo);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // If we manually cancelled this future, do not start scheduling of statistics
- if (deviceFlowRegistryFill.isCancelled()) {
- LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId());
- } else {
- LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t);
- statisticsManager.startScheduling(deviceInfo);
- }
- }
- });
-
+ fillDeviceFlowRegistry(deviceInfo, deviceContext.getDeviceFlowRegistry());
MdSalRegistrationUtils.registerServices(rpcContext, deviceContext, this.extensionConverterProvider);
if (rpcContext.isStatisticsRpcEnabled()) {
deviceContext,
notificationPublishService);
}
-
- // Fill flow registry with flows found in operational and config datastore
- deviceContext.getDeviceFlowRegistry().fill(deviceInfo.getNodeInstanceIdentifier());
} else {
logText = "Stopp";
statisticsManager.stopScheduling(deviceInfo);
});
}
+ private void fillDeviceFlowRegistry(final DeviceInfo deviceInfo, final DeviceFlowRegistry deviceFlowRegistry) {
+ // Fill device flow registry with flows from datastore
+ final ListenableFuture<List<Optional<FlowCapableNode>>> deviceFlowRegistryFill = deviceFlowRegistry.fill();
+
+ // Start statistics scheduling only after we finished initializing device flow registry
+ Futures.addCallback(deviceFlowRegistryFill, new FutureCallback<List<Optional<FlowCapableNode>>>() {
+ @Override
+ public void onSuccess(@Nullable List<Optional<FlowCapableNode>> result) {
+ if (LOG.isDebugEnabled()) {
+ // Count all flows we read from datastore for debugging purposes.
+ // This number do not always represent how many flows were actually added
+ // to DeviceFlowRegistry, because of possible duplicates.
+ long flowCount = Optional.fromNullable(result).asSet().stream()
+ .flatMap(Collection::stream)
+ .flatMap(flowCapableNodeOptional -> flowCapableNodeOptional.asSet().stream())
+ .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+ .flatMap(table -> table.getFlow().stream())
+ .count();
+
+ LOG.debug("Finished filling flow registry with {} flows for node: {}", flowCount, deviceInfo.getNodeId());
+ }
+
+ statisticsManager.startScheduling(deviceInfo);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // If we manually cancelled this future, do not start scheduling of statistics
+ if (deviceFlowRegistryFill.isCancelled()) {
+ LOG.debug("Cancelled filling flow registry with flows for node: {}", deviceInfo.getNodeId());
+ } else {
+ LOG.warn("Failed filling flow registry with flows for node: {} with exception: {}", deviceInfo.getNodeId(), t);
+ statisticsManager.startScheduling(deviceInfo);
+ }
+ }
+ });
+ }
+
public MessageIntelligenceAgency getMessageIntelligenceAgency() {
return messageIntelligenceAgency;
}
deviceInfo = primaryConnectionContext.getDeviceInfo();
this.transactionChainManager = new TransactionChainManager(dataBroker, deviceInfo, conductor);
auxiliaryConnectionContexts = new HashMap<>();
- deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker);
+ deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, deviceInfo.getNodeInstanceIdentifier());
deviceGroupRegistry = new DeviceGroupRegistryImpl();
deviceMeterRegistry = new DeviceMeterRegistryImpl();
messageSpy = conductor.getMessageIntelligenceAgency();
@GuardedBy("marks")
private final Collection<FlowRegistryKey> marks = new HashSet<>();
private final DataBroker dataBroker;
+ private final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier;
private final List<ListenableFuture<List<Optional<FlowCapableNode>>>> lastFillFutures = new ArrayList<>();
// Specifies what to do with flow read from datastore
};
- public DeviceFlowRegistryImpl(final DataBroker dataBroker) {
+ public DeviceFlowRegistryImpl(final DataBroker dataBroker, final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier) {
this.dataBroker = dataBroker;
+ this.instanceIdentifier = instanceIdentifier;
}
@Override
- public ListenableFuture<List<Optional<FlowCapableNode>>> fill(final KeyedInstanceIdentifier<Node, NodeKey> instanceIdentifier) {
+ public ListenableFuture<List<Optional<FlowCapableNode>>> fill() {
LOG.debug("Filling flow registry with flows for node: {}", instanceIdentifier);
// Prepare path for read transaction
private NodeId nodeId = new NodeId("openflow-junit:1");
private OfpRole ofpRole = OfpRole.NOCHANGE;
+ private KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
@Before
public void setUp() {
- final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
+ nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(nodeId));
lifecycleConductor = new LifecycleConductorImpl(messageIntelligenceAgency);
lifecycleConductor.setSafelyManager(deviceManager);
final DataBroker dataBroker = mock(DataBroker.class);
when(deviceContext.getDeviceState()).thenReturn(deviceState);
- when(deviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker));
+ when(deviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker, nodeInstanceIdentifier));
when(deviceManager.gainContext(deviceInfo)).thenReturn(deviceContext);
when(deviceManager.onClusterRoleChange(deviceInfo, OfpRole.BECOMEMASTER)).thenReturn(listenableFuture);
lifecycleConductor.roleChangeOnDevice(deviceInfo,OfpRole.BECOMEMASTER);
final DataBroker dataBroker = mock(DataBroker.class);
when(deviceContext.getDeviceState()).thenReturn(deviceState);
- when(deviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker));
+ when(deviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker, nodeInstanceIdentifier));
when(deviceManager.gainContext(deviceInfo)).thenReturn(deviceContext);
when(deviceManager.onClusterRoleChange(deviceInfo, OfpRole.BECOMESLAVE)).thenReturn(listenableFuture);
private DeviceFlowRegistryImpl deviceFlowRegistry;
private FlowRegistryKey key;
private FlowDescriptor descriptor;
+ private KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier;
@Mock
private DataBroker dataBroker;
@Mock
@Before
public void setUp() throws Exception {
+ nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(new NodeId(NODE_ID)));
when(dataBroker.newReadOnlyTransaction()).thenReturn(readOnlyTransaction);
when(readOnlyTransaction.read(any(), any())).thenReturn(Futures.immediateCheckedFuture(Optional.absent()));
- deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker);
+ deviceFlowRegistry = new DeviceFlowRegistryImpl(dataBroker, nodeInstanceIdentifier);
final FlowAndStatisticsMapList flowStats = TestFlowHelper.createFlowAndStatisticsMapListBuilder(1).build();
key = FlowRegistryKeyFactory.create(flowStats);
descriptor = FlowDescriptorFactory.create(key.getTableId(), new FlowId("ut:1"));
@Test
public void testFill() throws Exception {
- final KeyedInstanceIdentifier<Node, NodeKey> nodeInstanceIdentifier = InstanceIdentifier.create(Nodes.class).child(Node.class, new NodeKey(new NodeId(NODE_ID)));
final InstanceIdentifier<FlowCapableNode> path = nodeInstanceIdentifier.augmentation(FlowCapableNode.class);
- deviceFlowRegistry.fill(nodeInstanceIdentifier).get();
+ deviceFlowRegistry.fill().get();
verify(dataBroker, times(2)).newReadOnlyTransaction();
verify(readOnlyTransaction).read(LogicalDatastoreType.CONFIGURATION, path);
when(mockedDeviceContext.getPrimaryConnectionContext()).thenReturn(mockedPrimConnectionContext);
when(mockedDeviceContext.getMessageSpy()).thenReturn(mockedMessagSpy);
- when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker));
+ when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker, NODE_II));
when(mockedDeviceContext.getDeviceState()).thenReturn(mockedDeviceState);
when(mockedDeviceContext.getDeviceInfo()).thenReturn(mockedDeviceInfo);
when(mockedDeviceContext.getMultiMsgCollector(Matchers.any())).thenReturn(multiMessageCollector);
when(mockedDeviceContext.getDeviceInfo()).thenReturn(mockedDeviceInfo);
when(mockedDeviceContext.getPrimaryConnectionContext()).thenReturn(mockedPrimConnectionContext);
when(mockedDeviceContext.getMessageSpy()).thenReturn(mockedMessagSpy);
- when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker));
+ when(mockedDeviceContext.getDeviceFlowRegistry()).thenReturn(new DeviceFlowRegistryImpl(dataBroker, nodePath));
when(mockedDeviceContext.getDeviceState()).thenReturn(mockedDeviceState);
when(mockedDeviceContext.getMultiMsgCollector(
Matchers.<RequestContext<List<MultipartReply>>>any())).thenAnswer(