Migrate users of Optional.get()
[openflowplugin.git] / applications / forwardingrules-manager / src / main / java / org / opendaylight / openflowplugin / applications / frm / impl / BundleFlowForwarder.java
index 1c3ded8b7bc051b2c6517f4189ee5a7aaf213473..37ec48bf9dfd7ea485bb8730e81ad938af31ed59 100644 (file)
@@ -7,14 +7,14 @@
  */
 package org.opendaylight.openflowplugin.applications.frm.impl;
 
+import static java.util.Objects.requireNonNull;
 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.buildGroupInstanceIdentifier;
 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getFlowId;
-import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdFromNodeIdentifier;
+import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getNodeIdValueFromNodeIdentifier;
 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getTableId;
 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isFlowDependentOnGroup;
 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.isGroupExistsOnDevice;
 
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -34,12 +34,9 @@ import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
 import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeId;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.NodeRef;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.AddBundleMessagesInput;
@@ -58,46 +55,65 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.on
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleFlags;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.rev170124.BundleId;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.common.ErrorType;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
+import org.opendaylight.yangtools.yang.common.Uint32;
+import org.opendaylight.yangtools.yang.common.Uint8;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BundleFlowForwarder implements BundleMessagesCommiter<Flow> {
-
     private static final Logger LOG = LoggerFactory.getLogger(BundleFlowForwarder.class);
     private static final BundleFlags BUNDLE_FLAGS = new BundleFlags(true, true);
+
     private final ForwardingRulesManager forwardingRulesManager;
     private final NodeConfigurator nodeConfigurator;
 
-    public BundleFlowForwarder(ForwardingRulesManager forwardingRulesManager) {
-        this.forwardingRulesManager = Preconditions.checkNotNull(forwardingRulesManager,
-                "ForwardingRulesManager can not be null!");
-        this.nodeConfigurator = Preconditions.checkNotNull(forwardingRulesManager.getNodeConfigurator(),
+    public BundleFlowForwarder(final ForwardingRulesManager forwardingRulesManager) {
+        this.forwardingRulesManager = requireNonNull(forwardingRulesManager, "ForwardingRulesManager can not be null!");
+        nodeConfigurator = requireNonNull(forwardingRulesManager.getNodeConfigurator(),
                 "NodeConfigurator can not be null!");
     }
 
-    public void remove(final InstanceIdentifier<Flow> identifier, final Flow flow,
-            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
-        final List<Message> messages = new ArrayList<>(1);
-        String node = nodeIdent.firstKeyOf(Node.class).getId().getValue();
-        BundleInnerMessage bundleInnerMessage = new BundleRemoveFlowCaseBuilder()
+    @Override
+    public void remove(final InstanceIdentifier<Flow> identifier,
+                       final Flow flow,
+                       final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                       final BundleId bundleId) {
+        final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
+        nodeConfigurator.enqueueJob(nodeId, () -> {
+            final List<Message> messages = new ArrayList<>(1);
+            String node = nodeIdent.firstKeyOf(Node.class).getId().getValue();
+            BundleInnerMessage bundleInnerMessage = new BundleRemoveFlowCaseBuilder()
                 .setRemoveFlowCaseData(new RemoveFlowCaseDataBuilder(flow).build()).build();
-        Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
-                .setBundleInnerMessage(bundleInnerMessage).build();
-        messages.add(message);
-        AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
-                .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
-                .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
-        final ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
+            Message message = new MessageBuilder()
+                .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                .setBundleInnerMessage(bundleInnerMessage)
+                .build();
+            messages.add(message);
+            AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
+                .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                .setBundleId(bundleId)
+                .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder()
+                        .setMessage(messages)
+                        .build())
+                .build();
+            final ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = forwardingRulesManager
                 .getSalBundleService().addBundleMessages(addBundleMessagesInput);
-        LOG.trace("Pushing flow remove message {} to bundle {} for device {}", addBundleMessagesInput,
+            LOG.trace("Pushing flow remove message {} to bundle {} for device {}", addBundleMessagesInput,
                 bundleId.getValue(), node);
-        LoggingFutures.addErrorLogging(resultFuture, LOG, "removeBundleFlow");
+            LoggingFutures.addErrorLogging(resultFuture, LOG, "removeBundleFlow");
+            return resultFuture;
+        });
     }
 
-    public void update(final InstanceIdentifier<Flow> identifier, final Flow originalFlow, final Flow updatedFlow,
-            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
+    @Override
+    public void update(final InstanceIdentifier<Flow> identifier,
+                       final Flow originalFlow,
+                       final Flow updatedFlow,
+                       final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                       final BundleId bundleId) {
         remove(identifier, originalFlow, nodeIdent, bundleId);
         add(identifier, updatedFlow, nodeIdent, bundleId);
     }
@@ -105,83 +121,100 @@ public class BundleFlowForwarder implements BundleMessagesCommiter<Flow> {
     @Override
     public ListenableFuture<RpcResult<AddBundleMessagesOutput>> add(final InstanceIdentifier<Flow> identifier,
                                                                     final Flow flow,
-            final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId) {
-        final NodeId nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
-        return nodeConfigurator.enqueueJob(nodeId.getValue(), () -> {
+                                                                    final InstanceIdentifier<FlowCapableNode> nodeIdent,
+                                                                    final BundleId bundleId) {
+        final String nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
+        return nodeConfigurator.enqueueJob(nodeId, () -> {
             BundleInnerMessage bundleInnerMessage = new BundleAddFlowCaseBuilder()
-                    .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow).build()).build();
+                    .setAddFlowCaseData(new AddFlowCaseDataBuilder(flow)
+                            .build())
+                    .build();
             Message message = new MessageBuilder().setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
-                    .setBundleInnerMessage(bundleInnerMessage).build();
+                    .setBundleInnerMessage(bundleInnerMessage)
+                    .build();
             ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
                     identifier, bundleId);
             SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
-            Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, identifier,
-                    resultFuture), MoreExecutors.directExecutor());
+            Futures.addCallback(groupFuture,
+                    new BundleFlowCallBack(nodeIdent, bundleId, message, identifier, resultFuture),
+                    MoreExecutors.directExecutor());
             return resultFuture;
         });
     }
 
     private ListenableFuture<RpcResult<AddBundleMessagesOutput>> pushDependentGroup(
-            final InstanceIdentifier<FlowCapableNode> nodeIdent, Flow updatedFlow, InstanceIdentifier<Flow> identifier,
-            BundleId bundleId) {
+            final InstanceIdentifier<FlowCapableNode> nodeIdent,
+            final Flow updatedFlow,
+            final InstanceIdentifier<Flow> identifier,
+            final BundleId bundleId) {
         //TODO This read to the DS might have a performance impact.
         //if the dependent group is not installed than we should just cache the parent group,
         //till we receive the dependent group DTCN and then push it.
-        Long groupId = isFlowDependentOnGroup(updatedFlow);
+        Uint32 groupId = isFlowDependentOnGroup(updatedFlow);
         ListenableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
         if (groupId != null) {
             LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
-                    getFlowId(new FlowRef(identifier)), groupId);
+                    getFlowId(identifier), groupId);
             if (isGroupExistsOnDevice(nodeIdent, groupId, forwardingRulesManager)) {
                 LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
-                        getFlowId(new FlowRef(identifier)));
+                        getFlowId(identifier));
                 resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
             } else {
                 LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
                 InstanceIdentifier<Group> groupIdent = buildGroupInstanceIdentifier(nodeIdent, groupId);
                 LOG.info("Reading the group from config inventory: {}", groupId);
                 try (ReadTransaction readTransaction = forwardingRulesManager.getReadTransaction()) {
-                    Optional<Group> group = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent).get();
-                    if (group.isPresent()) {
-                        final AddGroupInputBuilder builder = new AddGroupInputBuilder(group.get());
+                    Optional<Group> optGroup = readTransaction.read(LogicalDatastoreType.CONFIGURATION, groupIdent)
+                        .get();
+                    if (optGroup.isPresent()) {
+                        final Group group = optGroup.orElseThrow();
+                        final AddGroupInputBuilder builder = new AddGroupInputBuilder(group);
                         builder.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)));
                         builder.setGroupRef(new GroupRef(nodeIdent));
                         builder.setTransactionUri(new Uri(forwardingRulesManager.getNewTransactionId()));
                         BundleInnerMessage bundleInnerMessage = new BundleAddGroupCaseBuilder()
-                                .setAddGroupCaseData(new AddGroupCaseDataBuilder(group.get()).build()).build();
-                        Message groupMessage = new MessageBuilder().setNode(
-                                new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
-                                .setBundleInnerMessage(bundleInnerMessage).build();
+                                .setAddGroupCaseData(new AddGroupCaseDataBuilder(group).build()).build();
+                        Message groupMessage = new MessageBuilder()
+                                .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                                .setBundleInnerMessage(bundleInnerMessage)
+                                .build();
                         final List<Message> messages = new ArrayList<>(1);
                         messages.add(groupMessage);
                         AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
-                                .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
-                                .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build())
+                                .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                                .setBundleId(bundleId)
+                                .setFlags(BUNDLE_FLAGS)
+                                .setMessages(new MessagesBuilder()
+                                        .setMessage(messages).build())
                                 .build();
                         LOG.trace("Pushing flow update message {} to bundle {} for device {}", addBundleMessagesInput,
-                                bundleId.getValue(), getNodeIdFromNodeIdentifier(nodeIdent));
+                                bundleId.getValue(), getNodeIdValueFromNodeIdentifier(nodeIdent));
                         resultFuture = forwardingRulesManager
