X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-cluster-admin-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2Fadmin%2FClusterAdminRpcService.java;h=6102db5348c41c519ac0d4c7d1e54f0ffecb3f45;hb=4b59df006c79ffb8119152e5a8bc6aadd276c031;hp=3e14a376453cb62b4f0e81154ee4267e162e093e;hpb=efd2203ccd8dc0b1aa6c1a723c9f91c26eb27f37;p=controller.git diff --git a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java index 3e14a37645..6102db5348 100644 --- a/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java +++ b/opendaylight/md-sal/sal-cluster-admin-impl/src/main/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcService.java @@ -16,6 +16,7 @@ import akka.util.Timeout; import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -106,12 +107,14 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controll import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1Builder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClients; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClientsBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult; import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.opendaylight.yangtools.yang.common.Empty; import org.opendaylight.yangtools.yang.common.RpcError.ErrorType; @@ -649,7 +652,7 @@ public class ClusterAdminRpcService implements ClusterAdminService { private static RpcResult processReplies( final ImmutableMap> allShardReplies) { - final List result = new ArrayList<>(allShardReplies.size()); + final Map result = Maps.newHashMapWithExpectedSize(allShardReplies.size()); for (Entry> entry : allShardReplies.entrySet()) { final ListenableFuture future = entry.getValue(); final ShardResultBuilder builder = new ShardResultBuilder() @@ -661,22 +664,28 @@ public class ClusterAdminRpcService implements ClusterAdminService { reply = Futures.getDone(future); } catch (ExecutionException e) { LOG.debug("Shard {} failed to answer", entry.getKey(), e); - result.add(builder.setSucceeded(Boolean.FALSE).setErrorMessage(e.getCause().getMessage()).build()); + final ShardResult sr = builder + .setSucceeded(Boolean.FALSE) + .setErrorMessage(e.getCause().getMessage()) + .build(); + result.put(sr.key(), sr); continue; } - result.add(builder - .setSucceeded(Boolean.TRUE) - .addAugmentation(ShardResult1.class, new ShardResult1Builder() - .setKnownClients(reply.getClients().stream() - .map(client -> new KnownClientsBuilder() - .setMember(client.getFrontendId().getMemberName().toYang()) - .setType(client.getFrontendId().getClientType().toYang()) - .setGeneration(client.getYangGeneration()) - .build()) - .collect(Collectors.toList())) - .build()) - .build()); + final ShardResult sr = builder + .setSucceeded(Boolean.TRUE) + .addAugmentation(ShardResult1.class, new ShardResult1Builder() + .setKnownClients(reply.getClients().stream() + .map(client -> new KnownClientsBuilder() + .setMember(client.getFrontendId().getMemberName().toYang()) + .setType(client.getFrontendId().getClientType().toYang()) + .setGeneration(client.getYangGeneration()) + .build()) + .collect(Collectors.toMap(KnownClients::key, Function.identity()))) + .build()) + .build(); + + result.put(sr.key(), sr); } return RpcResultBuilder.success(new GetKnownClientsForAllShardsOutputBuilder().setShardResult(result).build()) @@ -694,19 +703,20 @@ public class ClusterAdminRpcService implements ClusterAdminService { private static SettableFuture> waitForShardResults( final List, ShardResultBuilder>> shardResultData, - final Function, T> resultDataSupplier, + final Function, T> resultDataSupplier, final String failureLogMsgPrefix) { final SettableFuture> returnFuture = SettableFuture.create(); - final List shardResults = new ArrayList<>(); + final Map shardResults = new HashMap<>(); for (final Entry, ShardResultBuilder> entry : shardResultData) { Futures.addCallback(entry.getKey(), new FutureCallback() { @Override public void onSuccess(final Success result) { synchronized (shardResults) { - ShardResultBuilder shardResult = entry.getValue(); - LOG.debug("onSuccess for shard {}, type {}", shardResult.getShardName(), - shardResult.getDataStoreType()); - shardResults.add(shardResult.setSucceeded(true).build()); + final ShardResultBuilder builder = entry.getValue(); + LOG.debug("onSuccess for shard {}, type {}", builder.getShardName(), + builder.getDataStoreType()); + final ShardResult sr = builder.setSucceeded(Boolean.TRUE).build(); + shardResults.put(sr.key(), sr); checkIfComplete(); } } @@ -714,11 +724,14 @@ public class ClusterAdminRpcService implements ClusterAdminService { @Override public void onFailure(final Throwable failure) { synchronized (shardResults) { - ShardResultBuilder shardResult = entry.getValue(); - LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, shardResult.getShardName(), - shardResult.getDataStoreType(), failure); - shardResults.add(shardResult.setSucceeded(false).setErrorMessage( - Throwables.getRootCause(failure).getMessage()).build()); + ShardResultBuilder builder = entry.getValue(); + LOG.warn("{} for shard {}, type {}", failureLogMsgPrefix, builder.getShardName(), + builder.getDataStoreType(), failure); + final ShardResult sr = builder + .setSucceeded(Boolean.FALSE) + .setErrorMessage(Throwables.getRootCause(failure).getMessage()) + .build(); + shardResults.put(sr.key(), sr); checkIfComplete(); } }