+ private CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> fillFromDatastore(final LogicalDatastoreType logicalDatastoreType, final InstanceIdentifier<FlowCapableNode> path) {
+ // Create new read-only transaction
+ final ReadOnlyTransaction transaction = dataBroker.newReadOnlyTransaction();
+
+ // Bail out early if transaction is null
+ if (transaction == null) {
+ return Futures.immediateFailedCheckedFuture(
+ new ReadFailedException("Read transaction is null"));
+ }
+
+ // Prepare read operation from datastore for path
+ final CheckedFuture<Optional<FlowCapableNode>, ReadFailedException> future =
+ transaction.read(logicalDatastoreType, path);
+
+ // Bail out early if future is null
+ if (future == null) {
+ return Futures.immediateFailedCheckedFuture(
+ new ReadFailedException("Future from read transaction is null"));
+ }
+
+ Futures.addCallback(future, new FutureCallback<Optional<FlowCapableNode>>() {
+ @Override
+ public void onSuccess(Optional<FlowCapableNode> result) {
+ result.asSet().stream()
+ .filter(Objects::nonNull)
+ .filter(flowCapableNode -> Objects.nonNull(flowCapableNode.getTable()))
+ .flatMap(flowCapableNode -> flowCapableNode.getTable().stream())
+ .filter(Objects::nonNull)
+ .filter(table -> Objects.nonNull(table.getFlow()))
+ .flatMap(table -> table.getFlow().stream())
+ .filter(Objects::nonNull)
+ .filter(flow -> Objects.nonNull(flow.getId()))
+ .forEach(flowConsumer);
+
+ // After we are done with reading from datastore, close the transaction
+ transaction.close();
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // Even when read operation failed, close the transaction
+ transaction.close();
+ }
+ });
+
+ return future;
+ }