-                                .getSalBundleService().addBundleMessages(addBundleMessagesInput);
+                                .getSalBundleService()
+                                .addBundleMessages(addBundleMessagesInput);
                         Futures.transformAsync(resultFuture, rpcResult -> {
                             if (rpcResult.isSuccessful()) {
                                 forwardingRulesManager.getDevicesGroupRegistry()
-                                        .storeGroup(getNodeIdFromNodeIdentifier(nodeIdent), groupId);
+                                        .storeGroup(getNodeIdValueFromNodeIdentifier(nodeIdent), groupId);
                                 LOG.trace("Group {} stored in cache", groupId);
                             }
                             return Futures.immediateFuture(null);
                         }, MoreExecutors.directExecutor());
                     } else {
                         LOG.debug("Group {} not present in the config inventory", groupId);
-                        resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success()
-                                .build());
+                        resultFuture = RpcResultBuilder.<AddBundleMessagesOutput>failed()
+                                .withError(ErrorType.APPLICATION,
+                                        "Group " + groupId + " not present in the config inventory").buildFuture();
                     }
                 } catch (InterruptedException | ExecutionException e) {
                     LOG.error("Error while reading group from config datastore for the group ID {}", groupId, e);
-                    resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
+                    resultFuture = RpcResultBuilder.<AddBundleMessagesOutput>failed()
+                            .withError(ErrorType.APPLICATION,
+                                    "Group " + groupId + " not present in the config inventory").buildFuture();
                 }
             }
         } else {
-            resultFuture = Futures.immediateFuture(RpcResultBuilder.<AddBundleMessagesOutput>success().build());
+            resultFuture = RpcResultBuilder.<AddBundleMessagesOutput>success().buildFuture();
         }
         return resultFuture;
     }
