import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClients;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClientsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.Empty;
import org.opendaylight.yangtools.yang.common.RpcError.ErrorType;
private static RpcResult<GetKnownClientsForAllShardsOutput> processReplies(
final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies) {
- final List<ShardResult> result = new ArrayList<>(allShardReplies.size());
+ final Map<ShardResultKey, ShardResult> result = Maps.newHashMapWithExpectedSize(allShardReplies.size());
for (Entry<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> entry : allShardReplies.entrySet()) {
final ListenableFuture<GetKnownClientsReply> future = entry.getValue();
final ShardResultBuilder builder = new ShardResultBuilder()
reply = Futures.getDone(future);
} catch (ExecutionException e) {
LOG.debug("Shard {} failed to answer", entry.getKey(), e);
- result.add(builder.setSucceeded(Boolean.FALSE).setErrorMessage(e.getCause().getMessage()).build());
+ final ShardResult sr = builder
+ .setSucceeded(Boolean.FALSE)
+ .setErrorMessage(e.getCause().getMessage())
+ .build();
+ result.put(sr.key(), sr);
continue;
}
- result.add(builder
- .setSucceeded(Boolean.TRUE)
- .addAugmentation(ShardResult1.class, new ShardResult1Builder()
- .setKnownClients(reply.getClients().stream()
- .map(client -> new KnownClientsBuilder()
- .setMember(client.getFrontendId().getMemberName().toYang())
- .setType(client.getFrontendId().getClientType().toYang())
- .setGeneration(client.getYangGeneration())
- .build())
- .collect(Collectors.toList()))
- .build())
- .build());
+ final ShardResult sr = builder
+ .setSucceeded(Boolean.TRUE)
+ .addAugmentation(ShardResult1.class, new ShardResult1Builder()
+ .setKnownClients(reply.getClients().stream()
+ .map(client -> new KnownClientsBuilder()
+ .setMember(client.getFrontendId().getMemberName().toYang())
+ .setType(client.getFrontendId().getClientType().toYang())
+ .setGeneration(client.getYangGeneration())
+ .build())
+ .collect(Collectors.toMap(KnownClients::key, Function.identity())))
+ .build())
+ .build();
+
+ result.put(sr.key(), sr);
}
return RpcResultBuilder.success(new GetKnownClientsForAllShardsOutputBuilder().setShardResult(result).build())
private static <T> SettableFuture<RpcResult<T>> waitForShardResults(
final List<Entry<ListenableFuture<Success>, ShardResultBuilder>> shardResultData,
- final Function<List<ShardResult>, T> resultDataSupplier,
+ final Function<Map<ShardResultKey, ShardResult>, T> resultDataSupplier,
final String failureLogMsgPrefix) {
final SettableFuture<RpcResult<T>> returnFuture = SettableFuture.create();
- final List<ShardResult> shardResults = new ArrayList<>();
+ final Map<ShardResultKey, ShardResult> shardResults = new HashMap<>();
for (final Entry<ListenableFuture<Success>, ShardResultBuilder> entry : shardResultData) {
Futures.addCallback(entry.getKey(), new FutureCallback<Success>() {
@Override
public void onSuccess(final Success result) {
synchronized (shardResults) {
- ShardResultBuilder shardResult = entry.getValue();
- LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(),
- shardResult.getDataStoreType());
- shardResults.add(shardResult.setSucceeded(true).build());
+ final ShardResultBuilder builder = entry.getValue();
+ LOG.debug("onSuccess for shard {}, type {}", builder.getShardName(),
+ builder.getDataStoreType());
+ final ShardResult sr = builder.setSucceeded(Boolean.TRUE).build();
+ shardResults.put(sr.key(), sr);
checkIfComplete();
}
}
@Override
public void onFailure(final Throwable failure) {
synchronized (shardResults) {
- ShardResultBuilder shardResult = entry.getValue();
- LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(),
- shardResult.getDataStoreType(), failure);
- shardResults.add(shardResult.setSucceeded(false).setErrorMessage(
- Throwables.getRootCause(failure).getMessage()).build());
+ ShardResultBuilder builder = entry.getValue();
+ LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, builder.getShardName(),
+ builder.getDataStoreType(), failure);
+ final ShardResult sr = builder
+ .setSucceeded(Boolean.FALSE)
+ .setErrorMessage(Throwables.getRootCause(failure).getMessage())
+ .build();
+ shardResults.put(sr.key(), sr);
checkIfComplete();
}
}