import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.cds.types.rev191024.ClientGeneration;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
import org.opendaylight.yangtools.concepts.WritableObjects;
+import org.opendaylight.yangtools.yang.common.Uint64;
/**
* A cluster-wide unique identifier of a frontend instance. This identifier discerns between individual incarnations
}
private static final long serialVersionUID = 1L;
- private final FrontendIdentifier frontendId;
+
+ private final @NonNull FrontendIdentifier frontendId;
private final long generation;
ClientIdentifier(final FrontendIdentifier frontendId, final long generation) {
this.generation = generation;
}
- public static ClientIdentifier create(final FrontendIdentifier frontendId,
+ public static @NonNull ClientIdentifier create(final FrontendIdentifier frontendId,
final long generation) {
return new ClientIdentifier(frontendId, generation);
}
- public static ClientIdentifier readFrom(final DataInput in) throws IOException {
+ public static @NonNull ClientIdentifier readFrom(final DataInput in) throws IOException {
final FrontendIdentifier frontendId = FrontendIdentifier.readFrom(in);
return new ClientIdentifier(frontendId, WritableObjects.readLong(in));
}
WritableObjects.writeLong(out, generation);
}
- public FrontendIdentifier getFrontendId() {
+ public @NonNull FrontendIdentifier getFrontendId() {
return frontendId;
}
return generation;
}
+ public @NonNull ClientGeneration getYangGeneration() {
+ return new ClientGeneration(Uint64.fromLongBits(generation));
+ }
+
@Override
public int hashCode() {
return frontendId.hashCode() * 31 + Long.hashCode(generation);
import java.io.ObjectOutput;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.concepts.WritableIdentifier;
private static final String SIMPLE_STRING_REGEX = "^[a-zA-Z0-9-_.*+:=,!~';]+$";
private static final Pattern SIMPLE_STRING_PATTERN = Pattern.compile(SIMPLE_STRING_REGEX);
private static final long serialVersionUID = 1L;
- private final String name;
+
+ private final @NonNull String name;
@SuppressFBWarnings(value = "VO_VOLATILE_REFERENCE_TO_ARRAY",
justification = "The array elements are non-volatile but we don't access them.")
* @return A {@link FrontendType} instance
* @throws IllegalArgumentException if the string is null, empty or contains invalid characters
*/
- public static FrontendType forName(final String name) {
+ public static @NonNull FrontendType forName(final String name) {
checkArgument(!Strings.isNullOrEmpty(name));
checkArgument(SIMPLE_STRING_PATTERN.matcher(name).matches(),
"Supplied name %s does not patch pattern %s", name, SIMPLE_STRING_REGEX);
return new FrontendType(name);
}
- public static FrontendType readFrom(final DataInput in) throws IOException {
+ public static @NonNull FrontendType readFrom(final DataInput in) throws IOException {
final byte[] serialized = new byte[in.readInt()];
in.readFully(serialized);
return new FrontendType(new String(serialized, StandardCharsets.UTF_8));
out.write(local);
}
- public String getName() {
+ public @NonNull String getName() {
return name;
}
+ public org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.cds.types.rev191024
+ . @NonNull FrontendType toYang() {
+ return new org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.cds.types.rev191024
+ .FrontendType(name);
+ }
+
@Override
public int hashCode() {
return name.hashCode();
return name;
}
+ public org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.cds.types.rev191024
+ .MemberName toYang() {
+ return new org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.cds.types.rev191024
+ .MemberName(name);
+ }
+
@Override
public int hashCode() {
return name.hashCode();
--- /dev/null
+module odl-controller-cds-types {
+ yang-version 1;
+ namespace "urn:opendaylight:params:xml:ns:yang:controller:cds:types";
+ prefix "cdst";
+
+ organization "The OpenDaylight Project";
+
+ description "Common type definitions related to clustered data store.";
+
+ revision 2019-10-24 {
+ description "Initial revision.";
+ }
+
+ typedef member-name {
+ description "Cluster member name.";
+ type string;
+ }
+
+ typedef frontend-type {
+ description "Frontend type.";
+ type string {
+ pattern "";
+ }
+ }
+
+ typedef client-generation {
+ description "Client generation.";
+ type uint64;
+ }
+
+ grouping frontend-identifier {
+ description "Identifier of a particular frontend.";
+ leaf member {
+ type member-name;
+ mandatory true;
+ }
+
+ leaf type {
+ type frontend-type;
+ mandatory true;
+ }
+ }
+
+ grouping client-identifier {
+ description "Identifier of a particular client.";
+ uses frontend-identifier;
+ leaf generation {
+ type client-generation;
+ mandatory true;
+ }
+ }
+}
+
<version>1.11.0-SNAPSHOT</version>
<packaging>bundle</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.opendaylight.controller</groupId>
+ <artifactId>cds-access-api</artifactId>
+ </dependency>
+ </dependencies>
+
</project>
description "Initial revision.";
}
+ import odl-controller-cds-types { prefix cds; }
+
typedef data-store-type {
type enumeration {
enum config {
description "Returns the current role for the requested module shard.";
}
+
+ rpc get-known-clients-for-all-shards {
+ description "Request all shards to report their known frontend clients. This is useful for determining what
+ generation should a resurrected member node should use.";
+
+ output {
+ uses shard-result-output {
+ augment shard-result {
+ list known-clients {
+ when "../succeeded = true";
+
+ uses cds:client-identifier;
+ key "member type";
+ }
+ }
+ }
+ }
+ }
}
import akka.util.Timeout;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.SerializationUtils;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
import org.opendaylight.controller.cluster.datastore.messages.FlipShardMembersVotingStatus;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
import org.opendaylight.controller.cluster.datastore.messages.GetShardRole;
import org.opendaylight.controller.cluster.datastore.messages.GetShardRoleReply;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetKnownClientsForAllShardsOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.ShardResult1Builder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.get.known.clients._for.all.shards.output.shard.result.KnownClientsBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LeaderActorRefBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.locate.shard.output.member.node.LocalBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingState;
}, actorUtils.getClientDispatcher());
final SettableFuture<RpcResult<MakeLeaderLocalOutput>> future = SettableFuture.create();
- makeLeaderLocalAsk.future().onComplete(new OnComplete<Object>() {
+ makeLeaderLocalAsk.future().onComplete(new OnComplete<>() {
@Override
public void onComplete(final Throwable failure, final Object success) {
if (failure != null) {
return returnFuture;
}
+
+ @Override
+ public ListenableFuture<RpcResult<GetKnownClientsForAllShardsOutput>> getKnownClientsForAllShards(
+ final GetKnownClientsForAllShardsInput input) {
+ final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies =
+ getAllShardLeadersClients();
+ return Futures.whenAllComplete(allShardReplies.values()).call(() -> processReplies(allShardReplies),
+ MoreExecutors.directExecutor());
+ }
+
+ private static RpcResult<GetKnownClientsForAllShardsOutput> processReplies(
+ final ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> allShardReplies) {
+ final List<ShardResult> result = new ArrayList<>(allShardReplies.size());
+ for (Entry<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> entry : allShardReplies.entrySet()) {
+ final ListenableFuture<GetKnownClientsReply> future = entry.getValue();
+ final ShardResultBuilder builder = new ShardResultBuilder()
+ .setDataStoreType(entry.getKey().getDataStoreType())
+ .setShardName(entry.getKey().getShardName());
+
+ final GetKnownClientsReply reply;
+ try {
+ reply = Futures.getDone(future);
+ } catch (ExecutionException e) {
+ LOG.debug("Shard {} failed to answer", entry.getKey(), e);
+ result.add(builder.setSucceeded(Boolean.FALSE).setErrorMessage(e.getCause().getMessage()).build());
+ continue;
+ }
+
+ result.add(builder
+ .setSucceeded(Boolean.TRUE)
+ .addAugmentation(ShardResult1.class, new ShardResult1Builder()
+ .setKnownClients(reply.getClients().stream()
+ .map(client -> new KnownClientsBuilder()
+ .setMember(client.getFrontendId().getMemberName().toYang())
+ .setType(client.getFrontendId().getClientType().toYang())
+ .setGeneration(client.getYangGeneration())
+ .build())
+ .collect(Collectors.toList()))
+ .build())
+ .build());
+ }
+
+ return RpcResultBuilder.success(new GetKnownClientsForAllShardsOutputBuilder().setShardResult(result).build())
+ .build();
+ }
+
private static ChangeShardMembersVotingStatus toChangeShardMembersVotingStatus(final String shardName,
final List<MemberVotingState> memberVotingStatus) {
Map<String, Boolean> serverVotingStatusMap = new HashMap<>();
return returnFuture;
}
+ private ImmutableMap<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> getAllShardLeadersClients() {
+ final ImmutableMap.Builder<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> builder =
+ ImmutableMap.builder();
+
+ addAllShardsClients(builder, DataStoreType.Config, configDataStore.getActorUtils());
+ addAllShardsClients(builder, DataStoreType.Operational, operDataStore.getActorUtils());
+
+ return builder.build();
+ }
+
+ private static void addAllShardsClients(
+ final ImmutableMap.Builder<ShardIdentifier, ListenableFuture<GetKnownClientsReply>> builder,
+ final DataStoreType type, final ActorUtils utils) {
+ for (String shardName : utils.getConfiguration().getAllShardNames()) {
+ final SettableFuture<GetKnownClientsReply> future = SettableFuture.create();
+ builder.put(new ShardIdentifier(type, shardName), future);
+
+ utils.findPrimaryShardAsync(shardName).flatMap(
+ info -> Patterns.ask(info.getPrimaryShardActor(), GetKnownClients.INSTANCE, SHARD_MGR_TIMEOUT),
+ utils.getClientDispatcher()).onComplete(new OnComplete<>() {
+ @Override
+ public void onComplete(final Throwable failure, final Object success) {
+ if (failure == null) {
+ future.set((GetKnownClientsReply) success);
+ } else {
+ future.setException(failure);
+ }
+ }
+ }, utils.getClientDispatcher());
+ }
+ }
+
private static <T> ListenableFuture<RpcResult<T>> newFailedRpcResultFuture(final String message) {
return ClusterAdminRpcService.<T>newFailedRpcResultBuilder(message).buildFuture();
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.admin;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.base.MoreObjects;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DatastoreShardId;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+final class ShardIdentifier implements Identifier {
+ private static final long serialVersionUID = 1L;
+
+ private final @NonNull String shardName;
+ private final @NonNull DataStoreType type;
+
+ ShardIdentifier(final DataStoreType type, final String shardName) {
+ this.type = requireNonNull(type);
+ this.shardName = requireNonNull(shardName);
+ }
+
+ ShardIdentifier(final DatastoreShardId id) {
+ this(id.getDataStoreType(), id.getShardName());
+ }
+
+ public @NonNull String getShardName() {
+ return shardName;
+ }
+
+ public @NonNull DataStoreType getDataStoreType() {
+ return type;
+ }
+
+ @Override
+ public int hashCode() {
+ return type.hashCode() * 31 + shardName.hashCode();
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof ShardIdentifier)) {
+ return false;
+ }
+ final ShardIdentifier other = (ShardIdentifier) obj;
+ return type.equals(other.type) && shardName.equals(other.shardName);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("type", type).add("shardName", shardName).toString();
+ }
+}
import static java.util.Objects.requireNonNull;
import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
verify(clients.replace(frontendId, client, new FrontendClientMetadataBuilder.Disabled(shardName, clientId)));
}
+
+ ImmutableSet<ClientIdentifier> getClients() {
+ return clients.values().stream()
+ .map(FrontendClientMetadataBuilder::getIdentifier)
+ .collect(ImmutableSet.toImmutableSet());
+ }
}
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Range;
import java.io.IOException;
import java.util.Arrays;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
+import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
store.resumeNextPendingTransaction();
+ } else if (GetKnownClients.INSTANCE.equals(message)) {
+ handleGetKnownClients();
} else if (!responseMessageSlicer.handleMessage(message)) {
super.handleNonRaftCommand(message);
}
}
}
+ private void handleGetKnownClients() {
+ final ImmutableSet<ClientIdentifier> clients;
+ if (isLeader()) {
+ clients = knownFrontends.values().stream()
+ .map(LeaderFrontendState::getIdentifier)
+ .collect(ImmutableSet.toImmutableSet());
+ } else {
+ clients = frontendMetadata.getClients();
+ }
+ sender().tell(new GetKnownClientsReply(clients), self());
+ }
+
private boolean hasLeader() {
return getLeaderId() != null;
}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.NonNull;
+
+/**
+ * Request a shard to report the clients it knows about. Shard is required to respond with {@link GetKnownClientsReply}.
+ */
+public final class GetKnownClients implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static final @NonNull GetKnownClients INSTANCE = new GetKnownClients();
+
+ private GetKnownClients() {
+
+ }
+
+ private Object readResolve() {
+ return INSTANCE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static java.util.Objects.requireNonNull;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+
+/**
+ * Reply to {@link GetKnownClients}.
+ */
+public final class GetKnownClientsReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final @NonNull ImmutableSet<ClientIdentifier> clients;
+
+ public GetKnownClientsReply(final ImmutableSet<ClientIdentifier> clients) {
+ this.clients = requireNonNull(clients);
+ }
+
+ public @NonNull ImmutableSet<ClientIdentifier> getClients() {
+ return clients;
+ }
+}