@@ -190,59 +223,66 @@ public class BundleFlowForwarder implements BundleMessagesCommiter<Flow> {
         private final InstanceIdentifier<FlowCapableNode> nodeIdent;
         private final BundleId bundleId;
         private final Message messages;
-        private final NodeId nodeId;
+        private final String nodeId;
         private final String flowId;
-        private final short tableId;
+        private final Uint8 tableId;
         private final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
 
-        BundleFlowCallBack(InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId, Message messages,
-            InstanceIdentifier<Flow> identifier , SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
+        BundleFlowCallBack(final InstanceIdentifier<FlowCapableNode> nodeIdent, final BundleId bundleId,
+                final Message messages, final InstanceIdentifier<Flow> identifier,
+                final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
             this.nodeIdent = nodeIdent;
             this.bundleId = bundleId;
             this.messages = messages;
             this.resultFuture = resultFuture;
-            this.flowId = getFlowId(new FlowRef(identifier));
-            this.tableId = getTableId(new FlowTableRef(identifier));
-            nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
+            flowId = getFlowId(identifier);
+            tableId = getTableId(identifier);
+            nodeId = getNodeIdValueFromNodeIdentifier(nodeIdent);
         }
 
         @Override
-        public void onSuccess(RpcResult<AddBundleMessagesOutput> rpcResult) {
+        public void onSuccess(final RpcResult<AddBundleMessagesOutput> rpcResult) {
             if (rpcResult.isSuccessful()) {
                 AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
-                        .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
-                        .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(
-                                Collections.singletonList(messages)).build()).build();
+                        .setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class)))
+                        .setBundleId(bundleId)
+                        .setFlags(BUNDLE_FLAGS)
+                        .setMessages(new MessagesBuilder()
+                                .setMessage(
+                                        Collections.singletonList(messages)).build())
+                        .build();
 
                 LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
-                        bundleId.getValue(), nodeId.getValue());
+                        bundleId.getValue(), nodeId);
 
                 final ListenableFuture<RpcResult<AddBundleMessagesOutput>> addFuture =
                         forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
                 Futures.addCallback(addFuture, new FutureCallback<RpcResult<AddBundleMessagesOutput>>() {
                     @Override
-                    public void onSuccess(RpcResult<AddBundleMessagesOutput> result) {
+                    public void onSuccess(final RpcResult<AddBundleMessagesOutput> result) {
                         resultFuture.set(result);
                         if (!result.getErrors().isEmpty()) {
                             LOG.error("Flow add with flowId {} and tableId {} failed for node {} with error: {}",
-                                    flowId, tableId, nodeId.getValue(), result.getErrors().toString());
+                                    flowId, tableId, nodeId, result.getErrors().toString());
                         }
 
                     }
 
                     @Override
-                    public void onFailure(Throwable failure) {
+                    public void onFailure(final Throwable failure) {
                         resultFuture.setException(failure);
                     }
                 },  MoreExecutors.directExecutor());
             } else {
+                LOG.error("Error {} while pushing flow add bundle {} for device {}", rpcResult.getErrors(), messages,
+                        nodeId);
                 resultFuture.set(rpcResult);
             }
         }
 
         @Override
-        public void onFailure(Throwable throwable) {
-            LOG.error("Error while pushing flow add bundle {} for device {}", messages, nodeId.getValue());
+        public void onFailure(final Throwable throwable) {
+            LOG.error("Error while pushing flow add bundle {} for device {}", messages, nodeId);
             resultFuture.setException(throwable);
         }
     }