import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.opendaylight.yang.gen.v1.urn.ietf.params.xml.ns.yang.ietf.inet.types.rev130715.Uri;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.types.rev131026.FlowRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.AddGroupInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.GroupRef;
.setBundleInnerMessage(bundleInnerMessage).build();
ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent,
updatedFlow, identifier, bundleId);
- ListenableFuture<RpcResult<AddFlowOutput>> flowFuture = SettableFuture.create();
- Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, flowFuture));
- return flowFuture;
+ SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
+ Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, resultFuture),
+ MoreExecutors.directExecutor());
+ return resultFuture;
});
}
.setBundleInnerMessage(bundleInnerMessage).build();
ListenableFuture<RpcResult<AddBundleMessagesOutput>> groupFuture = pushDependentGroup(nodeIdent, flow,
identifier, bundleId);
- ListenableFuture<RpcResult<AddFlowOutput>> flowFuture = SettableFuture.create();
- Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, flowFuture),
+ SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture = SettableFuture.create();
+ Futures.addCallback(groupFuture, new BundleFlowCallBack(nodeIdent, bundleId, message, resultFuture),
MoreExecutors.directExecutor());
- return flowFuture;
+ return resultFuture;
});
}
private final BundleId bundleId;
private final Message message;
private final NodeId nodeId;
+ private final SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture;
BundleFlowCallBack(InstanceIdentifier<FlowCapableNode> nodeIdent, BundleId bundleId, Message message,
- ListenableFuture<RpcResult<AddFlowOutput>> flowFuture) {
+ SettableFuture<RpcResult<AddBundleMessagesOutput>> resultFuture) {
this.nodeIdent = nodeIdent;
this.bundleId = bundleId;
this.message = message;
+ this.resultFuture = resultFuture;
nodeId = getNodeIdFromNodeIdentifier(nodeIdent);
}
@Override
- @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
public void onSuccess(RpcResult<AddBundleMessagesOutput> rpcResult) {
if (rpcResult.isSuccessful()) {
- List<Message> messages = new ArrayList<>(1);
- messages.add(message);
AddBundleMessagesInput addBundleMessagesInput = new AddBundleMessagesInputBuilder()
.setNode(new NodeRef(nodeIdent.firstIdentifierOf(Node.class))).setBundleId(bundleId)
- .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(messages).build()).build();
+ .setFlags(BUNDLE_FLAGS).setMessages(new MessagesBuilder().setMessage(
+ Collections.singletonList(message)).build()).build();
+
LOG.trace("Pushing flow add message {} to bundle {} for device {}", addBundleMessagesInput,
bundleId.getValue(), nodeId.getValue());
- forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
+
+ final ListenableFuture<RpcResult<AddBundleMessagesOutput>> addFuture =
+ forwardingRulesManager.getSalBundleService().addBundleMessages(addBundleMessagesInput);
+ Futures.addCallback(addFuture, new FutureCallback<RpcResult<AddBundleMessagesOutput>>() {
+ @Override
+ public void onSuccess(RpcResult<AddBundleMessagesOutput> result) {
+ resultFuture.set(result);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ resultFuture.setException(failure);
+ }
+ }, MoreExecutors.directExecutor());
+ } else {
+ resultFuture.set(rpcResult);
}
}
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.StaleFlowKey;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.AddFlowOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.FlowTableRef;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.RemoveFlowOutput;
bundleFlowForwarder = new BundleFlowForwarder(manager);
}
- @SuppressWarnings("IllegalCatch")
@Override
+ @SuppressWarnings("IllegalCatch")
public void registerListener() {
final DataTreeIdentifier<Flow> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION,
getWildCardPath());
builder.setOriginalFlow(new OriginalFlowBuilder(original).setStrict(Boolean.TRUE).build());
Long groupId = isFlowDependentOnGroup(update);
- ListenableFuture<RpcResult<UpdateFlowOutput>> future = Futures.immediateFuture(null);
if (groupId != null) {
LOG.trace("The flow {} is dependent on group {}. Checking if the group is already present",
getFlowId(new FlowRef(identifier)), groupId);
if (isGroupExistsOnDevice(nodeIdent, groupId, provider)) {
LOG.trace("The dependent group {} is already programmed. Updating the flow {}", groupId,
getFlowId(new FlowRef(identifier)));
- future = provider.getSalFlowService().updateFlow(builder.build());
- JdkFutures.addErrorLogging(future, LOG, "updateFlow");
+ return provider.getSalFlowService().updateFlow(builder.build());
} else {
LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
groupId);
+ SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture = SettableFuture.create();
Futures.addCallback(groupFuture,
- new UpdateFlowCallBack(builder.build(), nodeId, future, groupId),
+ new UpdateFlowCallBack(builder.build(), nodeId, resultFuture, groupId),
MoreExecutors.directExecutor());
+ return resultFuture;
}
- } else {
- LOG.trace("The flow {} is not dependent on any group. Updating the flow",
- getFlowId(new FlowRef(identifier)));
- future = provider.getSalFlowService().updateFlow(builder.build());
- JdkFutures.addErrorLogging(future, LOG, "updateFlow");
}
- return future;
+
+ LOG.trace("The flow {} is not dependent on any group. Updating the flow",
+ getFlowId(new FlowRef(identifier)));
+ return provider.getSalFlowService().updateFlow(builder.build());
});
}
}
LOG.trace("The dependent group {} isn't programmed yet. Pushing the group", groupId);
ListenableFuture<RpcResult<AddGroupOutput>> groupFuture = pushDependentGroup(nodeIdent,
groupId);
- Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId),
- MoreExecutors.directExecutor());
- // TODO This makes new sense and seems just wrong...
- return SettableFuture.create();
+ SettableFuture<RpcResult<AddFlowOutput>> resultFuture = SettableFuture.create();
+ Futures.addCallback(groupFuture, new AddFlowCallBack(builder.build(), nodeId, groupId,
+ resultFuture), MoreExecutors.directExecutor());
+ return resultFuture;
}
- } else {
- LOG.trace("The flow {} is not dependent on any group. Adding the flow",
- getFlowId(new FlowRef(identifier)));
- return provider.getSalFlowService().addFlow(builder.build());
}
+
+ LOG.trace("The flow {} is not dependent on any group. Adding the flow",
+ getFlowId(new FlowRef(identifier)));
+ return provider.getSalFlowService().addFlow(builder.build());
});
}
}
private final AddFlowInput addFlowInput;
private final NodeId nodeId;
private final Long groupId;
+ private final SettableFuture<RpcResult<AddFlowOutput>> resultFuture;
- // TODO
- private AddFlowCallBack(final AddFlowInput addFlowInput, final NodeId nodeId, Long groupId) {
+ private AddFlowCallBack(final AddFlowInput addFlowInput, final NodeId nodeId, Long groupId,
+ SettableFuture<RpcResult<AddFlowOutput>> resultFuture) {
this.addFlowInput = addFlowInput;
this.nodeId = nodeId;
this.groupId = groupId;
+ this.resultFuture = resultFuture;
}
@Override
- @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
- if (rpcResult.isSuccessful()) {
+ if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
+ && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
- provider.getSalFlowService().addFlow(addFlowInput);
+ Futures.addCallback(provider.getSalFlowService().addFlow(addFlowInput),
+ new FutureCallback<RpcResult<AddFlowOutput>>() {
+ @Override
+ public void onSuccess(RpcResult<AddFlowOutput> result) {
+ resultFuture.set(result);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ resultFuture.setException(failure);
+ }
+ }, MoreExecutors.directExecutor());
+
LOG.debug("Flow add with id {} finished without error for node {}",
getFlowId(addFlowInput.getFlowRef()), nodeId);
} else {
- if (rpcResult.getErrors().size() == 1
- && rpcResult.getErrors().iterator().next().getMessage()
- .contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
- provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
- provider.getSalFlowService().addFlow(addFlowInput);
- LOG.debug("Group {} already programmed in the device. Adding the flow {}", groupId,
- getFlowId(addFlowInput.getFlowRef()));
- } else {
- LOG.error("Flow add with id {} failed for node {} with error {}",
- getFlowId(addFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
- }
+ LOG.error("Flow add with id {} failed for node {} with error {}", getFlowId(addFlowInput.getFlowRef()),
+ nodeId, rpcResult.getErrors().toString());
+ resultFuture.set(RpcResultBuilder.<AddFlowOutput>failed()
+ .withRpcErrors(rpcResult.getErrors()).build());
}
}
private final UpdateFlowInput updateFlowInput;
private final NodeId nodeId;
private final Long groupId;
+ private final SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture;
private UpdateFlowCallBack(final UpdateFlowInput updateFlowInput, final NodeId nodeId,
- ListenableFuture<RpcResult<UpdateFlowOutput>> future, Long groupId) {
+ SettableFuture<RpcResult<UpdateFlowOutput>> resultFuture, Long groupId) {
this.updateFlowInput = updateFlowInput;
this.nodeId = nodeId;
this.groupId = groupId;
+ this.resultFuture = resultFuture;
}
@Override
- @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED")
public void onSuccess(RpcResult<AddGroupOutput> rpcResult) {
- if (rpcResult.isSuccessful()) {
+ if (rpcResult.isSuccessful() || rpcResult.getErrors().size() == 1
+ && rpcResult.getErrors().iterator().next().getMessage().contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
- provider.getSalFlowService().updateFlow(updateFlowInput);
+ Futures.addCallback(provider.getSalFlowService().updateFlow(updateFlowInput),
+ new FutureCallback<RpcResult<UpdateFlowOutput>>() {
+ @Override
+ public void onSuccess(RpcResult<UpdateFlowOutput> result) {
+ resultFuture.set(result);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ resultFuture.setException(failure);
+ }
+ }, MoreExecutors.directExecutor());
+
LOG.debug("Flow update with id {} finished without error for node {}",
getFlowId(updateFlowInput.getFlowRef()), nodeId);
} else {
- if (rpcResult.getErrors().size() == 1
- && rpcResult.getErrors().iterator().next().getMessage()
- .contains(GROUP_EXISTS_IN_DEVICE_ERROR)) {
- provider.getDevicesGroupRegistry().storeGroup(nodeId, groupId);
- provider.getSalFlowService().updateFlow(updateFlowInput);
- LOG.debug("Group {} already programmed in the device. Updating the flow {}", groupId,
- getFlowId(updateFlowInput.getFlowRef()));
- } else {
- LOG.error("Flow update with id {} failed for node {} with error {}",
- getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
- }
+ LOG.error("Flow update with id {} failed for node {} with error {}",
+ getFlowId(updateFlowInput.getFlowRef()), nodeId, rpcResult.getErrors().toString());
+ resultFuture.set(RpcResultBuilder.<UpdateFlowOutput>failed()
+ .withRpcErrors(rpcResult.getErrors()).build());
}
}