+ Futures.addCallback(future, new UpdateFlowCallback(input));
+ return future;
+ }
+
+ @VisibleForTesting
+ private static KeyedInstanceIdentifier<Flow, FlowKey> createFlowPath(FlowDescriptor flowDescriptor,
+ KeyedInstanceIdentifier<Node, NodeKey> nodePath) {
+ return nodePath.augmentation(FlowCapableNode.class)
+ .child(Table.class, flowDescriptor.getTableKey())
+ .child(Flow.class, new FlowKey(flowDescriptor.getFlowId()));
+ }
+
+ private class AddFlowCallback implements FutureCallback<RpcResult<AddFlowOutput>> {
+ private final AddFlowInput input;
+ private final FlowRegistryKey flowRegistryKey;
+
+ private AddFlowCallback(final AddFlowInput input,
+ final FlowRegistryKey flowRegistryKey) {
+ this.input = input;
+ this.flowRegistryKey = flowRegistryKey;
+ }
+
+ @Override
+ public void onSuccess(final RpcResult<AddFlowOutput> rpcResult) {
+ if (rpcResult.isSuccessful()) {
+ final FlowDescriptor flowDescriptor;
+
+ if (Objects.nonNull(input.getFlowRef())) {
+ final FlowId flowId = input.getFlowRef().getValue().firstKeyOf(Flow.class, FlowKey.class).getId();
+ flowDescriptor = FlowDescriptorFactory.create(input.getTableId(), flowId);
+ deviceContext.getDeviceFlowRegistry().storeDescriptor(flowRegistryKey, flowDescriptor);
+ } else {
+ deviceContext.getDeviceFlowRegistry().store(flowRegistryKey);
+ flowDescriptor = deviceContext.getDeviceFlowRegistry().retrieveDescriptor(flowRegistryKey);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Flow add with id={} finished without error", flowDescriptor.getFlowId().getValue());
+ }
+
+ if (itemLifecycleListener != null) {
+ KeyedInstanceIdentifier<Flow, FlowKey> flowPath = createFlowPath(flowDescriptor,
+ deviceContext.getDeviceInfo().getNodeInstanceIdentifier());
+ final FlowBuilder flowBuilder = new FlowBuilder(input).setId(flowDescriptor.getFlowId());
+ itemLifecycleListener.onAdded(flowPath, flowBuilder.build());
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Flow add failed for flow={}, errors={}", input,
+ ErrorUtil.errorsToString(rpcResult.getErrors()));
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(final Throwable throwable) {
+ LOG.warn("Service call for adding flow={} failed, reason: {}", input, throwable);
+ }
+ }
+
+ private class RemoveFlowCallback implements FutureCallback<RpcResult<RemoveFlowOutput>> {
+ private final RemoveFlowInput input;
+
+ private RemoveFlowCallback(final RemoveFlowInput input) {
+ this.input = input;
+ }