+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.dom.api;
-
-import com.google.common.annotations.Beta;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-
-/**
- * An extension to {@link DOMDataTreeProducer}, which allows users access
- * to information about the backing shard.
- *
- * @author Robert Varga
- */
-@Beta
-@Deprecated(forRemoval = true)
-public interface CDSDataTreeProducer extends DOMDataTreeProducer {
- /**
- * Return a {@link CDSShardAccess} handle. This handle will remain valid
- * as long as this producer is operational. Returned handle can be accessed
- * independently from this producer and is not subject to the usual access
- * restrictions imposed on DOMDataTreeProducer state.
- *
- * @param subtree One of the subtrees to which are currently under control of this producer
- * @return A shard access handle.
- * @throws NullPointerException when subtree is null
- * @throws IllegalArgumentException if the specified subtree is not controlled by this producer
- * @throws IllegalStateException if this producer is no longer operational
- * @throws IllegalThreadStateException if the access rules to this producer
- * are violated, for example if this producer is bound and this thread
- * is currently not executing from a listener context.
- */
- @NonNull CDSShardAccess getShardAccess(@NonNull DOMDataTreeIdentifier subtree);
-}
-
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.dom.api;
-
-import com.google.common.annotations.Beta;
-import java.util.concurrent.CompletionStage;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-
-/**
- * Unprivileged access interface to shard information. Provides read-only access to operational details about a CDS
- * shard.
- *
- * @author Robert Varga
- */
-@Beta
-@Deprecated(forRemoval = true)
-public interface CDSShardAccess {
- /**
- * Return the shard identifier.
- *
- * @return Shard identifier.
- * @throws IllegalStateException if the {@link CDSDataTreeProducer} from which the associated
- * {@link CDSDataTreeProducer} is no longer valid.
- */
- @NonNull DOMDataTreeIdentifier getShardIdentifier();
-
- /**
- * Return the shard leader location relative to the local node.
- *
- * @return Shard leader location.
- * @throws IllegalStateException if the {@link CDSDataTreeProducer} from which the associated
- * {@link CDSDataTreeProducer} is no longer valid.
- */
- @NonNull LeaderLocation getLeaderLocation();
-
- /**
- * Request the shard leader to be moved to the local node. The request will be evaluated against shard state and
- * satisfied if leader movement is possible. If current shard policy or state prevents the movement from happening,
- * the returned {@link CompletionStage} will report an exception.
- *
- * <p>
- * This is a one-time operation, which does not prevent further movement happening in future. Even if this request
- * succeeds, there is no guarantee that the leader will remain local in face of failures, shutdown or any future
- * movement requests from other nodes.
- *
- * <p>
- * Note that due to asynchronous nature of CDS, the leader may no longer be local by the time the returned
- * {@link CompletionStage} reports success.
- *
- * @return A {@link CompletionStage} representing the request.
- * @throws IllegalStateException if the {@link CDSDataTreeProducer} from which the associated
- * {@link CDSDataTreeProducer} is no longer valid.
- */
- @NonNull CompletionStage<Void> makeLeaderLocal();
-
- /**
- * Register a listener to shard location changes. Each listener object can be registered at most once.
- *
- * @param listener Listener object
- * @return A {@link LeaderLocationListenerRegistration} for the listener.
- * @throws IllegalArgumentException if the specified listener is already registered.
- * @throws IllegalStateException if the {@link CDSDataTreeProducer} from which the associated
- * {@link CDSDataTreeProducer} is no longer valid.
- * @throws NullPointerException if listener is null.
- */
- @NonNull <L extends LeaderLocationListener> LeaderLocationListenerRegistration<L> registerLeaderLocationListener(
- @NonNull L listener);
-}
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.ServerInfo;
import org.opendaylight.controller.cluster.raft.persisted.UpdateElectionTerm;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
-import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
-import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.car.rev140818.Cars;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.sal.clustering.it.people.rev140818.People;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddPrefixShardReplicaOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddReplicasForAllShardsOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.AddShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.DataStoreType;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.FlipMemberVotingStatesForAllShardsOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetPrefixShardRoleOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.GetShardRoleOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.MakeLeaderLocalOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveAllShardReplicasOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemovePrefixShardReplicaOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.RemoveShardReplicaOutput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.member.voting.states.input.MemberVotingStateBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResult;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.cluster.admin.rev151013.shard.result.output.ShardResultKey;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
assertEquals("DatastoreSnapshot shard names", Sets.newHashSet(expShardNames), shardNames);
}
- @Test
- public void testAddRemovePrefixShardReplica() throws Exception {
- String name = "testAddPrefixShardReplica";
- String moduleShardsConfig = "module-shards-default.conf";
-
- final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
- .moduleShardsConfig(moduleShardsConfig).build();
- final MemberNode replicaNode2 = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
- .moduleShardsConfig(moduleShardsConfig).build();
- final MemberNode replicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
- .moduleShardsConfig(moduleShardsConfig).build();
-
- member1.waitForMembersUp("member-2", "member-3");
- replicaNode2.kit().waitForMembersUp("member-1", "member-3");
- replicaNode3.kit().waitForMembersUp("member-1", "member-2");
-
- final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager();
-
- shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
- "prefix", Collections.singleton(MEMBER_1))),
- ActorRef.noSender());
-
- member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(),
- ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
-
- final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
- final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
- Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
-
- addPrefixShardReplica(replicaNode2, identifier, serializer,
- ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1");
-
- addPrefixShardReplica(replicaNode3, identifier, serializer,
- ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-1", "member-2");
-
- verifyRaftPeersPresent(member1.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
- "member-2", "member-3");
-
- removePrefixShardReplica(member1, identifier, "member-3", serializer,
- ClusterUtils.getCleanShardName(CarsModel.BASE_PATH), "member-2");
-
- verifyNoShardPresent(replicaNode3.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
- verifyRaftPeersPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH),
- "member-1");
-
- removePrefixShardReplica(member1, identifier, "member-2", serializer,
- ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
-
- verifyNoShardPresent(replicaNode2.configDataStore(), ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
- }
-
- @Test
- public void testGetShardRole() throws Exception {
- String name = "testGetShardRole";
- String moduleShardsConfig = "module-shards-default-member-1.conf";
-
- final MemberNode member1 = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
- .moduleShardsConfig(moduleShardsConfig).build();
-
- member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
-
- final RpcResult<GetShardRoleOutput> successResult =
- getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "default");
- verifySuccessfulRpcResult(successResult);
- assertEquals("Leader", successResult.getResult().getRole());
-
- final RpcResult<GetShardRoleOutput> failedResult =
- getShardRole(member1, Mockito.mock(BindingNormalizedNodeSerializer.class), "cars");
-
- verifyFailedRpcResult(failedResult);
-
- final ActorRef shardManager1 = member1.configDataStore().getActorUtils().getShardManager();
-
- shardManager1.tell(new PrefixShardCreated(new PrefixShardConfiguration(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, CarsModel.BASE_PATH),
- "prefix", Collections.singleton(MEMBER_1))),
- ActorRef.noSender());
-
- member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(),
- ClusterUtils.getCleanShardName(CarsModel.BASE_PATH));
-
- final InstanceIdentifier<Cars> identifier = InstanceIdentifier.create(Cars.class);
- final BindingNormalizedNodeSerializer serializer = Mockito.mock(BindingNormalizedNodeSerializer.class);
- Mockito.doReturn(CarsModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(identifier);
-
- final RpcResult<GetPrefixShardRoleOutput> prefixSuccessResult =
- getPrefixShardRole(member1, identifier, serializer);
-
- verifySuccessfulRpcResult(prefixSuccessResult);
- assertEquals("Leader", prefixSuccessResult.getResult().getRole());
-
- final InstanceIdentifier<People> peopleId = InstanceIdentifier.create(People.class);
- Mockito.doReturn(PeopleModel.BASE_PATH).when(serializer).toYangInstanceIdentifier(peopleId);
-
- final RpcResult<GetPrefixShardRoleOutput> prefixFail =
- getPrefixShardRole(member1, peopleId, serializer);
-
- verifyFailedRpcResult(prefixFail);
- }
-
@Test
public void testGetPrefixShardRole() throws Exception {
String name = "testGetPrefixShardRole";
.moduleShardsConfig(moduleShardsConfig).build();
member1.kit().waitUntilLeader(member1.configDataStore().getActorUtils(), "default");
-
-
}
@Test
assertEquals("Data node", expCarsNode, optional.get());
}
- private static RpcResult<GetShardRoleOutput> getShardRole(final MemberNode memberNode,
- final BindingNormalizedNodeSerializer serializer, final String shardName) throws Exception {
-
- final GetShardRoleInput input = new GetShardRoleInputBuilder()
- .setDataStoreType(DataStoreType.Config)
- .setShardName(shardName)
- .build();
-
- final ClusterAdminRpcService service =
- new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
-
- return service.getShardRole(input).get(10, TimeUnit.SECONDS);
- }
-
- private static RpcResult<GetPrefixShardRoleOutput> getPrefixShardRole(
- final MemberNode memberNode,
- final InstanceIdentifier<?> identifier,
- final BindingNormalizedNodeSerializer serializer) throws Exception {
-
- final GetPrefixShardRoleInput input = new GetPrefixShardRoleInputBuilder()
- .setDataStoreType(DataStoreType.Config)
- .setShardPrefix(identifier)
- .build();
-
- final ClusterAdminRpcService service =
- new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
-
- return service.getPrefixShardRole(input).get(10, TimeUnit.SECONDS);
- }
-
- private static void addPrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier<?> identifier,
- final BindingNormalizedNodeSerializer serializer, final String shardName,
- final String... peerMemberNames) throws Exception {
-
- final AddPrefixShardReplicaInput input = new AddPrefixShardReplicaInputBuilder()
- .setShardPrefix(identifier)
- .setDataStoreType(DataStoreType.Config).build();
-
- final ClusterAdminRpcService service =
- new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
-
- final RpcResult<AddPrefixShardReplicaOutput> rpcResult = service.addPrefixShardReplica(input)
- .get(10, TimeUnit.SECONDS);
- verifySuccessfulRpcResult(rpcResult);
-
- verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
- Optional<ActorRef> optional = memberNode.configDataStore().getActorUtils().findLocalShard(shardName);
- assertTrue("Replica shard not present", optional.isPresent());
- }
-
- private static void removePrefixShardReplica(final MemberNode memberNode, final InstanceIdentifier<?> identifier,
- final String removeFromMember, final BindingNormalizedNodeSerializer serializer, final String shardName,
- final String... peerMemberNames) throws Exception {
- final RemovePrefixShardReplicaInput input = new RemovePrefixShardReplicaInputBuilder()
- .setDataStoreType(DataStoreType.Config)
- .setShardPrefix(identifier)
- .setMemberName(removeFromMember).build();
-
- final ClusterAdminRpcService service =
- new ClusterAdminRpcService(memberNode.configDataStore(), memberNode.operDataStore(), serializer);
-
- final RpcResult<RemovePrefixShardReplicaOutput> rpcResult = service.removePrefixShardReplica(input)
- .get(10, TimeUnit.SECONDS);
- verifySuccessfulRpcResult(rpcResult);
-
- verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
- }
-
private static void doAddShardReplica(final MemberNode memberNode, final String shardName,
final String... peerMemberNames) throws Exception {
memberNode.waitForMembersUp(peerMemberNames);
import com.google.common.util.concurrent.FluentFuture;
import java.util.Collection;
import java.util.Optional;
-import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCursor;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
*/
@Beta
public class ClientTransaction extends AbstractClientHandle<AbstractProxyTransaction> {
-
- private ClientTransactionCursor cursor;
-
ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
super(parent, transactionId);
}
return ensureProxy(path);
}
- @Deprecated(forRemoval = true)
- public DOMDataTreeWriteCursor openCursor() {
- Preconditions.checkState(cursor == null, "Transaction %s has open cursor", getIdentifier());
- cursor = new ClientTransactionCursor(this);
- return cursor;
- }
-
public FluentFuture<Boolean> exists(final YangInstanceIdentifier path) {
return ensureTransactionProxy(path).exists(path);
}
final AbstractProxyTransaction createProxy(final Long shard) {
return parent().createTransactionProxy(getIdentifier(), shard);
}
-
- void closeCursor(final @NonNull DOMDataTreeCursor cursorToClose) {
- if (cursorToClose.equals(this.cursor)) {
- this.cursor = null;
- }
- }
}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker.actors.dds;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import java.util.Arrays;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * A {@link DOMDataTreeWriteCursor} tied to a {@link ClientTransaction}.
- *
- * @author Robert Varga
- */
-@Deprecated(forRemoval = true)
-final class ClientTransactionCursor implements DOMDataTreeWriteCursor {
- private YangInstanceIdentifier current = YangInstanceIdentifier.empty();
- private final ClientTransaction parent;
-
- ClientTransactionCursor(final ClientTransaction parent) {
- this.parent = requireNonNull(parent);
- }
-
- @Override
- public void enter(final PathArgument child) {
- current = current.node(child);
- }
-
- @Override
- public void enter(final PathArgument... path) {
- enter(Arrays.asList(path));
- }
-
- @Override
- public void enter(final Iterable<PathArgument> path) {
- path.forEach(this::enter);
- }
-
- @Override
- public void exit() {
- final YangInstanceIdentifier currentParent = current.getParent();
- checkState(currentParent != null);
- current = currentParent;
- }
-
- @Override
- public void exit(final int depth) {
- for (int i = 0; i < depth; ++i) {
- exit();
- }
- }
-
- @Override
- public void close() {
- parent.closeCursor(this);
- }
-
- @Override
- public void delete(final PathArgument child) {
- parent.delete(current.node(child));
- }
-
- @Override
- public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
- parent.merge(current.node(child), data);
- }
-
- @Override
- public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
- parent.write(current.node(child), data);
- }
-}
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActorWithMetering;
import org.opendaylight.controller.cluster.common.actor.Dispatchers;
-import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.raft.messages.ServerChangeStatus;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
-import org.opendaylight.controller.cluster.sharding.PrefixedShardConfigUpdateHandler;
-import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
import org.opendaylight.yangtools.concepts.Registration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final Set<Consumer<String>> shardAvailabilityCallbacks = new HashSet<>();
private final String persistenceId;
- private final AbstractDataStore dataStore;
-
- private PrefixedShardConfigUpdateHandler configUpdateHandler;
ShardManager(final AbstractShardManagerCreator<?> builder) {
this.cluster = builder.getCluster();
"shard-manager-" + this.type,
datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
shardManagerMBean.registerMBean();
-
- dataStore = builder.getDistributedDataStore();
}
@Override
onAddShardReplica((AddShardReplica) message);
} else if (message instanceof AddPrefixShardReplica) {
onAddPrefixShardReplica((AddPrefixShardReplica) message);
- } else if (message instanceof PrefixShardCreated) {
- onPrefixShardCreated((PrefixShardCreated) message);
- } else if (message instanceof PrefixShardRemoved) {
- onPrefixShardRemoved((PrefixShardRemoved) message);
- } else if (message instanceof InitConfigListener) {
- onInitConfigListener();
} else if (message instanceof ForwardedAddServerReply) {
ForwardedAddServerReply msg = (ForwardedAddServerReply)message;
onAddServerReply(msg.shardInfo, msg.addServerReply, getSender(), msg.leaderPath,
getSender().tell(new GetShardRoleReply(shardInformation.getRole()), ActorRef.noSender());
}
- private void onInitConfigListener() {
- LOG.debug("{}: Initializing config listener on {}", persistenceId(), cluster.getCurrentMemberName());
-
- final org.opendaylight.mdsal.common.api.LogicalDatastoreType datastoreType =
- org.opendaylight.mdsal.common.api.LogicalDatastoreType
- .valueOf(datastoreContextFactory.getBaseDatastoreContext().getLogicalStoreType().name());
-
- if (configUpdateHandler != null) {
- configUpdateHandler.close();
- }
-
- configUpdateHandler = new PrefixedShardConfigUpdateHandler(self(), cluster.getCurrentMemberName());
- configUpdateHandler.initListener(dataStore, datastoreType);
- }
-
void onShutDown() {
List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
}
}
- private void onPrefixShardCreated(final PrefixShardCreated message) {
- LOG.debug("{}: onPrefixShardCreated: {}", persistenceId(), message);
-
- final PrefixShardConfiguration config = message.getConfiguration();
- final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
- ClusterUtils.getCleanShardName(config.getPrefix().getRootIdentifier()));
- final String shardName = shardId.getShardName();
-
- if (isPreviousShardActorStopInProgress(shardName, message)) {
- return;
- }
-
- if (localShards.containsKey(shardName)) {
- LOG.debug("{}: Received create for an already existing shard {}", persistenceId(), shardName);
- final PrefixShardConfiguration existing =
- configuration.getAllPrefixShardConfigurations().get(config.getPrefix());
-
- if (existing != null && existing.equals(config)) {
- // we don't have to do nothing here
- return;
- }
- }
-
- doCreatePrefixShard(config, shardId, shardName);
- }
-
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
private boolean isPreviousShardActorStopInProgress(final String shardName, final Object messageToDefer) {
final CompositeOnComplete<Boolean> stopOnComplete = shardActorsStopping.get(shardName);
if (stopOnComplete == null) {
return true;
}
- private void doCreatePrefixShard(final PrefixShardConfiguration config, final ShardIdentifier shardId,
- final String shardName) {
- configuration.addPrefixShardConfiguration(config);
-
- final Builder builder = newShardDatastoreContextBuilder(shardName);
- builder.logicalStoreType(config.getPrefix().getDatastoreType())
- .storeRoot(config.getPrefix().getRootIdentifier());
- DatastoreContext shardDatastoreContext = builder.build();
-
- final Map<String, String> peerAddresses = getPeerAddresses(shardName);
- final boolean isActiveMember = true;
-
- LOG.debug("{} doCreatePrefixShard: shardId: {}, memberNames: {}, peerAddresses: {}, isActiveMember: {}",
- persistenceId(), shardId, config.getShardMemberNames(), peerAddresses, isActiveMember);
-
- final ShardInformation info = new ShardInformation(shardName, shardId, peerAddresses,
- shardDatastoreContext, Shard.builder(), peerAddressResolver);
- info.setActiveMember(isActiveMember);
- localShards.put(info.getShardName(), info);
-
- if (schemaContext != null) {
- info.setSchemaContext(schemaContext);
- info.setActor(newShardActor(info));
- }
- }
-
- private void onPrefixShardRemoved(final PrefixShardRemoved message) {
- LOG.debug("{}: onPrefixShardRemoved : {}", persistenceId(), message);
-
- final DOMDataTreeIdentifier prefix = message.getPrefix();
- final ShardIdentifier shardId = getShardIdentifier(cluster.getCurrentMemberName(),
- ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
-
- configuration.removePrefixShardConfiguration(prefix);
- removeShard(shardId);
- }
-
private void doCreateShard(final CreateShard createShard) {
final ModuleShardConfiguration moduleShardConfig = createShard.getModuleShardConfig();
final String shardName = moduleShardConfig.getShardName();
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static akka.actor.ActorRef.noSender;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import akka.util.Timeout;
-import java.util.Collection;
-import java.util.Optional;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ConcurrentHashMap;
-import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration;
-import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.compat.java8.FutureConverters;
-import scala.concurrent.Future;
-
-/**
- * Default {@link CDSShardAccess} implementation. Listens on leader location
- * change events and distributes them to registered listeners. Also updates
- * current information about leader location accordingly.
- *
- * <p>
- * Sends {@link MakeLeaderLocal} message to local shards and translates its result
- * on behalf users {@link #makeLeaderLocal()} calls.
- *
- * <p>
- * {@link org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer} that
- * creates instances of this class has to call {@link #close()} once it is no
- * longer valid.
- */
-@Deprecated(forRemoval = true)
-final class CDSShardAccessImpl implements CDSShardAccess, LeaderLocationListener, AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(CDSShardAccessImpl.class);
-
- private final Collection<LeaderLocationListener> listeners = ConcurrentHashMap.newKeySet();
- private final DOMDataTreeIdentifier prefix;
- private final ActorUtils actorUtils;
- private final Timeout makeLeaderLocalTimeout;
-
- private ActorRef roleChangeListenerActor;
-
- private volatile LeaderLocation currentLeader = LeaderLocation.UNKNOWN;
- private volatile boolean closed = false;
-
- CDSShardAccessImpl(final DOMDataTreeIdentifier prefix, final ActorUtils actorUtils) {
- this.prefix = requireNonNull(prefix);
- this.actorUtils = requireNonNull(actorUtils);
- this.makeLeaderLocalTimeout =
- new Timeout(actorUtils.getDatastoreContext().getShardLeaderElectionTimeout().duration().$times(2));
-
- // register RoleChangeListenerActor
- // TODO Maybe we should do this in async
- final Optional<ActorRef> localShardReply =
- actorUtils.findLocalShard(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
- checkState(localShardReply.isPresent(),
- "Local shard for {} not present. Cannot register RoleChangeListenerActor", prefix);
- roleChangeListenerActor =
- actorUtils.getActorSystem().actorOf(RoleChangeListenerActor.props(localShardReply.get(), this));
- }
-
- private void checkNotClosed() {
- checkState(!closed, "CDSDataTreeProducer, that this CDSShardAccess is associated with, is no longer valid");
- }
-
- @Override
- public DOMDataTreeIdentifier getShardIdentifier() {
- checkNotClosed();
- return prefix;
- }
-
- @Override
- public LeaderLocation getLeaderLocation() {
- checkNotClosed();
- // TODO before getting first notification from roleChangeListenerActor
- // we will always return UNKNOWN
- return currentLeader;
- }
-
- @Override
- public CompletionStage<Void> makeLeaderLocal() {
- // TODO when we have running make leader local operation
- // we should just return the same completion stage
- checkNotClosed();
-
- // TODO can we cache local shard actorRef?
- final Future<ActorRef> localShardReply =
- actorUtils.findLocalShardAsync(ClusterUtils.getCleanShardName(prefix.getRootIdentifier()));
-
- // we have to tell local shard to make leader local
- final scala.concurrent.Promise<Object> makeLeaderLocalAsk = Futures.promise();
- localShardReply.onComplete(new OnComplete<ActorRef>() {
- @Override
- public void onComplete(final Throwable failure, final ActorRef actorRef) {
- if (failure instanceof LocalShardNotFoundException) {
- LOG.debug("No local shard found for {} - Cannot request leadership transfer to local shard.",
- getShardIdentifier(), failure);
- makeLeaderLocalAsk.failure(failure);
- } else if (failure != null) {
- // TODO should this be WARN?
- LOG.debug("Failed to find local shard for {} - Cannot request leadership transfer to local shard.",
- getShardIdentifier(), failure);
- makeLeaderLocalAsk.failure(failure);
- } else {
- makeLeaderLocalAsk
- .completeWith(actorUtils
- .executeOperationAsync(actorRef, MakeLeaderLocal.INSTANCE, makeLeaderLocalTimeout));
- }
- }
- }, actorUtils.getClientDispatcher());
-
- // we have to transform make leader local request result
- Future<Void> makeLeaderLocalFuture = makeLeaderLocalAsk.future()
- .transform(new Mapper<Object, Void>() {
- @Override
- public Void apply(final Object parameter) {
- return null;
- }
- }, new Mapper<Throwable, Throwable>() {
- @Override
- public Throwable apply(final Throwable parameter) {
- if (parameter instanceof LeadershipTransferFailedException) {
- // do nothing with exception and just pass it as it is
- return parameter;
- }
- // wrap exception in LeadershipTransferFailedEx
- return new LeadershipTransferFailedException("Leadership transfer failed", parameter);
- }
- }, actorUtils.getClientDispatcher());
-
- return FutureConverters.toJava(makeLeaderLocalFuture);
- }
-
- @Override
- public <L extends LeaderLocationListener> LeaderLocationListenerRegistration<L>
- registerLeaderLocationListener(final L listener) {
- checkNotClosed();
- requireNonNull(listener);
- checkArgument(!listeners.contains(listener), "Listener %s is already registered with ShardAccess %s", listener,
- this);
-
- LOG.debug("Registering LeaderLocationListener {}", listener);
-
- listeners.add(listener);
-
- return new LeaderLocationListenerRegistration<>() {
- @Override
- public L getInstance() {
- return listener;
- }
-
- @Override
- public void close() {
- listeners.remove(listener);
- }
- };
- }
-
- @Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void onLeaderLocationChanged(final LeaderLocation location) {
- if (closed) {
- // we are closed already. Do not dispatch any new leader location
- // change events.
- return;
- }
-
- LOG.debug("Received leader location change notification. New leader location: {}", location);
- currentLeader = location;
- listeners.forEach(listener -> {
- try {
- listener.onLeaderLocationChanged(location);
- } catch (Exception e) {
- LOG.warn("Ignoring uncaught exception thrown be LeaderLocationListener {} "
- + "during processing leader location change {}", listener, location, e);
- }
- });
- }
-
- @Override
- public void close() {
- // TODO should we also remove all listeners?
- LOG.debug("Closing {} ShardAccess", prefix);
- closed = true;
-
- if (roleChangeListenerActor != null) {
- // stop RoleChangeListenerActor
- roleChangeListenerActor.tell(PoisonPill.getInstance(), noSender());
- roleChangeListenerActor = null;
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import com.google.common.annotations.Beta;
-import org.eclipse.jdt.annotation.NonNull;
-
-/**
- * Exception thrown when there was a at any point during the creation of a shard via {@link DistributedShardFactory}.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public class DOMDataTreeShardCreationFailedException extends Exception {
- private static final long serialVersionUID = 1L;
-
- public DOMDataTreeShardCreationFailedException(final @NonNull String message) {
- super(message);
- }
-
- public DOMDataTreeShardCreationFailedException(final @NonNull String message, final @NonNull Throwable cause) {
- super(message, cause);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.spi.AbstractDOMDataTreeChangeListenerRegistration;
-import org.opendaylight.mdsal.dom.spi.AbstractRegistrationTree;
-import org.opendaylight.mdsal.dom.spi.RegistrationTreeNode;
-import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTreeChangePublisher;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeConfiguration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
-import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFailedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(forRemoval = true)
-public class DistributedShardChangePublisher
- extends AbstractRegistrationTree<AbstractDOMDataTreeChangeListenerRegistration<?>>
- implements DOMStoreTreeChangePublisher {
-
- private static final Logger LOG = LoggerFactory.getLogger(DistributedShardChangePublisher.class);
-
- private final DistributedDataStoreInterface distributedDataStore;
- private final YangInstanceIdentifier shardPath;
-
- private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards;
-
- @GuardedBy("this")
- private final DataTree dataTree;
-
- public DistributedShardChangePublisher(final DataStoreClient client,
- final DistributedDataStoreInterface distributedDataStore,
- final DOMDataTreeIdentifier prefix,
- final Map<DOMDataTreeIdentifier, ChildShardContext> childShards) {
- this.distributedDataStore = distributedDataStore;
- // TODO keeping the whole dataTree thats contained in subshards doesn't seem like a good idea
- // maybe the whole listener logic would be better in the backend shards where we have direct access to the
- // dataTree and wont have to cache it redundantly.
-
- final DataTreeConfiguration baseConfig;
- switch (prefix.getDatastoreType()) {
- case CONFIGURATION:
- baseConfig = DataTreeConfiguration.DEFAULT_CONFIGURATION;
- break;
- case OPERATIONAL:
- baseConfig = DataTreeConfiguration.DEFAULT_OPERATIONAL;
- break;
- default:
- throw new UnsupportedOperationException("Unknown prefix type " + prefix.getDatastoreType());
- }
-
- this.dataTree = new InMemoryDataTreeFactory().create(new DataTreeConfiguration.Builder(baseConfig.getTreeType())
- .setMandatoryNodesValidation(baseConfig.isMandatoryNodesValidationEnabled())
- .setUniqueIndexes(baseConfig.isUniqueIndexEnabled())
- .setRootPath(prefix.getRootIdentifier())
- .build());
-
- // XXX: can we guarantee that the root is present in the schemacontext?
- this.dataTree.setEffectiveModelContext(distributedDataStore.getActorUtils().getSchemaContext());
- this.shardPath = prefix.getRootIdentifier();
- this.childShards = childShards;
- }
-
- protected void registrationRemoved(final AbstractDOMDataTreeChangeListenerRegistration<?> registration) {
- LOG.debug("Closing registration {}", registration);
- }
-
- @Override
- public <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
- registerTreeChangeListener(final YangInstanceIdentifier path, final L listener) {
- takeLock();
- try {
- return setupListenerContext(path, listener);
- } finally {
- releaseLock();
- }
- }
-
- private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
- setupListenerContext(final YangInstanceIdentifier listenerPath, final L listener) {
- // we need to register the listener registration path based on the shards root
- // we have to strip the shard path from the listener path and then register
- YangInstanceIdentifier strippedIdentifier = listenerPath;
- if (!shardPath.isEmpty()) {
- strippedIdentifier = YangInstanceIdentifier.create(stripShardPath(shardPath, listenerPath));
- }
-
- final DOMDataTreeListenerWithSubshards subshardListener =
- new DOMDataTreeListenerWithSubshards(strippedIdentifier, listener);
- final AbstractDOMDataTreeChangeListenerRegistration<L> reg =
- setupContextWithoutSubshards(listenerPath, strippedIdentifier, subshardListener);
-
- for (final ChildShardContext maybeAffected : childShards.values()) {
- if (listenerPath.contains(maybeAffected.getPrefix().getRootIdentifier())) {
- // consumer has initialDataChangeEvent subshard somewhere on lower level
- // register to the notification manager with snapshot and forward child notifications to parent
- LOG.debug("Adding new subshard{{}} to listener at {}", maybeAffected.getPrefix(), listenerPath);
- subshardListener.addSubshard(maybeAffected);
- } else if (maybeAffected.getPrefix().getRootIdentifier().contains(listenerPath)) {
- // bind path is inside subshard
- // TODO can this happen? seems like in ShardedDOMDataTree we are
- // already registering to the lowest shard possible
- throw new UnsupportedOperationException("Listener should be registered directly "
- + "into initialDataChangeEvent subshard");
- }
- }
-
- return reg;
- }
-
- private <L extends DOMDataTreeChangeListener> AbstractDOMDataTreeChangeListenerRegistration<L>
- setupContextWithoutSubshards(final YangInstanceIdentifier shardLookup,
- final YangInstanceIdentifier listenerPath,
- final DOMDataTreeListenerWithSubshards listener) {
-
- LOG.debug("Registering root listener full path: {}, path inside shard: {}", shardLookup, listenerPath);
-
- // register in the shard tree
- final RegistrationTreeNode<AbstractDOMDataTreeChangeListenerRegistration<?>> node =
- findNodeFor(listenerPath.getPathArguments());
-
- // register listener in CDS
- ListenerRegistration<DOMDataTreeChangeListener> listenerReg = distributedDataStore
- .registerProxyListener(shardLookup, listenerPath, listener);
-
- @SuppressWarnings("unchecked")
- final AbstractDOMDataTreeChangeListenerRegistration<L> registration =
- new AbstractDOMDataTreeChangeListenerRegistration<>((L) listener) {
- @Override
- protected void removeRegistration() {
- listener.close();
- DistributedShardChangePublisher.this.removeRegistration(node, this);
- registrationRemoved(this);
- listenerReg.close();
- }
- };
- addRegistration(node, registration);
-
- return registration;
- }
-
- private static Iterable<PathArgument> stripShardPath(final YangInstanceIdentifier shardPath,
- final YangInstanceIdentifier listenerPath) {
- if (shardPath.isEmpty()) {
- return listenerPath.getPathArguments();
- }
-
- final List<PathArgument> listenerPathArgs = new ArrayList<>(listenerPath.getPathArguments());
- final Iterator<PathArgument> shardIter = shardPath.getPathArguments().iterator();
- final Iterator<PathArgument> listenerIter = listenerPathArgs.iterator();
-
- while (shardIter.hasNext()) {
- if (shardIter.next().equals(listenerIter.next())) {
- listenerIter.remove();
- } else {
- break;
- }
- }
-
- return listenerPathArgs;
- }
-
- synchronized DataTreeCandidate applyChanges(final YangInstanceIdentifier listenerPath,
- final Collection<DataTreeCandidate> changes) throws DataValidationFailedException {
- final DataTreeModification modification = dataTree.takeSnapshot().newModification();
- for (final DataTreeCandidate change : changes) {
- try {
- DataTreeCandidates.applyToModification(modification, change);
- } catch (SchemaValidationFailedException e) {
- LOG.error("Validation failed", e);
- }
- }
-
- modification.ready();
-
- final DataTreeCandidate candidate;
-
- dataTree.validate(modification);
-
- // strip nodes we dont need since this listener doesn't have to be registered at the root of the DataTree
- candidate = dataTree.prepare(modification);
- dataTree.commit(candidate);
-
-
- DataTreeCandidateNode modifiedChild = candidate.getRootNode();
-
- for (final PathArgument pathArgument : listenerPath.getPathArguments()) {
- modifiedChild = modifiedChild.getModifiedChild(pathArgument).orElse(null);
- }
-
-
- if (modifiedChild == null) {
- modifiedChild = DataTreeCandidateNodes.empty(dataTree.getRootPath().getLastPathArgument());
- }
-
- return DataTreeCandidates.newDataTreeCandidate(dataTree.getRootPath(), modifiedChild);
- }
-
-
- private final class DOMDataTreeListenerWithSubshards implements DOMDataTreeChangeListener {
-
- private final YangInstanceIdentifier listenerPath;
- private final DOMDataTreeChangeListener delegate;
- private final Map<YangInstanceIdentifier, ListenerRegistration<DOMDataTreeChangeListener>> registrations =
- new ConcurrentHashMap<>();
-
- @GuardedBy("this")
- private final Collection<DataTreeCandidate> stashedDataTreeCandidates = new LinkedList<>();
-
- DOMDataTreeListenerWithSubshards(final YangInstanceIdentifier listenerPath,
- final DOMDataTreeChangeListener delegate) {
- this.listenerPath = requireNonNull(listenerPath);
- this.delegate = requireNonNull(delegate);
- }
-
- @Override
- public synchronized void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
- LOG.debug("Received data changed {}", changes);
-
- if (!stashedDataTreeCandidates.isEmpty()) {
- LOG.debug("Adding stashed subshards' changes {}", stashedDataTreeCandidates);
- changes.addAll(stashedDataTreeCandidates);
- stashedDataTreeCandidates.clear();
- }
-
- try {
- applyChanges(listenerPath, changes);
- } catch (final DataValidationFailedException e) {
- // TODO should we fail here? What if stashed changes
- // (changes from subshards) got ahead more than one generation
- // from current shard. Than we can fail to apply this changes
- // upon current data tree, but once we get respective changes
- // from current shard, we can apply also changes from
- // subshards.
- //
- // However, we can loose ability to notice and report some
- // errors then. For example, we cannot detect potential lost
- // changes from current shard.
- LOG.error("Validation failed for modification built from changes {}, current data tree: {}",
- changes, dataTree, e);
- throw new RuntimeException("Notification validation failed", e);
- }
-
- delegate.onDataTreeChanged(changes);
- }
-
- synchronized void onDataTreeChanged(final YangInstanceIdentifier pathFromRoot,
- final Collection<DataTreeCandidate> changes) {
- final YangInstanceIdentifier changeId =
- YangInstanceIdentifier.create(stripShardPath(dataTree.getRootPath(), pathFromRoot));
-
- final List<DataTreeCandidate> newCandidates = changes.stream()
- .map(candidate -> DataTreeCandidates.newDataTreeCandidate(changeId, candidate.getRootNode()))
- .collect(Collectors.toList());
-
- try {
- delegate.onDataTreeChanged(Collections.singleton(applyChanges(listenerPath, newCandidates)));
- } catch (final DataValidationFailedException e) {
- // We cannot apply changes from subshard to current data tree.
- // Maybe changes from current shard haven't been applied to
- // data tree yet. Postpone processing of these changes till we
- // receive changes from current shard.
- LOG.debug("Validation for modification built from subshard {} changes {} failed, current data tree {}.",
- pathFromRoot, changes, dataTree, e);
- stashedDataTreeCandidates.addAll(newCandidates);
- }
- }
-
- void addSubshard(final ChildShardContext context) {
- checkState(context.getShard() instanceof DOMStoreTreeChangePublisher,
- "All subshards that are initialDataChangeEvent part of ListenerContext need to be listenable");
-
- final DOMStoreTreeChangePublisher listenableShard = (DOMStoreTreeChangePublisher) context.getShard();
- // since this is going into subshard we want to listen for ALL changes in the subshard
- registrations.put(context.getPrefix().getRootIdentifier(),
- listenableShard.registerTreeChangeListener(
- context.getPrefix().getRootIdentifier(), changes -> onDataTreeChanged(
- context.getPrefix().getRootIdentifier(), changes)));
- }
-
- void close() {
- for (final ListenerRegistration<DOMDataTreeChangeListener> registration : registrations.values()) {
- registration.close();
- }
- registrations.clear();
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import com.google.common.annotations.Beta;
-import java.util.Collection;
-import java.util.concurrent.CompletionStage;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
-
-/**
- * A factory that handles addition of new clustered shard's based on a prefix. This factory is a QoL class that handles
- * all the boilerplate that comes with registration of a new clustered shard into the system and creating the backend
- * shard/replicas that come along with it.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public interface DistributedShardFactory {
- /**
- * Register a new shard that is rooted at the desired prefix with replicas on the provided members.
- * Note to register a shard without replicas you still need to provide at least one Member for the shard.
- *
- * @param prefix Shard root
- * @param replicaMembers Members that this shard is replicated on, has to have at least one Member even if the shard
- * should not be replicated.
- * @return A future that will be completed with a DistributedShardRegistration once the backend and frontend shards
- * are spawned.
- * @throws DOMDataTreeShardingConflictException If the initial check for a conflict on the local node fails, the
- * sharding configuration won't be updated if this exception is thrown.
- */
- CompletionStage<DistributedShardRegistration>
- createDistributedShard(DOMDataTreeIdentifier prefix, Collection<MemberName> replicaMembers)
- throws DOMDataTreeShardingConflictException;
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
-import org.opendaylight.mdsal.dom.spi.shard.ChildShardContext;
-import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
-import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
-import org.opendaylight.mdsal.dom.spi.shard.ReadableWriteableDOMDataTreeShard;
-import org.opendaylight.mdsal.dom.spi.shard.SubshardProducerSpecification;
-import org.opendaylight.mdsal.dom.spi.shard.WriteableDOMDataTreeShard;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Proxy implementation of a shard that creates forwarding producers to the backend shard.
- */
-@Deprecated(forRemoval = true)
-class DistributedShardFrontend implements ReadableWriteableDOMDataTreeShard {
-
- private static final Logger LOG = LoggerFactory.getLogger(DistributedShardFrontend.class);
-
- private final DataStoreClient client;
- private final DOMDataTreeIdentifier shardRoot;
- @GuardedBy("this")
- private final Map<DOMDataTreeIdentifier, ChildShardContext> childShards = new HashMap<>();
- @GuardedBy("this")
- private final List<ShardProxyProducer> producers = new ArrayList<>();
-
- private final DistributedShardChangePublisher publisher;
-
- DistributedShardFrontend(final DistributedDataStoreInterface distributedDataStore,
- final DataStoreClient client,
- final DOMDataTreeIdentifier shardRoot) {
- this.client = requireNonNull(client);
- this.shardRoot = requireNonNull(shardRoot);
-
- publisher = new DistributedShardChangePublisher(client, requireNonNull(distributedDataStore), shardRoot,
- childShards);
- }
-
- @Override
- public synchronized DOMDataTreeShardProducer createProducer(final Collection<DOMDataTreeIdentifier> paths) {
- for (final DOMDataTreeIdentifier prodPrefix : paths) {
- checkArgument(shardRoot.contains(prodPrefix), "Prefix %s is not contained under shard root", prodPrefix,
- paths);
- }
-
- final ShardProxyProducer ret =
- new ShardProxyProducer(shardRoot, paths, client, createModificationFactory(paths));
- producers.add(ret);
- return ret;
- }
-
- @Override
- public synchronized void onChildAttached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
- LOG.debug("{} : Child shard attached at {}", shardRoot, prefix);
- checkArgument(child != this, "Attempted to attach child %s onto self", this);
- addChildShard(prefix, child);
- updateProducers();
- }
-
- @Override
- public synchronized void onChildDetached(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
- LOG.debug("{} : Child shard detached at {}", shardRoot, prefix);
- childShards.remove(prefix);
- updateProducers();
- // TODO we should grab the dataTreeSnapshot that's in the shard and apply it to this shard
- }
-
- private void addChildShard(final DOMDataTreeIdentifier prefix, final DOMDataTreeShard child) {
- checkArgument(child instanceof WriteableDOMDataTreeShard);
- childShards.put(prefix, new ChildShardContext(prefix, (WriteableDOMDataTreeShard) child));
- }
-
- DistributedShardModificationFactory createModificationFactory(final Collection<DOMDataTreeIdentifier> prefixes) {
- // TODO this could be abstract
- final Map<DOMDataTreeIdentifier, SubshardProducerSpecification> affectedSubshards = new HashMap<>();
-
- for (final DOMDataTreeIdentifier producerPrefix : prefixes) {
- for (final ChildShardContext maybeAffected : childShards.values()) {
- final DOMDataTreeIdentifier bindPath;
- if (producerPrefix.contains(maybeAffected.getPrefix())) {
- bindPath = maybeAffected.getPrefix();
- } else if (maybeAffected.getPrefix().contains(producerPrefix)) {
- // Bound path is inside subshard
- bindPath = producerPrefix;
- } else {
- continue;
- }
-
- SubshardProducerSpecification spec = affectedSubshards.computeIfAbsent(maybeAffected.getPrefix(),
- k -> new SubshardProducerSpecification(maybeAffected));
- spec.addPrefix(bindPath);
- }
- }
-
- final DistributedShardModificationFactoryBuilder builder =
- new DistributedShardModificationFactoryBuilder(shardRoot);
- for (final SubshardProducerSpecification spec : affectedSubshards.values()) {
- final ForeignShardModificationContext foreignContext =
- new ForeignShardModificationContext(spec.getPrefix(), spec.createProducer());
- builder.addSubshard(foreignContext);
- builder.addSubshard(spec.getPrefix(), foreignContext);
- }
-
- return builder.build();
- }
-
- private void updateProducers() {
- for (final ShardProxyProducer producer : producers) {
- producer.setModificationFactory(createModificationFactory(producer.getPrefixes()));
- }
- }
-
- @Override
- public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
- final YangInstanceIdentifier treeId, final L listener) {
- return publisher.registerTreeChangeListener(treeId, listener);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.Map;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
-import org.opendaylight.mdsal.dom.spi.shard.WritableNodeOperation;
-import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy;
-import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode;
-import org.opendaylight.mdsal.dom.spi.shard.WriteableNodeWithSubshard;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-
-/**
- * Shard modification that consists of the whole shard context, provides cursors which correctly delegate to subshards
- * if any are present.
- */
-@Deprecated(forRemoval = true)
-public class DistributedShardModification extends WriteableNodeWithSubshard {
-
- private final DistributedShardModificationContext context;
- private final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards;
-
- public DistributedShardModification(final DistributedShardModificationContext context,
- final Map<PathArgument, WriteableModificationNode> subshards,
- final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards) {
- super(subshards);
- this.context = requireNonNull(context);
- this.childShards = requireNonNull(childShards);
- }
-
- @Override
- public PathArgument getIdentifier() {
- return context.getIdentifier().getRootIdentifier().getLastPathArgument();
- }
-
- @Override
- public WriteCursorStrategy createOperation(final DOMDataTreeWriteCursor parentCursor) {
- return new WritableNodeOperation(this, context.cursor()) {
- @Override
- public void exit() {
- throw new IllegalStateException("Can not exit data tree root");
- }
- };
- }
-
- void cursorClosed() {
- context.closeCursor();
- }
-
- DOMStoreThreePhaseCommitCohort seal() {
- childShards.values().stream().filter(ForeignShardModificationContext::isModified)
- .forEach(ForeignShardModificationContext::ready);
-
- return context.ready();
- }
-
- DOMDataTreeIdentifier getPrefix() {
- return context.getIdentifier();
- }
-
- Map<DOMDataTreeIdentifier, ForeignShardModificationContext> getChildShards() {
- return childShards;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding;
-
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-
-/**
- * The context for a single shards modification, keeps a ClientTransaction so it can route requests correctly.
- */
-@Deprecated(forRemoval = true)
-public class DistributedShardModificationContext {
-
- private final ClientTransaction transaction;
- private final DOMDataTreeIdentifier identifier;
- private DOMDataTreeWriteCursor cursor;
-
- public DistributedShardModificationContext(final ClientTransaction transaction,
- final DOMDataTreeIdentifier identifier) {
- this.transaction = transaction;
- this.identifier = identifier;
- }
-
- public DOMDataTreeIdentifier getIdentifier() {
- return identifier;
- }
-
- DOMDataTreeWriteCursor cursor() {
- if (cursor == null) {
- cursor = transaction.openCursor();
- }
-
- return cursor;
- }
-
- DOMStoreThreePhaseCommitCohort ready() {
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
-
- return transaction.ready();
- }
-
- void closeCursor() {
- if (cursor != null) {
- cursor.close();
- cursor = null;
- }
- }
-
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import org.opendaylight.mdsal.dom.spi.shard.AbstractDataModificationCursor;
-import org.opendaylight.mdsal.dom.spi.shard.WriteCursorStrategy;
-
-/**
- * Internal cursor implementation consisting of WriteCursorStrategies which forwards writes to foreign modifications
- * if any.
- */
-@Deprecated(forRemoval = true)
-public class DistributedShardModificationCursor extends AbstractDataModificationCursor<DistributedShardModification> {
-
- private final ShardProxyTransaction parent;
-
- public DistributedShardModificationCursor(final DistributedShardModification root,
- final ShardProxyTransaction parent) {
- super(root);
- this.parent = parent;
- }
-
- @Override
- protected WriteCursorStrategy getRootOperation(final DistributedShardModification root) {
- return root.createOperation(null);
- }
-
- @Override
- public void close() {
- parent.cursorClosed();
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import java.util.Map;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
-import org.opendaylight.mdsal.dom.spi.shard.WriteableModificationNode;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-
-/**
- * Factory for {@link DistributedShardModification}.
- */
-@Deprecated(forRemoval = true)
-public final class DistributedShardModificationFactory {
- private final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards;
- private final Map<PathArgument, WriteableModificationNode> children;
- private final DOMDataTreeIdentifier root;
-
- DistributedShardModificationFactory(final DOMDataTreeIdentifier root,
- final Map<PathArgument, WriteableModificationNode> children,
- final Map<DOMDataTreeIdentifier, ForeignShardModificationContext> childShards) {
- this.root = requireNonNull(root);
- this.children = ImmutableMap.copyOf(children);
- this.childShards = ImmutableMap.copyOf(childShards);
- }
-
- @VisibleForTesting
- Map<PathArgument, WriteableModificationNode> getChildren() {
- return children;
- }
-
- @VisibleForTesting
- Map<DOMDataTreeIdentifier, ForeignShardModificationContext> getChildShards() {
- return childShards;
- }
-
- DistributedShardModification createModification(final ClientTransaction transaction) {
- return new DistributedShardModification(
- new DistributedShardModificationContext(transaction, root), children, childShards);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.spi.shard.AbstractShardModificationFactoryBuilder;
-
-/**
- * Builder for {@link DistributedShardModificationFactory}.
- */
-@Deprecated(forRemoval = true)
-public class DistributedShardModificationFactoryBuilder
- extends AbstractShardModificationFactoryBuilder<DistributedShardModificationFactory> {
-
-
- public DistributedShardModificationFactoryBuilder(final DOMDataTreeIdentifier root) {
- super(root);
- }
-
- @Override
- public DistributedShardModificationFactory build() {
- return new DistributedShardModificationFactory(root, buildChildren(), childShards);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import com.google.common.annotations.Beta;
-import java.util.concurrent.CompletionStage;
-
-/**
- * Registration of the CDS shard that allows you to remove the shard from the system by closing the registration.
- * This removal is done asynchronously.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public interface DistributedShardRegistration {
-
- /**
- * Removes the shard from the system, this removal is done asynchronously, the future completes once the
- * backend shard is no longer present.
- */
- CompletionStage<Void> close();
-}
+++ /dev/null
-/*
- * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static akka.actor.ActorRef.noSender;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ClassToInstanceMap;
-import com.google.common.collect.ForwardingObject;
-import com.google.common.collect.ImmutableClassToInstanceMap;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.AbstractMap.SimpleEntry;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.checkerframework.checker.lock.qual.GuardedBy;
-import org.opendaylight.controller.cluster.ActorSystemProvider;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
-import org.opendaylight.controller.cluster.datastore.Shard;
-import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
-import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
-import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor.ShardedDataTreeActorCreator;
-import org.opendaylight.controller.cluster.sharding.messages.InitConfigListener;
-import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
-import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
-import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeServiceExtension;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
-import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
-import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
-import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTable;
-import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.prefix.shard.configuration.rev170110.PrefixShards;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.compat.java8.FutureConverters;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * A layer on top of DOMDataTreeService that distributes producer/shard registrations to remote nodes via
- * {@link ShardedDataTreeActor}. Also provides QoL method for addition of prefix based clustered shard into the system.
- */
-@Deprecated(forRemoval = true)
-public class DistributedShardedDOMDataTree implements DOMDataTreeService, DOMDataTreeShardingService,
- DistributedShardFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTree.class);
-
- private static final int MAX_ACTOR_CREATION_RETRIES = 100;
- private static final int ACTOR_RETRY_DELAY = 100;
- private static final TimeUnit ACTOR_RETRY_TIME_UNIT = TimeUnit.MILLISECONDS;
- private static final int LOOKUP_TASK_MAX_RETRIES = 100;
- static final FiniteDuration SHARD_FUTURE_TIMEOUT_DURATION =
- new FiniteDuration(LOOKUP_TASK_MAX_RETRIES * LOOKUP_TASK_MAX_RETRIES * 3, TimeUnit.SECONDS);
- static final Timeout SHARD_FUTURE_TIMEOUT = new Timeout(SHARD_FUTURE_TIMEOUT_DURATION);
-
- static final String ACTOR_ID = "ShardedDOMDataTreeFrontend";
-
- private final ShardedDOMDataTree shardedDOMDataTree;
- private final ActorSystem actorSystem;
- private final DistributedDataStoreInterface distributedOperDatastore;
- private final DistributedDataStoreInterface distributedConfigDatastore;
-
- private final ActorRef shardedDataTreeActor;
- private final MemberName memberName;
-
- @GuardedBy("shards")
- private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shards =
- DOMDataTreePrefixTable.create();
-
- private final EnumMap<LogicalDatastoreType, Entry<DataStoreClient, ActorRef>> configurationShardMap =
- new EnumMap<>(LogicalDatastoreType.class);
-
- private final EnumMap<LogicalDatastoreType, PrefixedShardConfigWriter> writerMap =
- new EnumMap<>(LogicalDatastoreType.class);
-
- private final PrefixedShardConfigUpdateHandler updateHandler;
-
- public DistributedShardedDOMDataTree(final ActorSystemProvider actorSystemProvider,
- final DistributedDataStoreInterface distributedOperDatastore,
- final DistributedDataStoreInterface distributedConfigDatastore) {
- this.actorSystem = requireNonNull(actorSystemProvider).getActorSystem();
- this.distributedOperDatastore = requireNonNull(distributedOperDatastore);
- this.distributedConfigDatastore = requireNonNull(distributedConfigDatastore);
- shardedDOMDataTree = new ShardedDOMDataTree();
-
- shardedDataTreeActor = createShardedDataTreeActor(actorSystem,
- new ShardedDataTreeActorCreator()
- .setShardingService(this)
- .setActorSystem(actorSystem)
- .setClusterWrapper(distributedConfigDatastore.getActorUtils().getClusterWrapper())
- .setDistributedConfigDatastore(distributedConfigDatastore)
- .setDistributedOperDatastore(distributedOperDatastore)
- .setLookupTaskMaxRetries(LOOKUP_TASK_MAX_RETRIES),
- ACTOR_ID);
-
- this.memberName = distributedConfigDatastore.getActorUtils().getCurrentMemberName();
-
- updateHandler = new PrefixedShardConfigUpdateHandler(shardedDataTreeActor,
- distributedConfigDatastore.getActorUtils().getCurrentMemberName());
-
- LOG.debug("{} - Starting prefix configuration shards", memberName);
- createPrefixConfigShard(distributedConfigDatastore);
- createPrefixConfigShard(distributedOperDatastore);
- }
-
- private static void createPrefixConfigShard(final DistributedDataStoreInterface dataStore) {
- Configuration configuration = dataStore.getActorUtils().getConfiguration();
- Collection<MemberName> memberNames = configuration.getUniqueMemberNamesForAllShards();
- CreateShard createShardMessage =
- new CreateShard(new ModuleShardConfiguration(PrefixShards.QNAME.getNamespace(),
- "prefix-shard-configuration", ClusterUtils.PREFIX_CONFIG_SHARD_ID, ModuleShardStrategy.NAME,
- memberNames),
- Shard.builder(), dataStore.getActorUtils().getDatastoreContext());
-
- dataStore.getActorUtils().getShardManager().tell(createShardMessage, noSender());
- }
-
- /**
- * This will try to initialize prefix configuration shards upon their
- * successful start. We need to create writers to these shards, so we can
- * satisfy future {@link #createDistributedShard} and
- * {@link #resolveShardAdditions} requests and update prefix configuration
- * shards accordingly.
- *
- * <p>
- * We also need to initialize listeners on these shards, so we can react
- * on changes made on them by other cluster members or even by ourselves.
- *
- * <p>
- * Finally, we need to be sure that default shards for both operational and
- * configuration data stores are up and running and we have distributed
- * shards frontend created for them.
- *
- * <p>
- * This is intended to be invoked by blueprint as initialization method.
- */
- public void init() {
- // create our writers to the configuration
- try {
- LOG.debug("{} - starting config shard lookup.", memberName);
-
- // We have to wait for prefix config shards to be up and running
- // so we can create datastore clients for them
- handleConfigShardLookup().get(SHARD_FUTURE_TIMEOUT_DURATION.length(), SHARD_FUTURE_TIMEOUT_DURATION.unit());
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- throw new IllegalStateException("Prefix config shards not found", e);
- }
-
- try {
- LOG.debug("{}: Prefix configuration shards ready - creating clients", memberName);
- configurationShardMap.put(LogicalDatastoreType.CONFIGURATION,
- createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
- distributedConfigDatastore.getActorUtils()));
- } catch (final DOMDataTreeShardCreationFailedException e) {
- throw new IllegalStateException(
- "Unable to create datastoreClient for config DS prefix configuration shard.", e);
- }
-
- try {
- configurationShardMap.put(LogicalDatastoreType.OPERATIONAL,
- createDatastoreClient(ClusterUtils.PREFIX_CONFIG_SHARD_ID,
- distributedOperDatastore.getActorUtils()));
-
- } catch (final DOMDataTreeShardCreationFailedException e) {
- throw new IllegalStateException(
- "Unable to create datastoreClient for oper DS prefix configuration shard.", e);
- }
-
- writerMap.put(LogicalDatastoreType.CONFIGURATION, new PrefixedShardConfigWriter(
- configurationShardMap.get(LogicalDatastoreType.CONFIGURATION).getKey()));
-
- writerMap.put(LogicalDatastoreType.OPERATIONAL, new PrefixedShardConfigWriter(
- configurationShardMap.get(LogicalDatastoreType.OPERATIONAL).getKey()));
-
- updateHandler.initListener(distributedConfigDatastore, LogicalDatastoreType.CONFIGURATION);
- updateHandler.initListener(distributedOperDatastore, LogicalDatastoreType.OPERATIONAL);
-
- distributedConfigDatastore.getActorUtils().getShardManager().tell(InitConfigListener.INSTANCE, noSender());
- distributedOperDatastore.getActorUtils().getShardManager().tell(InitConfigListener.INSTANCE, noSender());
-
-
- //create shard registration for DEFAULT_SHARD
- initDefaultShard(LogicalDatastoreType.CONFIGURATION);
- initDefaultShard(LogicalDatastoreType.OPERATIONAL);
- }
-
- private ListenableFuture<List<Void>> handleConfigShardLookup() {
-
- final ListenableFuture<Void> configFuture = lookupConfigShard(LogicalDatastoreType.CONFIGURATION);
- final ListenableFuture<Void> operFuture = lookupConfigShard(LogicalDatastoreType.OPERATIONAL);
-
- return Futures.allAsList(configFuture, operFuture);
- }
-
- private ListenableFuture<Void> lookupConfigShard(final LogicalDatastoreType type) {
- final SettableFuture<Void> future = SettableFuture.create();
-
- final Future<Object> ask =
- Patterns.ask(shardedDataTreeActor, new StartConfigShardLookup(type), SHARD_FUTURE_TIMEOUT);
-
- ask.onComplete(new OnComplete<>() {
- @Override
- public void onComplete(final Throwable throwable, final Object result) {
- if (throwable != null) {
- future.setException(throwable);
- } else {
- future.set(null);
- }
- }
- }, actorSystem.dispatcher());
-
- return future;
- }
-
- @Override
- public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(
- final T listener, final Collection<DOMDataTreeIdentifier> subtrees,
- final boolean allowRxMerges, final Collection<DOMDataTreeProducer> producers)
- throws DOMDataTreeLoopException {
- return shardedDOMDataTree.registerListener(listener, subtrees, allowRxMerges, producers);
- }
-
- @Override
- public ClassToInstanceMap<DOMDataTreeServiceExtension> getExtensions() {
- return ImmutableClassToInstanceMap.of();
- }
-
- @Override
- public DOMDataTreeProducer createProducer(final Collection<DOMDataTreeIdentifier> subtrees) {
- LOG.debug("{} - Creating producer for {}", memberName, subtrees);
- final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(subtrees);
-
- final Object response = distributedConfigDatastore.getActorUtils()
- .executeOperation(shardedDataTreeActor, new ProducerCreated(subtrees));
- if (response == null) {
- LOG.debug("{} - Received success from remote nodes, creating producer:{}", memberName, subtrees);
- return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
- distributedConfigDatastore.getActorUtils(), shards);
- }
-
- closeProducer(producer);
-
- if (response instanceof Throwable) {
- Throwables.throwIfUnchecked((Throwable) response);
- throw new RuntimeException((Throwable) response);
- }
- throw new RuntimeException("Unexpected response to create producer received." + response);
- }
-
- @Override
- public CompletionStage<DistributedShardRegistration> createDistributedShard(
- final DOMDataTreeIdentifier prefix, final Collection<MemberName> replicaMembers)
- throws DOMDataTreeShardingConflictException {
-
- synchronized (shards) {
- final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
- shards.lookup(prefix);
- if (lookup != null && lookup.getValue().getPrefix().equals(prefix)) {
- throw new DOMDataTreeShardingConflictException(
- "Prefix " + prefix + " is already occupied by another shard.");
- }
- }
-
- final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType());
-
- final ListenableFuture<Void> writeFuture =
- writer.writeConfig(prefix.getRootIdentifier(), replicaMembers);
-
- final Promise<DistributedShardRegistration> shardRegistrationPromise = akka.dispatch.Futures.promise();
- Futures.addCallback(writeFuture, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
-
- final Future<Object> ask =
- Patterns.ask(shardedDataTreeActor, new LookupPrefixShard(prefix), SHARD_FUTURE_TIMEOUT);
-
- shardRegistrationPromise.completeWith(ask.transform(
- new Mapper<Object, DistributedShardRegistration>() {
- @Override
- public DistributedShardRegistration apply(final Object parameter) {
- return new DistributedShardRegistrationImpl(
- prefix, shardedDataTreeActor, DistributedShardedDOMDataTree.this);
- }
- },
- new Mapper<Throwable, Throwable>() {
- @Override
- public Throwable apply(final Throwable throwable) {
- return new DOMDataTreeShardCreationFailedException(
- "Unable to create a cds shard.", throwable);
- }
- }, actorSystem.dispatcher()));
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- shardRegistrationPromise.failure(
- new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable));
- }
- }, MoreExecutors.directExecutor());
-
- return FutureConverters.toJava(shardRegistrationPromise.future());
- }
-
- void resolveShardAdditions(final Set<DOMDataTreeIdentifier> additions) {
- LOG.debug("{}: Resolving additions : {}", memberName, additions);
- // we need to register the shards from top to bottom, so we need to atleast make sure the ordering reflects that
- additions
- .stream()
- .sorted(Comparator.comparingInt(o -> o.getRootIdentifier().getPathArguments().size()))
- .forEachOrdered(this::createShardFrontend);
- }
-
- void resolveShardRemovals(final Set<DOMDataTreeIdentifier> removals) {
- LOG.debug("{}: Resolving removals : {}", memberName, removals);
-
- // do we need to go from bottom to top?
- removals.forEach(this::despawnShardFrontend);
- }
-
- private void createShardFrontend(final DOMDataTreeIdentifier prefix) {
- LOG.debug("{}: Creating CDS shard for prefix: {}", memberName, prefix);
- final String shardName = ClusterUtils.getCleanShardName(prefix.getRootIdentifier());
- final DistributedDataStoreInterface distributedDataStore =
- prefix.getDatastoreType().equals(LogicalDatastoreType.CONFIGURATION)
- ? distributedConfigDatastore : distributedOperDatastore;
-
- try (DOMDataTreeProducer producer = localCreateProducer(Collections.singletonList(prefix))) {
- final Entry<DataStoreClient, ActorRef> entry =
- createDatastoreClient(shardName, distributedDataStore.getActorUtils());
-
- final DistributedShardFrontend shard =
- new DistributedShardFrontend(distributedDataStore, entry.getKey(), prefix);
-
- final DOMDataTreeShardRegistration<DOMDataTreeShard> reg =
- shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
-
- synchronized (shards) {
- shards.store(prefix, reg);
- }
-
- } catch (final DOMDataTreeShardingConflictException e) {
- LOG.error("{}: Prefix {} is already occupied by another shard",
- distributedConfigDatastore.getActorUtils().getClusterWrapper().getCurrentMemberName(), prefix, e);
- } catch (DOMDataTreeProducerException e) {
- LOG.error("Unable to close producer", e);
- } catch (DOMDataTreeShardCreationFailedException e) {
- LOG.error("Unable to create datastore client for shard {}", prefix, e);
- }
- }
-
- private void despawnShardFrontend(final DOMDataTreeIdentifier prefix) {
- LOG.debug("{}: Removing CDS shard for prefix: {}", memberName, prefix);
- final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup;
- synchronized (shards) {
- lookup = shards.lookup(prefix);
- }
-
- if (lookup == null || !lookup.getValue().getPrefix().equals(prefix)) {
- LOG.debug("{}: Received despawn for non-existing CDS shard frontend, prefix: {}, ignoring..",
- memberName, prefix);
- return;
- }
-
- lookup.getValue().close();
- // need to remove from our local table thats used for tracking
- synchronized (shards) {
- shards.remove(prefix);
- }
-
- final PrefixedShardConfigWriter writer = writerMap.get(prefix.getDatastoreType());
- final ListenableFuture<Void> future = writer.removeConfig(prefix.getRootIdentifier());
-
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(final Void result) {
- LOG.debug("{} - Succesfuly removed shard for {}", memberName, prefix);
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.error("Removal of shard {} from configuration failed.", prefix, throwable);
- }
- }, MoreExecutors.directExecutor());
- }
-
- DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
- final DOMDataTreeIdentifier prefix) {
- synchronized (shards) {
- return shards.lookup(prefix);
- }
- }
-
- DOMDataTreeProducer localCreateProducer(final Collection<DOMDataTreeIdentifier> prefix) {
- return shardedDOMDataTree.createProducer(prefix);
- }
-
- @Override
- public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
- final DOMDataTreeIdentifier prefix, final T shard, final DOMDataTreeProducer producer)
- throws DOMDataTreeShardingConflictException {
-
- LOG.debug("Registering shard[{}] at prefix: {}", shard, prefix);
-
- if (producer instanceof ProxyProducer) {
- return shardedDOMDataTree.registerDataTreeShard(prefix, shard, ((ProxyProducer) producer).delegate());
- }
-
- return shardedDOMDataTree.registerDataTreeShard(prefix, shard, producer);
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private Entry<DataStoreClient, ActorRef> createDatastoreClient(final String shardName, final ActorUtils actorUtils)
- throws DOMDataTreeShardCreationFailedException {
-
- LOG.debug("{}: Creating distributed datastore client for shard {}", memberName, shardName);
- final Props distributedDataStoreClientProps =
- SimpleDataStoreClientActor.props(memberName, "Shard-" + shardName, actorUtils, shardName);
-
- final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
- try {
- return new SimpleEntry<>(SimpleDataStoreClientActor
- .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS), clientActor);
- } catch (final Exception e) {
- LOG.error("{}: Failed to get actor for {}", distributedDataStoreClientProps, memberName, e);
- clientActor.tell(PoisonPill.getInstance(), noSender());
- throw new DOMDataTreeShardCreationFailedException(
- "Unable to create datastore client for shard{" + shardName + "}", e);
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void initDefaultShard(final LogicalDatastoreType logicalDatastoreType) {
-
- final PrefixedShardConfigWriter writer = writerMap.get(logicalDatastoreType);
-
- if (writer.checkDefaultIsPresent()) {
- LOG.debug("{}: Default shard for {} is already present in the config. Possibly saved in snapshot.",
- memberName, logicalDatastoreType);
- } else {
- try {
- // Currently the default shard configuration is present in the out-of-box modules.conf and is
- // expected to be present. So look up the local default shard here and create the frontend.
-
- // TODO we don't have to do it for config and operational default shard separately. Just one of them
- // should be enough
- final ActorUtils actorUtils = logicalDatastoreType == LogicalDatastoreType.CONFIGURATION
- ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();
-
- final Optional<ActorRef> defaultLocalShardOptional =
- actorUtils.findLocalShard(ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty()));
-
- if (defaultLocalShardOptional.isPresent()) {
- LOG.debug("{}: Default shard for {} is already started, creating just frontend", memberName,
- logicalDatastoreType);
- createShardFrontend(new DOMDataTreeIdentifier(logicalDatastoreType,
- YangInstanceIdentifier.empty()));
- }
-
- // The local shard isn't present - we assume that means the local member isn't in the replica list
- // and will be dynamically created later via an explicit add-shard-replica request. This is the
- // bootstrapping mechanism to add a new node into an existing cluster. The following code to create
- // the default shard as a prefix shard is problematic in this scenario so it is commented out. Since
- // the default shard is a module-based shard by default, it makes sense to always treat it as such,
- // ie bootstrap it in the same manner as the special prefix-configuration and EOS shards.
-// final Collection<MemberName> names = distributedConfigDatastore.getActorUtils().getConfiguration()
-// .getUniqueMemberNamesForAllShards();
-// Await.result(FutureConverters.toScala(createDistributedShard(
-// new DOMDataTreeIdentifier(logicalDatastoreType, YangInstanceIdentifier.empty()), names)),
-// SHARD_FUTURE_TIMEOUT_DURATION);
-// } catch (DOMDataTreeShardingConflictException e) {
-// LOG.debug("{}: Default shard for {} already registered, possibly due to other node doing it faster",
-// memberName, logicalDatastoreType);
- } catch (Exception e) {
- LOG.error("{}: Default shard initialization for {} failed", memberName, logicalDatastoreType, e);
- throw new RuntimeException(e);
- }
- }
- }
-
- private static void closeProducer(final DOMDataTreeProducer producer) {
- try {
- producer.close();
- } catch (final DOMDataTreeProducerException e) {
- LOG.error("Unable to close producer", e);
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private static ActorRef createShardedDataTreeActor(final ActorSystem actorSystem,
- final ShardedDataTreeActorCreator creator,
- final String shardDataTreeActorId) {
- Exception lastException = null;
-
- for (int i = 0; i < MAX_ACTOR_CREATION_RETRIES; i++) {
- try {
- return actorSystem.actorOf(creator.props(), shardDataTreeActorId);
- } catch (final Exception e) {
- lastException = e;
- Uninterruptibles.sleepUninterruptibly(ACTOR_RETRY_DELAY, ACTOR_RETRY_TIME_UNIT);
- LOG.debug("Could not create actor {} because of {} -"
- + " waiting for sometime before retrying (retry count = {})",
- shardDataTreeActorId, e.getMessage(), i);
- }
- }
-
- throw new IllegalStateException("Failed to create actor for ShardedDOMDataTree", lastException);
- }
-
- private class DistributedShardRegistrationImpl implements DistributedShardRegistration {
-
- private final DOMDataTreeIdentifier prefix;
- private final ActorRef shardedDataTreeActor;
- private final DistributedShardedDOMDataTree distributedShardedDOMDataTree;
-
- DistributedShardRegistrationImpl(final DOMDataTreeIdentifier prefix,
- final ActorRef shardedDataTreeActor,
- final DistributedShardedDOMDataTree distributedShardedDOMDataTree) {
- this.prefix = prefix;
- this.shardedDataTreeActor = shardedDataTreeActor;
- this.distributedShardedDOMDataTree = distributedShardedDOMDataTree;
- }
-
- @Override
- public CompletionStage<Void> close() {
- // first despawn on the local node
- distributedShardedDOMDataTree.despawnShardFrontend(prefix);
- // update the config so the remote nodes are updated
- final Future<Object> ask =
- Patterns.ask(shardedDataTreeActor, new PrefixShardRemovalLookup(prefix), SHARD_FUTURE_TIMEOUT);
-
- final Future<Void> closeFuture = ask.transform(
- new Mapper<Object, Void>() {
- @Override
- public Void apply(final Object parameter) {
- return null;
- }
- },
- new Mapper<Throwable, Throwable>() {
- @Override
- public Throwable apply(final Throwable throwable) {
- return throwable;
- }
- }, actorSystem.dispatcher());
-
- return FutureConverters.toJava(closeFuture);
- }
- }
-
- // TODO what about producers created by this producer?
- // They should also be CDSProducers
- private static final class ProxyProducer extends ForwardingObject implements CDSDataTreeProducer {
-
- private final DOMDataTreeProducer delegate;
- private final Collection<DOMDataTreeIdentifier> subtrees;
- private final ActorRef shardDataTreeActor;
- private final ActorUtils actorUtils;
- @GuardedBy("shardAccessMap")
- private final Map<DOMDataTreeIdentifier, CDSShardAccessImpl> shardAccessMap = new HashMap<>();
-
- // We don't have to guard access to shardTable in ProxyProducer.
- // ShardTable's entries relevant to this ProxyProducer shouldn't
- // change during producer's lifetime.
- private final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shardTable;
-
- ProxyProducer(final DOMDataTreeProducer delegate,
- final Collection<DOMDataTreeIdentifier> subtrees,
- final ActorRef shardDataTreeActor,
- final ActorUtils actorUtils,
- final DOMDataTreePrefixTable<DOMDataTreeShardRegistration<DOMDataTreeShard>> shardLayout) {
- this.delegate = requireNonNull(delegate);
- this.subtrees = requireNonNull(subtrees);
- this.shardDataTreeActor = requireNonNull(shardDataTreeActor);
- this.actorUtils = requireNonNull(actorUtils);
- this.shardTable = requireNonNull(shardLayout);
- }
-
- @Override
- public DOMDataTreeCursorAwareTransaction createTransaction(final boolean isolated) {
- return delegate.createTransaction(isolated);
- }
-
- @Override
- @SuppressWarnings("checkstyle:hiddenField")
- public DOMDataTreeProducer createProducer(final Collection<DOMDataTreeIdentifier> subtrees) {
- // TODO we probably don't need to distribute this on the remote nodes since once we have this producer
- // open we surely have the rights to all the subtrees.
- return delegate.createProducer(subtrees);
- }
-
- @Override
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void close() throws DOMDataTreeProducerException {
- delegate.close();
-
- synchronized (shardAccessMap) {
- shardAccessMap.values().forEach(CDSShardAccessImpl::close);
- }
-
- final Object o = actorUtils.executeOperation(shardDataTreeActor, new ProducerRemoved(subtrees));
- if (o instanceof DOMDataTreeProducerException) {
- throw (DOMDataTreeProducerException) o;
- } else if (o instanceof Throwable) {
- throw new DOMDataTreeProducerException("Unable to close producer", (Throwable) o);
- }
- }
-
- @Override
- protected DOMDataTreeProducer delegate() {
- return delegate;
- }
-
- @Override
- public CDSShardAccess getShardAccess(final DOMDataTreeIdentifier subtree) {
- checkArgument(subtrees.stream().anyMatch(dataTreeIdentifier -> dataTreeIdentifier.contains(subtree)),
- "Subtree %s is not controlled by this producer %s", subtree, this);
-
- final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookup =
- shardTable.lookup(subtree);
- checkState(lookup != null, "Subtree %s is not contained in any registered shard.", subtree);
-
- final DOMDataTreeIdentifier lookupId = lookup.getValue().getPrefix();
-
- synchronized (shardAccessMap) {
- if (shardAccessMap.get(lookupId) != null) {
- return shardAccessMap.get(lookupId);
- }
-
- // TODO Maybe we can have static factory method and return the same instance
- // for same subtrees. But maybe it is not needed since there can be only one
- // producer attached to some subtree at a time. And also how we can close ShardAccess
- // then
- final CDSShardAccessImpl shardAccess = new CDSShardAccessImpl(lookupId, actorUtils);
- shardAccessMap.put(lookupId, shardAccess);
- return shardAccess;
- }
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static akka.actor.ActorRef.noSender;
-
-import akka.actor.ActorRef;
-import akka.actor.Status;
-import org.eclipse.jdt.annotation.Nullable;
-
-/**
- * Base class for lookup tasks. Lookup tasks are supposed to run repeatedly until successful lookup or maximum retries
- * are hit. This class is NOT thread-safe.
- */
-@Deprecated(forRemoval = true)
-abstract class LookupTask implements Runnable {
- private final int maxRetries;
- private final ActorRef replyTo;
- private int retried = 0;
-
- LookupTask(final ActorRef replyTo, final int maxRetries) {
- this.replyTo = replyTo;
- this.maxRetries = maxRetries;
- }
-
- abstract void reschedule(int retries);
-
- void tryReschedule(final @Nullable Throwable throwable) {
- if (retried <= maxRetries) {
- retried++;
- reschedule(retried);
- } else {
- fail(throwable);
- }
- }
-
- void fail(final @Nullable Throwable throwable) {
- if (throwable == null) {
- replyTo.tell(new Status.Failure(
- new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
- + "Failing..")), noSender());
- } else {
- replyTo.tell(new Status.Failure(
- new DOMDataTreeShardCreationFailedException("Unable to find the backend shard."
- + "Failing..", throwable)), noSender());
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import com.google.common.collect.ClassToInstanceMap;
-import java.util.Collection;
-import java.util.concurrent.CompletionStage;
-import org.opendaylight.controller.cluster.ActorSystemProvider;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeServiceExtension;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component(immediate = true, property = "type=default")
-@Deprecated(forRemoval = true)
-public final class OSGiDistributedShardedDOMDataTree
- implements DOMDataTreeService, DOMDataTreeShardingService, DistributedShardFactory {
- private static final Logger LOG = LoggerFactory.getLogger(OSGiDistributedShardedDOMDataTree.class);
-
- @Reference
- ActorSystemProvider actorSystemProvider = null;
- @Reference(target = "(type=distributed-config)")
- DistributedDataStoreInterface configDatastore = null;
- @Reference(target = "(type=distributed-operational)")
- DistributedDataStoreInterface operDatastore = null;
-
- private DistributedShardedDOMDataTree delegate;
-
- @Override
- public DOMDataTreeProducer createProducer(final Collection<DOMDataTreeIdentifier> subtrees) {
- return delegate.createProducer(subtrees);
- }
-
- @Override
- public ClassToInstanceMap<DOMDataTreeServiceExtension> getExtensions() {
- return delegate.getExtensions();
- }
-
- @Override
- public CompletionStage<DistributedShardRegistration> createDistributedShard(final DOMDataTreeIdentifier prefix,
- final Collection<MemberName> replicaMembers) throws DOMDataTreeShardingConflictException {
- return delegate.createDistributedShard(prefix, replicaMembers);
- }
-
- @Override
- public <T extends DOMDataTreeShard> ListenerRegistration<T> registerDataTreeShard(
- final DOMDataTreeIdentifier prefix, final T shard, final DOMDataTreeProducer producer)
- throws DOMDataTreeShardingConflictException {
- return delegate.registerDataTreeShard(prefix, shard, producer);
- }
-
- @Override
- public <T extends DOMDataTreeListener> ListenerRegistration<T> registerListener(final T listener,
- final Collection<DOMDataTreeIdentifier> subtrees, final boolean allowRxMerges,
- final Collection<DOMDataTreeProducer> producers) throws DOMDataTreeLoopException {
- return delegate.registerListener(listener, subtrees, allowRxMerges, producers);
- }
-
- @Activate
- void activate() {
- LOG.info("Distributed DOM Data Tree Service starting");
- delegate = new DistributedShardedDOMDataTree(actorSystemProvider, operDatastore, configDatastore);
- delegate.init();
- LOG.info("Distributed DOM Data Tree Service started");
- }
-
- @Deactivate
- void deactivate() {
- LOG.info("Distributed DOM Data Tree Service stopping");
- // TODO: this needs a shutdown hook, I think
- delegate = null;
- LOG.info("Distributed DOM Data Tree Service stopped");
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static akka.actor.ActorRef.noSender;
-import static java.util.Objects.requireNonNull;
-import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_PREFIX_QNAME;
-import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICAS_QNAME;
-import static org.opendaylight.controller.cluster.datastore.utils.ClusterUtils.SHARD_REPLICA_QNAME;
-
-import akka.actor.ActorRef;
-import java.util.Collection;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.shardstrategy.PrefixShardStrategy;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.ClusteredDOMDataTreeChangeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Listens on changes on prefix-shard-configuration. Resolves the changes and
- * notifies handling actor with {@link PrefixShardCreated} and
- * {@link PrefixShardRemoved} messages.
- */
-@Deprecated(forRemoval = true)
-public class PrefixedShardConfigUpdateHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigUpdateHandler.class);
- private final ActorRef handlingActor;
- private final MemberName memberName;
-
- private final EnumMap<LogicalDatastoreType,ListenerRegistration<DOMDataTreeChangeListener>> registrations =
- new EnumMap<>(LogicalDatastoreType.class);
-
- public PrefixedShardConfigUpdateHandler(final ActorRef handlingActor, final MemberName memberName) {
- this.handlingActor = requireNonNull(handlingActor);
- this.memberName = requireNonNull(memberName);
- }
-
- public void initListener(final DistributedDataStoreInterface dataStore, final LogicalDatastoreType type) {
- registrations.put(type, dataStore.registerShardConfigListener(
- ClusterUtils.SHARD_LIST_PATH, new ShardConfigHandler(memberName, type, handlingActor)));
- }
-
- public void close() {
- registrations.values().forEach(ListenerRegistration::close);
- registrations.clear();
- }
-
- public static final class ShardConfigHandler implements ClusteredDOMDataTreeChangeListener {
-
- private final MemberName memberName;
- private final LogicalDatastoreType type;
- private final ActorRef handlingActor;
- private final String logName;
-
- public ShardConfigHandler(final MemberName memberName,
- final LogicalDatastoreType type,
- final ActorRef handlingActor) {
- this.memberName = memberName;
- this.type = type;
- this.handlingActor = handlingActor;
- logName = memberName.getName() + "-" + type;
- }
-
- @Override
- public void onDataTreeChanged(final Collection<DataTreeCandidate> changes) {
- changes.forEach(this::resolveChange);
- }
-
- private void resolveChange(final DataTreeCandidate candidate) {
- switch (candidate.getRootNode().getModificationType()) {
- case UNMODIFIED:
- break;
- case APPEARED:
- case DELETE:
- case DISAPPEARED:
- case SUBTREE_MODIFIED:
- case WRITE:
- resolveModifiedRoot(candidate.getRootNode());
- break;
- default:
- break;
- }
- }
-
- private void resolveModifiedRoot(final DataTreeCandidateNode rootNode) {
-
- LOG.debug("{}: New config received {}", logName, rootNode);
- LOG.debug("{}: Data after: {}", logName, rootNode.getDataAfter());
-
- // were in the shards list, iter children and resolve
- for (final DataTreeCandidateNode childNode : rootNode.getChildNodes()) {
- switch (childNode.getModificationType()) {
- case UNMODIFIED:
- break;
- case SUBTREE_MODIFIED:
- case APPEARED:
- case WRITE:
- resolveWrittenShard(childNode);
- break;
- case DELETE:
- case DISAPPEARED:
- resolveDeletedShard(childNode);
- break;
- default:
- break;
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private void resolveWrittenShard(final DataTreeCandidateNode childNode) {
- final MapEntryNode entryNode = (MapEntryNode) childNode.getDataAfter().get();
- final LeafNode<YangInstanceIdentifier> prefix =
- (LeafNode<YangInstanceIdentifier>) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get();
-
- final YangInstanceIdentifier identifier = prefix.getValue();
-
- LOG.debug("{}: Deserialized {} from datastore", logName, identifier);
-
- final ContainerNode replicas =
- (ContainerNode) entryNode.getChild(new NodeIdentifier(SHARD_REPLICAS_QNAME)).get();
-
- final LeafSetNode<String> replicaList =
- (LeafSetNode<String>) replicas.getChild(new NodeIdentifier(SHARD_REPLICA_QNAME)).get();
-
- final List<MemberName> retReplicas = replicaList.getValue().stream()
- .map(child -> MemberName.forName(child.getValue()))
- .collect(Collectors.toList());
-
- LOG.debug("{}: Replicas read from ds {}", logName, retReplicas.toString());
-
- final PrefixShardConfiguration newConfig =
- new PrefixShardConfiguration(new DOMDataTreeIdentifier(type, identifier),
- PrefixShardStrategy.NAME, retReplicas);
-
- LOG.debug("{}: Resulting config {} - sending PrefixShardCreated to {}", logName, newConfig, handlingActor);
-
- handlingActor.tell(new PrefixShardCreated(newConfig), noSender());
- }
-
- private void resolveDeletedShard(final DataTreeCandidateNode childNode) {
-
- final MapEntryNode entryNode = (MapEntryNode) childNode.getDataBefore().get();
-
- final LeafNode<YangInstanceIdentifier> prefix =
- (LeafNode<YangInstanceIdentifier>) entryNode.getChild(new NodeIdentifier(SHARD_PREFIX_QNAME)).get();
-
- final YangInstanceIdentifier deleted = prefix.getValue();
- LOG.debug("{}: Removing shard at {}.", memberName, deleted);
-
- final DOMDataTreeIdentifier domDataTreeIdentifier = new DOMDataTreeIdentifier(type, deleted);
- final PrefixShardRemoved message = new PrefixShardRemoved(domDataTreeIdentifier);
-
- handlingActor.tell(message, noSender());
- }
-
- @Override
- public String toString() {
- return "ShardConfigHandler [logName=" + logName + ", handlingActor=" + handlingActor + "]";
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding;
-
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import java.util.Collection;
-import java.util.concurrent.ExecutionException;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeWithValue;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.ListNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetEntryNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafSetNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapEntryNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Writes and removes prefix-based shards' configuration
- * to prefix-shard-configuration. This classed is meant to be utilized
- * by {@link DistributedShardedDOMDataTree} for updating
- * prefix-shard-configuration upon creating and de-spawning prefix-based shards.
- */
-@Deprecated(forRemoval = true)
-class PrefixedShardConfigWriter {
-
- private static final Logger LOG = LoggerFactory.getLogger(PrefixedShardConfigWriter.class);
-
- private final ClientLocalHistory history;
-
- PrefixedShardConfigWriter(final DataStoreClient client) {
- history = client.createLocalHistory();
- writeInitialParent();
- }
-
- ListenableFuture<Void> writeConfig(final YangInstanceIdentifier path, final Collection<MemberName> replicas) {
- LOG.debug("Writing config for {}, replicas {}", path, replicas);
-
- return doSubmit(doWrite(path, replicas));
- }
-
- ListenableFuture<Void> removeConfig(final YangInstanceIdentifier path) {
- LOG.debug("Removing config for {}.", path);
-
- return doSubmit(doDelete(path));
- }
-
- private void writeInitialParent() {
- final ClientTransaction tx = history.createTransaction();
-
- final DOMDataTreeWriteCursor cursor = tx.openCursor();
-
- final ContainerNode root = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(ClusterUtils.PREFIX_SHARDS_QNAME))
- .withChild(ImmutableMapNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_LIST_QNAME))
- .build())
- .build();
-
- cursor.merge(ClusterUtils.PREFIX_SHARDS_PATH.getLastPathArgument(), root);
- cursor.close();
-
- final DOMStoreThreePhaseCommitCohort cohort = tx.ready();
-
- submitBlocking(cohort);
- }
-
- private static void submitBlocking(final DOMStoreThreePhaseCommitCohort cohort) {
- try {
- doSubmit(cohort).get();
- } catch (final InterruptedException | ExecutionException e) {
- LOG.error("Unable to write initial shard config parent.", e);
- }
- }
-
- private static ListenableFuture<Void> doSubmit(final DOMStoreThreePhaseCommitCohort cohort) {
- final AsyncFunction<Boolean, Void> validateFunction = input -> cohort.preCommit();
- final AsyncFunction<Void, Void> prepareFunction = input -> cohort.commit();
-
- final ListenableFuture<Void> prepareFuture = Futures.transformAsync(cohort.canCommit(), validateFunction,
- MoreExecutors.directExecutor());
- return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
- }
-
- boolean checkDefaultIsPresent() {
- final NodeIdentifierWithPredicates pag =
- NodeIdentifierWithPredicates.of(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME,
- YangInstanceIdentifier.empty());
-
- final YangInstanceIdentifier defaultId = ClusterUtils.SHARD_LIST_PATH.node(pag);
-
- final ClientSnapshot snapshot = history.takeSnapshot();
- try {
- return snapshot.exists(defaultId).get();
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Presence check of default shard in configuration failed.", e);
- return false;
- } finally {
- snapshot.abort();
- }
- }
-
- private DOMStoreThreePhaseCommitCohort doWrite(final YangInstanceIdentifier path,
- final Collection<MemberName> replicas) {
-
- final ListNodeBuilder<Object, LeafSetEntryNode<Object>> replicaListBuilder =
- ImmutableLeafSetNodeBuilder.create().withNodeIdentifier(
- new NodeIdentifier(ClusterUtils.SHARD_REPLICA_QNAME));
-
- replicas.forEach(name -> replicaListBuilder.withChild(
- ImmutableLeafSetEntryNodeBuilder.create()
- .withNodeIdentifier(new NodeWithValue<>(ClusterUtils.SHARD_REPLICA_QNAME, name.getName()))
- .withValue(name.getName())
- .build()));
-
- final MapEntryNode newEntry = ImmutableMapEntryNodeBuilder.create()
- .withNodeIdentifier(
- NodeIdentifierWithPredicates.of(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME,
- path))
- .withChild(ImmutableLeafNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_PREFIX_QNAME))
- .withValue(path)
- .build())
- .withChild(ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(ClusterUtils.SHARD_REPLICAS_QNAME))
- .withChild(replicaListBuilder.build())
- .build())
- .build();
-
- final ClientTransaction tx = history.createTransaction();
- final DOMDataTreeWriteCursor cursor = tx.openCursor();
-
- ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter);
-
- cursor.write(newEntry.getIdentifier(), newEntry);
- cursor.close();
-
- return tx.ready();
- }
-
- private DOMStoreThreePhaseCommitCohort doDelete(final YangInstanceIdentifier path) {
-
- final ClientTransaction tx = history.createTransaction();
- final DOMDataTreeWriteCursor cursor = tx.openCursor();
-
- ClusterUtils.SHARD_LIST_PATH.getPathArguments().forEach(cursor::enter);
-
- cursor.delete(
- NodeIdentifierWithPredicates.of(ClusterUtils.SHARD_LIST_QNAME, ClusterUtils.SHARD_PREFIX_QNAME, path));
- cursor.close();
-
- return tx.ready();
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
-import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
-import org.opendaylight.controller.cluster.notifications.RoleChangeNotification;
-
-/**
- * Proxy actor which acts as a facade for user-provided
- * {@link LeaderLocationListener}. It subscribes for {@link LeaderStateChanged}
- * notifications in its pre start hook and translates them to
- * {@link LeaderLocationListener#onLeaderLocationChanged(LeaderLocation)}
- * events.
- */
-@Deprecated(forRemoval = true)
-public final class RoleChangeListenerActor extends AbstractUntypedActor {
- private final LeaderLocationListener leaderLocationListener;
- private final ActorRef roleChangeNotifier;
-
- private RoleChangeListenerActor(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) {
- this.roleChangeNotifier = requireNonNull(roleChangeNotifier);
- this.leaderLocationListener = requireNonNull(listener);
- }
-
- @Override
- public void preStart() throws Exception {
- super.preStart();
- roleChangeNotifier.tell(new RegisterRoleChangeListener(), getSelf());
- }
-
- @Override
- protected void handleReceive(final Object message) {
- if (message instanceof RoleChangeNotification) {
- ignoreMessage(message);
- } else if (message instanceof LeaderStateChanged) {
- onLeaderStateChanged((LeaderStateChanged) message);
- } else {
- unknownMessage(message);
- }
- }
-
- private void onLeaderStateChanged(final LeaderStateChanged message) {
- final LeaderLocation newLocation;
- if (message.getLeaderId() == null) {
- newLocation = LeaderLocation.UNKNOWN;
- } else if (message.getMemberId().equals(message.getLeaderId())) {
- newLocation = LeaderLocation.LOCAL;
- } else {
- newLocation = LeaderLocation.REMOTE;
- }
-
- // TODO should we wrap this in try catch block?
- leaderLocationListener.onLeaderLocationChanged(newLocation);
- }
-
- public static Props props(final ActorRef roleChangeNotifier, final LeaderLocationListener listener) {
- return Props.create(RoleChangeListenerActor.class, roleChangeNotifier, listener);
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.collect.ImmutableList;
-import java.util.Collection;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardProducer;
-import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
-
-/**
- * Proxy producer implementation that creates transactions that forward all calls to {@link DataStoreClient}.
- */
-@Deprecated(forRemoval = true)
-class ShardProxyProducer implements DOMDataTreeShardProducer {
- private final DOMDataTreeIdentifier shardRoot;
- private final Collection<DOMDataTreeIdentifier> prefixes;
- private final ClientLocalHistory history;
- private DistributedShardModificationFactory modificationFactory;
-
- ShardProxyProducer(final DOMDataTreeIdentifier shardRoot,
- final Collection<DOMDataTreeIdentifier> prefixes,
- final DataStoreClient client,
- final DistributedShardModificationFactory modificationFactory) {
- this.shardRoot = requireNonNull(shardRoot);
- this.prefixes = ImmutableList.copyOf(prefixes);
- this.modificationFactory = requireNonNull(modificationFactory);
- history = requireNonNull(client).createLocalHistory();
- }
-
- @Override
- public Collection<DOMDataTreeIdentifier> getPrefixes() {
- return prefixes;
- }
-
- @Override
- public DOMDataTreeShardWriteTransaction createTransaction() {
- return new ShardProxyTransaction(shardRoot, prefixes,
- modificationFactory.createModification(history.createTransaction()));
- }
-
- DistributedShardModificationFactory getModificationFactory() {
- return modificationFactory;
- }
-
- void setModificationFactory(final DistributedShardModificationFactory modificationFactory) {
- this.modificationFactory = requireNonNull(modificationFactory);
- }
-
- @Override
- public void close() {
- // FIXME: implement this
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.mdsal.dom.spi.shard.DOMDataTreeShardWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.shard.ForeignShardModificationContext;
-import org.opendaylight.mdsal.dom.spi.shard.ForeignShardThreePhaseCommitCohort;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Proxy {@link DOMDataTreeShardWriteTransaction} that creates a proxy cursor that translates all calls into
- * {@link ClientTransaction} calls.
- */
-@Deprecated(forRemoval = true)
-class ShardProxyTransaction implements DOMDataTreeShardWriteTransaction {
-
- private static final Logger LOG = LoggerFactory.getLogger(ShardProxyTransaction.class);
-
- private final DOMDataTreeIdentifier shardRoot;
- private final Collection<DOMDataTreeIdentifier> prefixes;
- private final DistributedShardModification modification;
- private ClientTransaction currentTx;
- private final List<DOMStoreThreePhaseCommitCohort> cohorts = new ArrayList<>();
-
- private DOMDataTreeWriteCursor cursor = null;
-
- ShardProxyTransaction(final DOMDataTreeIdentifier shardRoot,
- final Collection<DOMDataTreeIdentifier> prefixes,
- final DistributedShardModification modification) {
- this.shardRoot = requireNonNull(shardRoot);
- this.prefixes = requireNonNull(prefixes);
- this.modification = requireNonNull(modification);
- }
-
- private DOMDataTreeWriteCursor getCursor() {
- if (cursor == null) {
- cursor = new DistributedShardModificationCursor(modification, this);
- }
- return cursor;
- }
-
- @Override
- public DOMDataTreeWriteCursor createCursor(final DOMDataTreeIdentifier prefix) {
- checkAvailable(prefix);
- final YangInstanceIdentifier relativePath = toRelative(prefix.getRootIdentifier());
- final DOMDataTreeWriteCursor ret = getCursor();
- ret.enter(relativePath.getPathArguments());
- return ret;
- }
-
- void cursorClosed() {
- cursor = null;
- modification.cursorClosed();
- }
-
- private void checkAvailable(final DOMDataTreeIdentifier prefix) {
- for (final DOMDataTreeIdentifier p : prefixes) {
- if (p.contains(prefix)) {
- return;
- }
- }
- throw new IllegalArgumentException("Prefix[" + prefix + "] not available for this transaction. "
- + "Available prefixes: " + prefixes);
- }
-
- private YangInstanceIdentifier toRelative(final YangInstanceIdentifier path) {
- final Optional<YangInstanceIdentifier> relative =
- path.relativeTo(modification.getPrefix().getRootIdentifier());
- checkArgument(relative.isPresent());
- return relative.get();
- }
-
- @Override
- public void ready() {
- LOG.debug("Readying transaction for shard {}", shardRoot);
-
- requireNonNull(modification, "Attempting to ready an empty transaction.");
-
- cohorts.add(modification.seal());
- for (Entry<DOMDataTreeIdentifier, ForeignShardModificationContext> entry
- : modification.getChildShards().entrySet()) {
- cohorts.add(new ForeignShardThreePhaseCommitCohort(entry.getKey(), entry.getValue()));
- }
- }
-
- @Override
- public void close() {
- cohorts.forEach(DOMStoreThreePhaseCommitCohort::abort);
- cohorts.clear();
-
- if (currentTx != null) {
- currentTx.abort();
- currentTx = null;
- }
- }
-
- @Override
- public ListenableFuture<Void> submit() {
- LOG.debug("Submitting transaction for shard {}", shardRoot);
-
- checkTransactionReadied();
-
- final AsyncFunction<Boolean, Void> validateFunction = input -> prepare();
- final AsyncFunction<Void, Void> prepareFunction = input -> commit();
-
- // transform validate into prepare
- final ListenableFuture<Void> prepareFuture = Futures.transformAsync(validate(), validateFunction,
- MoreExecutors.directExecutor());
- // transform prepare into commit and return as submit result
- return Futures.transformAsync(prepareFuture, prepareFunction, MoreExecutors.directExecutor());
- }
-
- private void checkTransactionReadied() {
- checkState(!cohorts.isEmpty(), "Transaction not readied yet");
- }
-
- @Override
- public ListenableFuture<Boolean> validate() {
- LOG.debug("Validating transaction for shard {}", shardRoot);
-
- checkTransactionReadied();
- final List<ListenableFuture<Boolean>> futures =
- cohorts.stream().map(DOMStoreThreePhaseCommitCohort::canCommit).collect(Collectors.toList());
- final SettableFuture<Boolean> ret = SettableFuture.create();
-
- Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Boolean>>() {
- @Override
- public void onSuccess(final List<Boolean> result) {
- ret.set(true);
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- ret.setException(throwable);
- }
- }, MoreExecutors.directExecutor());
-
- return ret;
- }
-
- @Override
- public ListenableFuture<Void> prepare() {
- LOG.debug("Preparing transaction for shard {}", shardRoot);
-
- checkTransactionReadied();
- final List<ListenableFuture<Void>> futures =
- cohorts.stream().map(DOMStoreThreePhaseCommitCohort::preCommit).collect(Collectors.toList());
- final SettableFuture<Void> ret = SettableFuture.create();
-
- Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
- @Override
- public void onSuccess(final List<Void> result) {
- ret.set(null);
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- ret.setException(throwable);
- }
- }, MoreExecutors.directExecutor());
-
- return ret;
- }
-
- @Override
- public ListenableFuture<Void> commit() {
- LOG.debug("Committing transaction for shard {}", shardRoot);
-
- checkTransactionReadied();
- final List<ListenableFuture<Void>> futures =
- cohorts.stream().map(DOMStoreThreePhaseCommitCohort::commit).collect(Collectors.toList());
- final SettableFuture<Void> ret = SettableFuture.create();
-
- Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
- @Override
- public void onSuccess(final List<Void> result) {
- ret.set(null);
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- ret.setException(throwable);
- }
- }, MoreExecutors.directExecutor());
-
- return ret;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSelection;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.actor.Status;
-import akka.actor.Status.Success;
-import akka.cluster.ClusterEvent;
-import akka.cluster.ClusterEvent.MemberExited;
-import akka.cluster.ClusterEvent.MemberRemoved;
-import akka.cluster.ClusterEvent.MemberUp;
-import akka.cluster.ClusterEvent.MemberWeaklyUp;
-import akka.cluster.ClusterEvent.ReachableMember;
-import akka.cluster.ClusterEvent.UnreachableMember;
-import akka.cluster.Member;
-import akka.dispatch.OnComplete;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedPersistentActor;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeader;
-import org.opendaylight.controller.cluster.raft.client.messages.FindLeaderReply;
-import org.opendaylight.controller.cluster.sharding.messages.LookupPrefixShard;
-import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerCreated;
-import org.opendaylight.controller.cluster.sharding.messages.NotifyProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardCreated;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemovalLookup;
-import org.opendaylight.controller.cluster.sharding.messages.PrefixShardRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.ProducerCreated;
-import org.opendaylight.controller.cluster.sharding.messages.ProducerRemoved;
-import org.opendaylight.controller.cluster.sharding.messages.StartConfigShardLookup;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShard;
-import org.opendaylight.mdsal.dom.broker.DOMDataTreeShardRegistration;
-import org.opendaylight.mdsal.dom.spi.DOMDataTreePrefixTableEntry;
-import org.opendaylight.yangtools.concepts.AbstractObjectRegistration;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.compat.java8.FutureConverters;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Actor that tracks currently open producers/shards on remote nodes and handles notifications of remote
- * nodes of newly open producers/shards on the local node.
- */
-@Deprecated(forRemoval = true)
-public class ShardedDataTreeActor extends AbstractUntypedPersistentActor {
-
- private static final Logger LOG = LoggerFactory.getLogger(ShardedDataTreeActor.class);
-
- private static final String PERSISTENCE_ID = "sharding-service-actor";
- private static final Timeout DEFAULT_ASK_TIMEOUT = new Timeout(15, TimeUnit.SECONDS);
-
- static final FiniteDuration SHARD_LOOKUP_TASK_INTERVAL = new FiniteDuration(1L, TimeUnit.SECONDS);
-
- private final DistributedShardedDOMDataTree shardingService;
- private final ActorSystem actorSystem;
- private final ClusterWrapper clusterWrapper;
- // helper actorContext used only for static calls to executeAsync etc
- // for calls that need specific actor context tied to a datastore use the one provided in the DistributedDataStore
- private final ActorUtils actorUtils;
- private final ShardingServiceAddressResolver resolver;
- private final DistributedDataStoreInterface distributedConfigDatastore;
- private final DistributedDataStoreInterface distributedOperDatastore;
- private final int lookupTaskMaxRetries;
-
- private final Map<DOMDataTreeIdentifier, ActorProducerRegistration> idToProducer = new HashMap<>();
-
- ShardedDataTreeActor(final ShardedDataTreeActorCreator builder) {
- LOG.debug("Creating ShardedDataTreeActor on {}", builder.getClusterWrapper().getCurrentMemberName());
-
- shardingService = builder.getShardingService();
- actorSystem = builder.getActorSystem();
- clusterWrapper = builder.getClusterWrapper();
- distributedConfigDatastore = builder.getDistributedConfigDatastore();
- distributedOperDatastore = builder.getDistributedOperDatastore();
- lookupTaskMaxRetries = builder.getLookupTaskMaxRetries();
- actorUtils = distributedConfigDatastore.getActorUtils();
- resolver = new ShardingServiceAddressResolver(
- DistributedShardedDOMDataTree.ACTOR_ID, clusterWrapper.getCurrentMemberName());
-
- clusterWrapper.subscribeToMemberEvents(self());
- }
-
- @Override
- public void preStart() {
- }
-
- @Override
- protected void handleRecover(final Object message) {
- LOG.debug("Received a recover message {}", message);
- }
-
- @Override
- protected void handleCommand(final Object message) {
- LOG.debug("{} : Received {}", clusterWrapper.getCurrentMemberName(), message);
- if (message instanceof ClusterEvent.MemberUp) {
- memberUp((ClusterEvent.MemberUp) message);
- } else if (message instanceof ClusterEvent.MemberWeaklyUp) {
- memberWeaklyUp((ClusterEvent.MemberWeaklyUp) message);
- } else if (message instanceof ClusterEvent.MemberExited) {
- memberExited((ClusterEvent.MemberExited) message);
- } else if (message instanceof ClusterEvent.MemberRemoved) {
- memberRemoved((ClusterEvent.MemberRemoved) message);
- } else if (message instanceof ClusterEvent.UnreachableMember) {
- memberUnreachable((ClusterEvent.UnreachableMember) message);
- } else if (message instanceof ClusterEvent.ReachableMember) {
- memberReachable((ClusterEvent.ReachableMember) message);
- } else if (message instanceof ProducerCreated) {
- onProducerCreated((ProducerCreated) message);
- } else if (message instanceof NotifyProducerCreated) {
- onNotifyProducerCreated((NotifyProducerCreated) message);
- } else if (message instanceof ProducerRemoved) {
- onProducerRemoved((ProducerRemoved) message);
- } else if (message instanceof NotifyProducerRemoved) {
- onNotifyProducerRemoved((NotifyProducerRemoved) message);
- } else if (message instanceof PrefixShardCreated) {
- onPrefixShardCreated((PrefixShardCreated) message);
- } else if (message instanceof LookupPrefixShard) {
- onLookupPrefixShard((LookupPrefixShard) message);
- } else if (message instanceof PrefixShardRemovalLookup) {
- onPrefixShardRemovalLookup((PrefixShardRemovalLookup) message);
- } else if (message instanceof PrefixShardRemoved) {
- onPrefixShardRemoved((PrefixShardRemoved) message);
- } else if (message instanceof StartConfigShardLookup) {
- onStartConfigShardLookup((StartConfigShardLookup) message);
- }
- }
-
- @Override
- public String persistenceId() {
- return PERSISTENCE_ID;
- }
-
- private void memberUp(final MemberUp message) {
- final MemberName memberName = memberToName(message.member());
-
- LOG.info("{}: Received MemberUp: memberName: {}, address: {}", persistenceId(), memberName,
- message.member().address());
-
- resolver.addPeerAddress(memberName, message.member().address());
- }
-
- private void memberWeaklyUp(final MemberWeaklyUp message) {
- final MemberName memberName = memberToName(message.member());
-
- LOG.info("{}: Received MemberWeaklyUp: memberName: {}, address: {}", persistenceId(), memberName,
- message.member().address());
-
- resolver.addPeerAddress(memberName, message.member().address());
- }
-
- private void memberExited(final MemberExited message) {
- final MemberName memberName = memberToName(message.member());
-
- LOG.info("{}: Received MemberExited: memberName: {}, address: {}", persistenceId(), memberName,
- message.member().address());
-
- resolver.removePeerAddress(memberName);
- }
-
- private void memberRemoved(final MemberRemoved message) {
- final MemberName memberName = memberToName(message.member());
-
- LOG.info("{}: Received MemberRemoved: memberName: {}, address: {}", persistenceId(), memberName,
- message.member().address());
-
- resolver.removePeerAddress(memberName);
- }
-
- private void memberUnreachable(final UnreachableMember message) {
- final MemberName memberName = memberToName(message.member());
- LOG.debug("Received UnreachableMember: memberName {}, address: {}", memberName, message.member().address());
-
- resolver.removePeerAddress(memberName);
- }
-
- private void memberReachable(final ReachableMember message) {
- final MemberName memberName = memberToName(message.member());
- LOG.debug("Received ReachableMember: memberName {}, address: {}", memberName, message.member().address());
-
- resolver.addPeerAddress(memberName, message.member().address());
- }
-
- private void onProducerCreated(final ProducerCreated message) {
- LOG.debug("Received ProducerCreated: {}", message);
-
- // fastpath if we have no peers
- if (resolver.getShardingServicePeerActorAddresses().isEmpty()) {
- getSender().tell(new Status.Success(null), ActorRef.noSender());
- }
-
- final ActorRef sender = getSender();
- final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
-
- final List<CompletableFuture<Object>> futures = new ArrayList<>();
-
- for (final String address : resolver.getShardingServicePeerActorAddresses()) {
- final ActorSelection actorSelection = actorSystem.actorSelection(address);
- futures.add(
- FutureConverters.toJava(
- actorUtils.executeOperationAsync(
- actorSelection, new NotifyProducerCreated(subtrees), DEFAULT_ASK_TIMEOUT))
- .toCompletableFuture());
- }
-
- final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
- futures.toArray(new CompletableFuture[futures.size()]));
-
- combinedFuture
- .thenRun(() -> sender.tell(new Success(null), ActorRef.noSender()))
- .exceptionally(throwable -> {
- sender.tell(new Status.Failure(throwable), self());
- return null;
- });
- }
-
- private void onNotifyProducerCreated(final NotifyProducerCreated message) {
- LOG.debug("Received NotifyProducerCreated: {}", message);
-
- final Collection<DOMDataTreeIdentifier> subtrees = message.getSubtrees();
-
- try {
- final ActorProducerRegistration registration =
- new ActorProducerRegistration(shardingService.localCreateProducer(subtrees), subtrees);
- subtrees.forEach(id -> idToProducer.put(id, registration));
- sender().tell(new Status.Success(null), self());
- } catch (final IllegalArgumentException e) {
- sender().tell(new Status.Failure(e), getSelf());
- }
- }
-
- private void onProducerRemoved(final ProducerRemoved message) {
- LOG.debug("Received ProducerRemoved: {}", message);
-
- final List<CompletableFuture<Object>> futures = new ArrayList<>();
-
- for (final String address : resolver.getShardingServicePeerActorAddresses()) {
- final ActorSelection selection = actorSystem.actorSelection(address);
-
- futures.add(FutureConverters.toJava(
- actorUtils.executeOperationAsync(selection, new NotifyProducerRemoved(message.getSubtrees())))
- .toCompletableFuture());
- }
-
- final CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
- futures.toArray(new CompletableFuture[futures.size()]));
-
- final ActorRef respondTo = getSender();
-
- combinedFuture
- .thenRun(() -> respondTo.tell(new Status.Success(null), self()))
- .exceptionally(e -> {
- respondTo.tell(new Status.Failure(null), self());
- return null;
- });
-
- }
-
- private void onNotifyProducerRemoved(final NotifyProducerRemoved message) {
- LOG.debug("Received NotifyProducerRemoved: {}", message);
-
- final ActorProducerRegistration registration = idToProducer.remove(message.getSubtrees().iterator().next());
- if (registration == null) {
- LOG.warn("The notification contained a path on which no producer is registered, throwing away");
- getSender().tell(new Status.Success(null), ActorRef.noSender());
- return;
- }
-
- try {
- registration.close();
- getSender().tell(new Status.Success(null), ActorRef.noSender());
- } catch (final DOMDataTreeProducerException e) {
- LOG.error("Unable to close producer", e);
- getSender().tell(new Status.Failure(e), ActorRef.noSender());
- }
- }
-
- @SuppressWarnings("checkstyle:IllegalCatch")
- private void onLookupPrefixShard(final LookupPrefixShard message) {
- LOG.debug("Member: {}, Received LookupPrefixShard: {}", clusterWrapper.getCurrentMemberName(), message);
-
- final DOMDataTreeIdentifier prefix = message.getPrefix();
-
- final ActorUtils utils = prefix.getDatastoreType() == LogicalDatastoreType.CONFIGURATION
- ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();
-
- // schedule a notification task for the reply
- actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
- new ShardCreationLookupTask(actorSystem, getSender(), clusterWrapper,
- utils, shardingService, prefix, lookupTaskMaxRetries), actorSystem.dispatcher());
- }
-
- private void onPrefixShardCreated(final PrefixShardCreated message) {
- LOG.debug("Member: {}, Received PrefixShardCreated: {}", clusterWrapper.getCurrentMemberName(), message);
-
- final PrefixShardConfiguration config = message.getConfiguration();
-
- shardingService.resolveShardAdditions(Collections.singleton(config.getPrefix()));
- }
-
- private void onPrefixShardRemovalLookup(final PrefixShardRemovalLookup message) {
- LOG.debug("Member: {}, Received PrefixShardRemovalLookup: {}", clusterWrapper.getCurrentMemberName(), message);
-
- final ShardRemovalLookupTask removalTask =
- new ShardRemovalLookupTask(actorSystem, getSender(),
- actorUtils, message.getPrefix(), lookupTaskMaxRetries);
-
- actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL, removalTask, actorSystem.dispatcher());
- }
-
- private void onPrefixShardRemoved(final PrefixShardRemoved message) {
- LOG.debug("Received PrefixShardRemoved: {}", message);
-
- shardingService.resolveShardRemovals(Collections.singleton(message.getPrefix()));
- }
-
- private void onStartConfigShardLookup(final StartConfigShardLookup message) {
- LOG.debug("Received StartConfigShardLookup: {}", message);
-
- final ActorUtils context =
- message.getType().equals(LogicalDatastoreType.CONFIGURATION)
- ? distributedConfigDatastore.getActorUtils() : distributedOperDatastore.getActorUtils();
-
- // schedule a notification task for the reply
- actorSystem.scheduler().scheduleOnce(SHARD_LOOKUP_TASK_INTERVAL,
- new ConfigShardLookupTask(
- actorSystem, getSender(), context, message, lookupTaskMaxRetries),
- actorSystem.dispatcher());
- }
-
- private static MemberName memberToName(final Member member) {
- return MemberName.forName(member.roles().iterator().next());
- }
-
- private class ActorProducerRegistration {
-
- private final DOMDataTreeProducer producer;
- private final Collection<DOMDataTreeIdentifier> subtrees;
-
- ActorProducerRegistration(final DOMDataTreeProducer producer,
- final Collection<DOMDataTreeIdentifier> subtrees) {
- this.producer = producer;
- this.subtrees = subtrees;
- }
-
- void close() throws DOMDataTreeProducerException {
- producer.close();
- subtrees.forEach(idToProducer::remove);
- }
- }
-
- private static class ShardFrontendRegistration extends
- AbstractObjectRegistration<ListenerRegistration<DistributedShardFrontend>> {
-
- private final ActorRef clientActor;
- private final ListenerRegistration<DistributedShardFrontend> shardRegistration;
-
- ShardFrontendRegistration(final ActorRef clientActor,
- final ListenerRegistration<DistributedShardFrontend> shardRegistration) {
- super(shardRegistration);
- this.clientActor = clientActor;
- this.shardRegistration = shardRegistration;
- }
-
- @Override
- protected void removeRegistration() {
- shardRegistration.close();
- clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
-
- /**
- * Handles the lookup step of cds shard creation once the configuration is updated.
- */
- private static class ShardCreationLookupTask extends LookupTask {
-
- private final ActorSystem system;
- private final ActorRef replyTo;
- private final ClusterWrapper clusterWrapper;
- private final ActorUtils context;
- private final DistributedShardedDOMDataTree shardingService;
- private final DOMDataTreeIdentifier toLookup;
- private final int lookupMaxRetries;
-
- ShardCreationLookupTask(final ActorSystem system,
- final ActorRef replyTo,
- final ClusterWrapper clusterWrapper,
- final ActorUtils context,
- final DistributedShardedDOMDataTree shardingService,
- final DOMDataTreeIdentifier toLookup,
- final int lookupMaxRetries) {
- super(replyTo, lookupMaxRetries);
- this.system = system;
- this.replyTo = replyTo;
- this.clusterWrapper = clusterWrapper;
- this.context = context;
- this.shardingService = shardingService;
- this.toLookup = toLookup;
- this.lookupMaxRetries = lookupMaxRetries;
- }
-
- @Override
- public void run() {
- final Future<ActorRef> localShardFuture =
- context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
-
- localShardFuture.onComplete(new OnComplete<ActorRef>() {
- @Override
- public void onComplete(final Throwable throwable, final ActorRef actorRef) {
- if (throwable != null) {
- tryReschedule(throwable);
- } else {
- LOG.debug("Local backend for shard[{}] lookup successful, starting leader lookup..", toLookup);
-
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL,
- new ShardLeaderLookupTask(system, replyTo, context, clusterWrapper, actorRef,
- shardingService, toLookup, lookupMaxRetries),
- system.dispatcher());
- }
- }
- }, system.dispatcher());
- }
-
- @Override
- void reschedule(final int retries) {
- LOG.debug("Local backend for shard[{}] not found, try: {}, rescheduling..", toLookup, retries);
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL, ShardCreationLookupTask.this, system.dispatcher());
- }
- }
-
- /**
- * Handles the readiness step by waiting for a leader of the created shard.
- */
- private static class ShardLeaderLookupTask extends LookupTask {
-
- private final ActorSystem system;
- private final ActorRef replyTo;
- private final ActorUtils context;
- private final ClusterWrapper clusterWrapper;
- private final ActorRef shard;
- private final DistributedShardedDOMDataTree shardingService;
- private final DOMDataTreeIdentifier toLookup;
- private final int lookupMaxRetries;
-
- ShardLeaderLookupTask(final ActorSystem system,
- final ActorRef replyTo,
- final ActorUtils context,
- final ClusterWrapper clusterWrapper,
- final ActorRef shard,
- final DistributedShardedDOMDataTree shardingService,
- final DOMDataTreeIdentifier toLookup,
- final int lookupMaxRetries) {
- super(replyTo, lookupMaxRetries);
- this.system = system;
- this.replyTo = replyTo;
- this.context = context;
- this.clusterWrapper = clusterWrapper;
- this.shard = shard;
- this.shardingService = shardingService;
- this.toLookup = toLookup;
- this.lookupMaxRetries = lookupMaxRetries;
- }
-
- @Override
- public void run() {
-
- final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
-
- ask.onComplete(new OnComplete<>() {
- @Override
- public void onComplete(final Throwable throwable, final Object findLeaderReply) {
- if (throwable != null) {
- tryReschedule(throwable);
- } else {
- final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
- final Optional<String> leaderActor = findLeader.getLeaderActor();
- if (leaderActor.isPresent()) {
- // leader is found, backend seems ready, check if the frontend is ready
- LOG.debug("{} - Leader for shard[{}] backend ready, starting frontend lookup..",
- clusterWrapper.getCurrentMemberName(), toLookup);
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL,
- new FrontendLookupTask(
- system, replyTo, shardingService, toLookup, lookupMaxRetries),
- system.dispatcher());
- } else {
- tryReschedule(null);
- }
- }
- }
- }, system.dispatcher());
-
- }
-
- @Override
- void reschedule(final int retries) {
- LOG.debug("{} - Leader for shard[{}] backend not found on try: {}, retrying..",
- clusterWrapper.getCurrentMemberName(), toLookup, retries);
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL, ShardLeaderLookupTask.this, system.dispatcher());
- }
- }
-
- /**
- * After backend is ready this handles the last step - checking if we have a frontend shard for the backend,
- * once this completes(which should be ready by the time the backend is created, this is just a sanity check in
- * case they race), the future for the cds shard creation is completed and the shard is ready for use.
- */
- private static final class FrontendLookupTask extends LookupTask {
-
- private final ActorSystem system;
- private final ActorRef replyTo;
- private final DistributedShardedDOMDataTree shardingService;
- private final DOMDataTreeIdentifier toLookup;
-
- FrontendLookupTask(final ActorSystem system,
- final ActorRef replyTo,
- final DistributedShardedDOMDataTree shardingService,
- final DOMDataTreeIdentifier toLookup,
- final int lookupMaxRetries) {
- super(replyTo, lookupMaxRetries);
- this.system = system;
- this.replyTo = replyTo;
- this.shardingService = shardingService;
- this.toLookup = toLookup;
- }
-
- @Override
- public void run() {
- final DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> entry =
- shardingService.lookupShardFrontend(toLookup);
-
- if (entry != null && tableEntryIdCheck(entry, toLookup) && entry.getValue() != null) {
- replyTo.tell(new Success(null), ActorRef.noSender());
- } else {
- tryReschedule(null);
- }
- }
-
- private boolean tableEntryIdCheck(final DOMDataTreePrefixTableEntry<?> entry,
- final DOMDataTreeIdentifier prefix) {
- if (entry == null) {
- return false;
- }
-
- if (YangInstanceIdentifier.empty().equals(prefix.getRootIdentifier())) {
- return true;
- }
-
- if (entry.getIdentifier().equals(toLookup.getRootIdentifier().getLastPathArgument())) {
- return true;
- }
-
- return false;
- }
-
- @Override
- void reschedule(final int retries) {
- LOG.debug("Frontend for shard[{}] not found on try: {}, retrying..", toLookup, retries);
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL, FrontendLookupTask.this, system.dispatcher());
- }
- }
-
- /**
- * Task that is run once a cds shard registration is closed and completes once the backend shard is removed from the
- * configuration.
- */
- private static class ShardRemovalLookupTask extends LookupTask {
-
- private final ActorSystem system;
- private final ActorRef replyTo;
- private final ActorUtils context;
- private final DOMDataTreeIdentifier toLookup;
-
- ShardRemovalLookupTask(final ActorSystem system,
- final ActorRef replyTo,
- final ActorUtils context,
- final DOMDataTreeIdentifier toLookup,
- final int lookupMaxRetries) {
- super(replyTo, lookupMaxRetries);
- this.system = system;
- this.replyTo = replyTo;
- this.context = context;
- this.toLookup = toLookup;
- }
-
- @Override
- public void run() {
- final Future<ActorRef> localShardFuture =
- context.findLocalShardAsync(ClusterUtils.getCleanShardName(toLookup.getRootIdentifier()));
-
- localShardFuture.onComplete(new OnComplete<ActorRef>() {
- @Override
- public void onComplete(final Throwable throwable, final ActorRef actorRef) {
- if (throwable != null) {
- //TODO Shouldn't we check why findLocalShard failed?
- LOG.debug("Backend shard[{}] removal lookup successful notifying the registration future",
- toLookup);
- replyTo.tell(new Success(null), ActorRef.noSender());
- } else {
- tryReschedule(null);
- }
- }
- }, system.dispatcher());
- }
-
- @Override
- void reschedule(final int retries) {
- LOG.debug("Backend shard[{}] removal lookup failed, shard is still present, try: {}, rescheduling..",
- toLookup, retries);
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL, ShardRemovalLookupTask.this, system.dispatcher());
- }
- }
-
- /**
- * Task for handling the lookup of the backend for the configuration shard.
- */
- private static class ConfigShardLookupTask extends LookupTask {
-
- private final ActorSystem system;
- private final ActorRef replyTo;
- private final ActorUtils context;
-
- ConfigShardLookupTask(final ActorSystem system,
- final ActorRef replyTo,
- final ActorUtils context,
- final StartConfigShardLookup message,
- final int lookupMaxRetries) {
- super(replyTo, lookupMaxRetries);
- this.system = system;
- this.replyTo = replyTo;
- this.context = context;
- }
-
- @Override
- void reschedule(final int retries) {
- LOG.debug("Local backend for prefix configuration shard not found, try: {}, rescheduling..", retries);
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL, ConfigShardLookupTask.this, system.dispatcher());
- }
-
- @Override
- public void run() {
- final Optional<ActorRef> localShard =
- context.findLocalShard(ClusterUtils.PREFIX_CONFIG_SHARD_ID);
-
- if (!localShard.isPresent()) {
- tryReschedule(null);
- } else {
- LOG.debug("Local backend for prefix configuration shard lookup successful");
- replyTo.tell(new Status.Success(null), ActorRef.noSender());
- }
- }
- }
-
- /**
- * Task for handling the readiness state of the config shard. Reports success once the leader is elected.
- */
- private static class ConfigShardReadinessTask extends LookupTask {
-
- private final ActorSystem system;
- private final ActorRef replyTo;
- private final ActorUtils context;
- private final ClusterWrapper clusterWrapper;
- private final ActorRef shard;
-
- ConfigShardReadinessTask(final ActorSystem system,
- final ActorRef replyTo,
- final ActorUtils context,
- final ClusterWrapper clusterWrapper,
- final ActorRef shard,
- final int lookupMaxRetries) {
- super(replyTo, lookupMaxRetries);
- this.system = system;
- this.replyTo = replyTo;
- this.context = context;
- this.clusterWrapper = clusterWrapper;
- this.shard = shard;
- }
-
- @Override
- void reschedule(final int retries) {
- LOG.debug("{} - Leader for config shard not found on try: {}, retrying..",
- clusterWrapper.getCurrentMemberName(), retries);
- system.scheduler().scheduleOnce(
- SHARD_LOOKUP_TASK_INTERVAL, ConfigShardReadinessTask.this, system.dispatcher());
- }
-
- @Override
- public void run() {
- final Future<Object> ask = Patterns.ask(shard, FindLeader.INSTANCE, context.getOperationTimeout());
-
- ask.onComplete(new OnComplete<>() {
- @Override
- public void onComplete(final Throwable throwable, final Object findLeaderReply) {
- if (throwable != null) {
- tryReschedule(throwable);
- } else {
- final FindLeaderReply findLeader = (FindLeaderReply) findLeaderReply;
- final Optional<String> leaderActor = findLeader.getLeaderActor();
- if (leaderActor.isPresent()) {
- // leader is found, backend seems ready, check if the frontend is ready
- LOG.debug("{} - Leader for config shard is ready. Ending lookup.",
- clusterWrapper.getCurrentMemberName());
- replyTo.tell(new Status.Success(null), ActorRef.noSender());
- } else {
- tryReschedule(null);
- }
- }
- }
- }, system.dispatcher());
- }
- }
-
- public static class ShardedDataTreeActorCreator {
-
- private DistributedShardedDOMDataTree shardingService;
- private DistributedDataStoreInterface distributedConfigDatastore;
- private DistributedDataStoreInterface distributedOperDatastore;
- private ActorSystem actorSystem;
- private ClusterWrapper cluster;
- private int maxRetries;
-
- public DistributedShardedDOMDataTree getShardingService() {
- return shardingService;
- }
-
- public ShardedDataTreeActorCreator setShardingService(final DistributedShardedDOMDataTree shardingService) {
- this.shardingService = shardingService;
- return this;
- }
-
- public ActorSystem getActorSystem() {
- return actorSystem;
- }
-
- public ShardedDataTreeActorCreator setActorSystem(final ActorSystem actorSystem) {
- this.actorSystem = actorSystem;
- return this;
- }
-
- public ShardedDataTreeActorCreator setClusterWrapper(final ClusterWrapper clusterWrapper) {
- this.cluster = clusterWrapper;
- return this;
- }
-
- public ClusterWrapper getClusterWrapper() {
- return cluster;
- }
-
- public DistributedDataStoreInterface getDistributedConfigDatastore() {
- return distributedConfigDatastore;
- }
-
- public ShardedDataTreeActorCreator setDistributedConfigDatastore(
- final DistributedDataStoreInterface distributedConfigDatastore) {
- this.distributedConfigDatastore = distributedConfigDatastore;
- return this;
- }
-
- public DistributedDataStoreInterface getDistributedOperDatastore() {
- return distributedOperDatastore;
- }
-
- public ShardedDataTreeActorCreator setDistributedOperDatastore(
- final DistributedDataStoreInterface distributedOperDatastore) {
- this.distributedOperDatastore = distributedOperDatastore;
- return this;
- }
-
- public ShardedDataTreeActorCreator setLookupTaskMaxRetries(final int newMaxRetries) {
- this.maxRetries = newMaxRetries;
- return this;
- }
-
- public int getLookupTaskMaxRetries() {
- return maxRetries;
- }
-
- private void verify() {
- requireNonNull(shardingService);
- requireNonNull(actorSystem);
- requireNonNull(cluster);
- requireNonNull(distributedConfigDatastore);
- requireNonNull(distributedOperDatastore);
- }
-
- public Props props() {
- verify();
- return Props.create(ShardedDataTreeActor.class, this);
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Objects.requireNonNull;
-
-import akka.actor.Address;
-import java.util.Collection;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-
-/**
- * Resolver for remote {@link ShardedDataTreeActor}'s.
- */
-@Deprecated(forRemoval = true)
-public class ShardingServiceAddressResolver {
-
- private final ConcurrentMap<MemberName, Address> memberNameToAddress = new ConcurrentHashMap<>();
- private final String shardingServiceActorIdentifier;
- private final MemberName localMemberName;
-
- public ShardingServiceAddressResolver(final String shardingServiceActorIdentifier,
- final MemberName localMemberName) {
- this.shardingServiceActorIdentifier = shardingServiceActorIdentifier;
- this.localMemberName = localMemberName;
- }
-
- void addPeerAddress(final MemberName memberName, final Address address) {
- memberNameToAddress.put(memberName, address);
- }
-
- void removePeerAddress(final MemberName memberName) {
- memberNameToAddress.remove(memberName);
- }
-
- Address getPeerAddress(final MemberName memberName) {
- return memberNameToAddress.get(memberName);
- }
-
- StringBuilder getActorPathBuilder(final Address address) {
- return new StringBuilder().append(address.toString()).append("/user/").append(shardingServiceActorIdentifier);
- }
-
- Collection<String> getShardingServicePeerActorAddresses() {
- final Collection<String> peerAddresses =
- memberNameToAddress
- .entrySet()
- .stream()
- .filter(entry -> !localMemberName.equals(entry.getKey()))
- .map(entry -> getActorPathBuilder(entry.getValue()).toString())
- .collect(Collectors.toList());
-
- return peerAddresses;
- }
-
- public String resolve(final MemberName memberName) {
- final Address address = memberNameToAddress.get(requireNonNull(memberName));
- checkNotNull(address, "Requested member[%s] is not present in the resolver", memberName);
- return getActorPathBuilder(address).toString();
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding.messages;
-
-/**
- * Message sent to the local ShardManager, once the shard configuration shard is ready and the ShardManager should
- * start its listener.
- */
-@Deprecated(forRemoval = true)
-public final class InitConfigListener {
-
- public static final InitConfigListener INSTANCE = new InitConfigListener();
-
- private InitConfigListener() {
-
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding.messages;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.Beta;
-import java.io.Serializable;
-import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-
-/**
- * Sent to the local {@link ShardedDataTreeActor} when there was a shard created
- * on the local node. The local actor should notify the remote actors with {@link PrefixShardCreated} which should
- * create the required frontend/backend shards.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public class LookupPrefixShard implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final DOMDataTreeIdentifier prefix;
-
- public LookupPrefixShard(final DOMDataTreeIdentifier prefix) {
- this.prefix = requireNonNull(prefix);
- }
-
- public DOMDataTreeIdentifier getPrefix() {
- return prefix;
- }
-
-
- @Override
- public String toString() {
- return "LookupPrefixShard{"
- + "prefix="
- + prefix
- + '}';
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding.messages;
-
-import com.google.common.annotations.Beta;
-import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
-import java.util.Collection;
-import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-
-/**
- * Message sent to remote {@link ShardedDataTreeActor}'s when attempting
- * to create a producer. The remote node should attempt to create a producer in the local sharding service and reply
- * with success/failure based on the attempt result.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public class NotifyProducerCreated implements Serializable {
- private static final long serialVersionUID = 1L;
- private final Collection<DOMDataTreeIdentifier> subtrees;
-
- public NotifyProducerCreated(final Collection<DOMDataTreeIdentifier> subtrees) {
- this.subtrees = ImmutableList.copyOf(subtrees);
- }
-
- public Collection<DOMDataTreeIdentifier> getSubtrees() {
- return subtrees;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding.messages;
-
-import com.google.common.annotations.Beta;
-import com.google.common.collect.ImmutableList;
-import java.io.Serializable;
-import java.util.Collection;
-import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-
-/**
- * Message sent to remote {@link ShardedDataTreeActor}'s when attempting
- * to close a producer. The remote node should attempt to close a producer in the local sharding service and reply
- * with success/failure based on the attempt result. If the producer doesn't exist on this node report Success.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public class NotifyProducerRemoved implements Serializable {
- private static final long serialVersionUID = 1L;
- private final Collection<DOMDataTreeIdentifier> subtrees;
-
- public NotifyProducerRemoved(final Collection<DOMDataTreeIdentifier> subtrees) {
- this.subtrees = ImmutableList.copyOf(subtrees);
- }
-
- public Collection<DOMDataTreeIdentifier> getSubtrees() {
- return subtrees;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding.messages;
-
-import com.google.common.annotations.Beta;
-import org.opendaylight.controller.cluster.datastore.config.PrefixShardConfiguration;
-import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
-
-/**
- * Message sent to the local {@link ShardedDataTreeActor} when a clustered
- * shard was created locally. The backend shards/replicas will be handled by the ShardManager but the
- * {@link ShardedDataTreeActor} needs to handle the registration of the
- * frontends into the {@link org.opendaylight.mdsal.dom.api.DOMDataTreeShardingService}. The configuration only contains
- * the Member nodes that this is still yet to be distributed to. The last node will receive PrefixShardConfiguration
- * with only it's member present.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public class PrefixShardCreated {
- private final PrefixShardConfiguration configuration;
-
- public PrefixShardCreated(final PrefixShardConfiguration configuration) {
- this.configuration = configuration;
- }
-
- public PrefixShardConfiguration getConfiguration() {
- return configuration;
- }
-
- @Override
- public String toString() {
- return "PrefixShardCreated{"
- + "configuration=" + configuration
- + '}';
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding.messages;
-
-import static java.util.Objects.requireNonNull;
-
-import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-
-/**
- * Sent to the local {@link ShardedDataTreeActor} to initiate the lookup of the shard, once the shard is removed from
- * the system entirely the actor responds with a success.
- */
-@Deprecated(forRemoval = true)
-public class PrefixShardRemovalLookup {
- private final DOMDataTreeIdentifier prefix;
-
- public PrefixShardRemovalLookup(final DOMDataTreeIdentifier prefix) {
- this.prefix = requireNonNull(prefix);
- }
-
- public DOMDataTreeIdentifier getPrefix() {
- return prefix;
- }
-
- @Override
- public String toString() {
- return "PrefixShardRemovalLookup{" + "prefix=" + prefix + '}';
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding.messages;
-
-import com.google.common.annotations.Beta;
-import java.io.Serializable;
-import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-
-/**
- * Message sent to remote {@link ShardedDataTreeActor}'s when there is an attempt to remove the shard,
- * the ShardedDataTreeActor should remove the shard from the current configuration so that the change is picked up
- * in the backend ShardManager.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public class PrefixShardRemoved implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final DOMDataTreeIdentifier prefix;
-
- public PrefixShardRemoved(final DOMDataTreeIdentifier prefix) {
- this.prefix = prefix;
- }
-
- public DOMDataTreeIdentifier getPrefix() {
- return prefix;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding.messages;
-
-import com.google.common.annotations.Beta;
-import java.util.Collection;
-import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-
-/**
- * Message sent to local {@link ShardedDataTreeActor}'s when there was an
- * attempt to create a producer on the local node.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public class ProducerCreated {
- private final Collection<DOMDataTreeIdentifier> subtrees;
-
- public ProducerCreated(final Collection<DOMDataTreeIdentifier> subtrees) {
- this.subtrees = subtrees;
- }
-
- public Collection<DOMDataTreeIdentifier> getSubtrees() {
- return subtrees;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding.messages;
-
-import com.google.common.annotations.Beta;
-import java.util.Collection;
-import org.opendaylight.controller.cluster.sharding.ShardedDataTreeActor;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-
-/**
- * Message sent to local {@link ShardedDataTreeActor}'s when there was an
- * attempt to close a producer on the local node.
- */
-@Beta
-@Deprecated(forRemoval = true)
-public class ProducerRemoved {
-
- private final Collection<DOMDataTreeIdentifier> subtrees;
-
- public ProducerRemoved(final Collection<DOMDataTreeIdentifier> subtrees) {
- this.subtrees = subtrees;
- }
-
- public Collection<DOMDataTreeIdentifier> getSubtrees() {
- return subtrees;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding.messages;
-
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-
-/**
- * Message that should be sent to ShardedDataTreeActor when the lookup of the prefix config shard should begin.
- * Replied to with Succes once the shard has a leader.
- */
-@Deprecated(forRemoval = true)
-public class StartConfigShardLookup {
-
- private final LogicalDatastoreType type;
-
- public StartConfigShardLookup(final LogicalDatastoreType type) {
- this.type = type;
- }
-
- public LogicalDatastoreType getType() {
- return type;
- }
-
- @Override
- public String toString() {
- return "StartConfigShardLookup{type=" + type + '}';
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Pantheon Technologies s.r.o. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker.actors.dds;
-
-import static org.mockito.Mockito.verify;
-
-import java.util.Arrays;
-import java.util.stream.Collectors;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
-
-@Deprecated(forRemoval = true)
-public class ClientTransactionCursorTest {
-
- private static final QName NODE_1 = QName.create("ns-1", "node-1");
- private static final QName NODE_2 = QName.create(NODE_1, "node-2");
- private static final QName NODE_3 = QName.create(NODE_1, "node-3");
-
- @Mock
- private ClientTransaction transaction;
- private ClientTransactionCursor cursor;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- cursor = new ClientTransactionCursor(transaction);
- }
-
- @Test
- public void testEnterOneNode() {
- cursor.enter(YangInstanceIdentifier.NodeIdentifier.create(NODE_1));
- cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_2));
- final YangInstanceIdentifier expected = createId(NODE_1, NODE_2);
- verify(transaction).delete(expected);
- }
-
- @Test
- public void testEnterNodeIterables() {
- final Iterable<YangInstanceIdentifier.PathArgument> collect = toPathArg(NODE_1, NODE_2);
- cursor.enter(collect);
- cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_3));
- final YangInstanceIdentifier expected = createId(NODE_1, NODE_2, NODE_3);
- verify(transaction).delete(expected);
- }
-
- @Test
- public void testEnterNodeVarargs() {
- cursor.enter(YangInstanceIdentifier.NodeIdentifier.create(NODE_1),
- YangInstanceIdentifier.NodeIdentifier.create(NODE_2));
- cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_3));
- final YangInstanceIdentifier expected = createId(NODE_1, NODE_2, NODE_3);
- verify(transaction).delete(expected);
- }
-
- @Test
- public void testExitOneLevel() {
- cursor.enter(toPathArg(NODE_1, NODE_2));
- cursor.exit();
- cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_2));
- final YangInstanceIdentifier expected = createId(NODE_1, NODE_2);
- verify(transaction).delete(expected);
- }
-
- @Test
- public void testExitTwoLevels() {
- cursor.enter(toPathArg(NODE_1, NODE_2, NODE_3));
- cursor.exit(2);
- cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_2));
- final YangInstanceIdentifier expected = createId(NODE_1, NODE_2);
- verify(transaction).delete(expected);
- }
-
- @Test
- public void testClose() {
- cursor.close();
- verify(transaction).closeCursor(cursor);
- }
-
- @Test
- public void testDelete() {
- cursor.delete(YangInstanceIdentifier.NodeIdentifier.create(NODE_1));
- final YangInstanceIdentifier expected = createId(NODE_1);
- verify(transaction).delete(expected);
- }
-
- @Test
- public void testMerge() {
- final YangInstanceIdentifier.NodeIdentifier path = YangInstanceIdentifier.NodeIdentifier.create(NODE_1);
- final ContainerNode data = createData(path.getNodeType());
- cursor.merge(path, data);
- final YangInstanceIdentifier expected = createId(NODE_1);
- verify(transaction).merge(expected, data);
- }
-
- @Test
- public void testWrite() {
- final YangInstanceIdentifier.NodeIdentifier path = YangInstanceIdentifier.NodeIdentifier.create(NODE_1);
- final ContainerNode data = createData(path.getNodeType());
- cursor.write(path, data);
- final YangInstanceIdentifier expected = createId(NODE_1);
- verify(transaction).write(expected, data);
- }
-
- private static Iterable<YangInstanceIdentifier.PathArgument> toPathArg(final QName... pathArguments) {
- return Arrays.stream(pathArguments)
- .map(YangInstanceIdentifier.NodeIdentifier::create)
- .collect(Collectors.toList());
- }
-
- private static YangInstanceIdentifier createId(final QName... pathArguments) {
- return YangInstanceIdentifier.create(toPathArg(pathArguments));
- }
-
- private static ContainerNode createData(final QName id) {
- return Builders.containerBuilder()
- .withNodeIdentifier(YangInstanceIdentifier.NodeIdentifier.create(id))
- .build();
- }
-
-}
\ No newline at end of file
import static org.mockito.Mockito.when;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.TRANSACTION_ID;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertFutureEquals;
-import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.assertOperationThrowsException;
import static org.opendaylight.controller.cluster.databroker.actors.dds.TestUtils.getWithTimeout;
import com.google.common.util.concurrent.FluentFuture;
import org.mockito.Mock;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
transaction.read(PATH);
}
- @Test
- public void testOpenCloseCursor() {
- final DOMDataTreeWriteCursor cursor = getHandle().openCursor();
- getHandle().closeCursor(cursor);
- getHandle().openCursor().delete(PATH.getLastPathArgument());
- verify(modification).delete(PATH);
- }
-
- @Test
- public void testOpenSecondCursor() throws Exception {
- getHandle().openCursor();
- assertOperationThrowsException(getHandle()::openCursor, IllegalStateException.class);
- }
-
@Test
public void testExists() throws Exception {
final FluentFuture<Boolean> exists = getHandle().exists(PATH);
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
-import akka.actor.ActorRef;
-import akka.dispatch.Futures;
-import java.util.Optional;
-import java.util.concurrent.TimeUnit;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocationListenerRegistration;
-import org.opendaylight.controller.cluster.raft.LeadershipTransferFailedException;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-@Deprecated(forRemoval = true)
-public class CDSShardAccessImplTest extends AbstractActorTest {
-
- private static final DOMDataTreeIdentifier TEST_ID =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
-
- private CDSShardAccessImpl shardAccess;
- private ActorUtils context;
-
- @Before
- public void setUp() {
- context = mock(ActorUtils.class);
- final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
- doReturn(Optional.of(getSystem().deadLetters())).when(context).findLocalShard(any());
- doReturn(datastoreContext).when(context).getDatastoreContext();
- doReturn(getSystem()).when(context).getActorSystem();
- shardAccess = new CDSShardAccessImpl(TEST_ID, context);
- }
-
- @Test
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void testRegisterLeaderLocationListener() {
- final LeaderLocationListener listener1 = mock(LeaderLocationListener.class);
-
- // first registration should be OK
- shardAccess.registerLeaderLocationListener(listener1);
-
- // second registration should fail with IllegalArgumentEx
- try {
- shardAccess.registerLeaderLocationListener(listener1);
- fail("Should throw exception");
- } catch (final Exception e) {
- assertTrue(e instanceof IllegalArgumentException);
- }
-
- // null listener registration should fail with NPE
- try {
- shardAccess.registerLeaderLocationListener(null);
- fail("Should throw exception");
- } catch (final Exception e) {
- assertTrue(e instanceof NullPointerException);
- }
-
- // registering listener on closed shard access should fail with IllegalStateEx
- final LeaderLocationListener listener2 = mock(LeaderLocationListener.class);
- shardAccess.close();
- try {
- shardAccess.registerLeaderLocationListener(listener2);
- fail("Should throw exception");
- } catch (final Exception ex) {
- assertTrue(ex instanceof IllegalStateException);
- }
- }
-
- @Test
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void testOnLeaderLocationChanged() {
- final LeaderLocationListener listener1 = mock(LeaderLocationListener.class);
- doThrow(new RuntimeException("Failed")).when(listener1).onLeaderLocationChanged(any());
- final LeaderLocationListener listener2 = mock(LeaderLocationListener.class);
- doNothing().when(listener2).onLeaderLocationChanged(any());
- final LeaderLocationListener listener3 = mock(LeaderLocationListener.class);
- doNothing().when(listener3).onLeaderLocationChanged(any());
-
- final LeaderLocationListenerRegistration<?> reg1 = shardAccess.registerLeaderLocationListener(listener1);
- final LeaderLocationListenerRegistration<?> reg2 = shardAccess.registerLeaderLocationListener(listener2);
- final LeaderLocationListenerRegistration<?> reg3 = shardAccess.registerLeaderLocationListener(listener3);
-
- // Error in listener1 should not affect dispatching change to other listeners
- shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL);
- verify(listener1).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
- verify(listener2).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
- verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
-
- // Closed listeners shouldn't see new leader location changes
- reg1.close();
- reg2.close();
- shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE);
- verify(listener3).onLeaderLocationChanged(eq(LeaderLocation.REMOTE));
- verifyNoMoreInteractions(listener1);
- verifyNoMoreInteractions(listener2);
-
- // Closed shard access should not dispatch any new events
- shardAccess.close();
- shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN);
- verifyNoMoreInteractions(listener1);
- verifyNoMoreInteractions(listener2);
- verifyNoMoreInteractions(listener3);
-
- reg3.close();
- }
-
- @Test
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void testGetShardIdentifier() {
- assertEquals(shardAccess.getShardIdentifier(), TEST_ID);
-
- // closed shard access should throw illegal state
- shardAccess.close();
- try {
- shardAccess.getShardIdentifier();
- fail("Exception expected");
- } catch (final Exception e) {
- assertTrue(e instanceof IllegalStateException);
- }
- }
-
- @Test
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void testGetLeaderLocation() {
- // new shard access does not know anything about leader location
- assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN);
-
- // we start getting leader location changes notifications
- shardAccess.onLeaderLocationChanged(LeaderLocation.LOCAL);
- assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.LOCAL);
-
- shardAccess.onLeaderLocationChanged(LeaderLocation.REMOTE);
- shardAccess.onLeaderLocationChanged(LeaderLocation.UNKNOWN);
- assertEquals(shardAccess.getLeaderLocation(), LeaderLocation.UNKNOWN);
-
- // closed shard access throws illegal state
- shardAccess.close();
- try {
- shardAccess.getLeaderLocation();
- fail("Should have failed with IllegalStateEx");
- } catch (Exception e) {
- assertTrue(e instanceof IllegalStateException);
- }
- }
-
- @Test
- @SuppressWarnings("checkstyle:IllegalCatch")
- public void testMakeLeaderLocal() throws Exception {
- final FiniteDuration timeout = new FiniteDuration(5, TimeUnit.SECONDS);
- final ActorRef localShardRef = mock(ActorRef.class);
- final Future<ActorRef> localShardRefFuture = Futures.successful(localShardRef);
- doReturn(localShardRefFuture).when(context).findLocalShardAsync(any());
-
- // MakeLeaderLocal will reply with success
- doReturn(Futures.successful(null)).when(context).executeOperationAsync((ActorRef) any(), any(), any());
- doReturn(getSystem().dispatcher()).when(context).getClientDispatcher();
- assertEquals(waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout), null);
-
- // MakeLeaderLocal will reply with failure
- doReturn(Futures.failed(new LeadershipTransferFailedException("Failure")))
- .when(context).executeOperationAsync((ActorRef) any(), any(), any());
-
- try {
- waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout);
- fail("makeLeaderLocal operation should not be successful");
- } catch (final Exception e) {
- assertTrue(e instanceof LeadershipTransferFailedException);
- }
-
- // we don't even find local shard
- doReturn(Futures.failed(new LocalShardNotFoundException("Local shard not found")))
- .when(context).findLocalShardAsync(any());
-
- try {
- waitOnAsyncTask(shardAccess.makeLeaderLocal(), timeout);
- fail("makeLeaderLocal operation should not be successful");
- } catch (final Exception e) {
- assertTrue(e instanceof LeadershipTransferFailedException);
- assertTrue(e.getCause() instanceof LocalShardNotFoundException);
- }
-
- // closed shard access should throw IllegalStateEx
- shardAccess.close();
- try {
- shardAccess.makeLeaderLocal();
- fail("Should have thrown IllegalStateEx. ShardAccess is closed");
- } catch (final Exception e) {
- assertTrue(e instanceof IllegalStateException);
- }
- }
-}
\ No newline at end of file
+++ /dev/null
-/*
- * Copyright (c) 2016, 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static org.hamcrest.CoreMatchers.hasItems;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Collections;
-import java.util.List;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.mdsal.dom.broker.ShardedDOMDataTree;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-
-@Deprecated(forRemoval = true)
-public class DistributedShardFrontendTest {
-
- private static final DOMDataTreeIdentifier ROOT =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty());
- private static final ListenableFuture<Object> SUCCESS_FUTURE = Futures.immediateFuture(null);
-
- private ShardedDOMDataTree shardedDOMDataTree;
-
- private DataStoreClient client;
- private ClientLocalHistory clientHistory;
- private ClientTransaction clientTransaction;
- private DOMDataTreeWriteCursor cursor;
-
- private static final YangInstanceIdentifier OUTER_LIST_YID = TestModel.OUTER_LIST_PATH.node(
- NodeIdentifierWithPredicates.of(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1));
- private static final DOMDataTreeIdentifier OUTER_LIST_ID =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, OUTER_LIST_YID);
-
- @Captor
- private ArgumentCaptor<PathArgument> pathArgumentCaptor;
- @Captor
- private ArgumentCaptor<NormalizedNode<?, ?>> nodeCaptor;
-
- private DOMStoreThreePhaseCommitCohort commitCohort;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- shardedDOMDataTree = new ShardedDOMDataTree();
- client = mock(DataStoreClient.class);
- cursor = mock(DOMDataTreeWriteCursor.class);
- clientTransaction = mock(ClientTransaction.class);
- clientHistory = mock(ClientLocalHistory.class);
- commitCohort = mock(DOMStoreThreePhaseCommitCohort.class);
-
- doReturn(SUCCESS_FUTURE).when(commitCohort).canCommit();
- doReturn(SUCCESS_FUTURE).when(commitCohort).preCommit();
- doReturn(SUCCESS_FUTURE).when(commitCohort).commit();
- doReturn(SUCCESS_FUTURE).when(commitCohort).abort();
-
- doReturn(clientTransaction).when(client).createTransaction();
- doReturn(clientTransaction).when(clientHistory).createTransaction();
- doNothing().when(clientHistory).close();
-
- doNothing().when(client).close();
- doReturn(clientHistory).when(client).createLocalHistory();
-
- doReturn(cursor).when(clientTransaction).openCursor();
- doNothing().when(cursor).close();
- doNothing().when(cursor).write(any(), any());
- doNothing().when(cursor).merge(any(), any());
- doNothing().when(cursor).delete(any());
-
- doReturn(commitCohort).when(clientTransaction).ready();
- }
-
- @Test
- public void testClientTransaction() throws Exception {
- final DistributedDataStore distributedDataStore = mock(DistributedDataStore.class);
- final ActorUtils context = mock(ActorUtils.class);
- doReturn(context).when(distributedDataStore).getActorUtils();
- doReturn(SchemaContextHelper.full()).when(context).getSchemaContext();
-
- final DistributedShardFrontend rootShard = new DistributedShardFrontend(distributedDataStore, client, ROOT);
-
- try (DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT))) {
- shardedDOMDataTree.registerDataTreeShard(ROOT, rootShard, producer);
- }
-
- final DataStoreClient outerListClient = mock(DataStoreClient.class);
- final ClientTransaction outerListClientTransaction = mock(ClientTransaction.class);
- final ClientLocalHistory outerListClientHistory = mock(ClientLocalHistory.class);
- final DOMDataTreeWriteCursor outerListCursor = mock(DOMDataTreeWriteCursor.class);
-
- doNothing().when(outerListCursor).close();
- doNothing().when(outerListCursor).write(any(), any());
- doNothing().when(outerListCursor).merge(any(), any());
- doNothing().when(outerListCursor).delete(any());
-
- doReturn(outerListCursor).when(outerListClientTransaction).openCursor();
- doReturn(outerListClientTransaction).when(outerListClient).createTransaction();
- doReturn(outerListClientHistory).when(outerListClient).createLocalHistory();
- doReturn(outerListClientTransaction).when(outerListClientHistory).createTransaction();
-
- doReturn(commitCohort).when(outerListClientTransaction).ready();
-
- doNothing().when(outerListClientHistory).close();
- doNothing().when(outerListClient).close();
-
- final DistributedShardFrontend outerListShard = new DistributedShardFrontend(
- distributedDataStore, outerListClient, OUTER_LIST_ID);
- try (DOMDataTreeProducer producer =
- shardedDOMDataTree.createProducer(Collections.singletonList(OUTER_LIST_ID))) {
- shardedDOMDataTree.registerDataTreeShard(OUTER_LIST_ID, outerListShard, producer);
- }
-
- final DOMDataTreeProducer producer = shardedDOMDataTree.createProducer(Collections.singletonList(ROOT));
- final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false);
- final DOMDataTreeWriteCursor txCursor = tx.createCursor(ROOT);
-
- assertNotNull(txCursor);
- txCursor.write(TestModel.TEST_PATH.getLastPathArgument(), createCrossShardContainer());
-
- //check the lower shard got the correct modification
- verify(outerListCursor, times(2)).write(pathArgumentCaptor.capture(), nodeCaptor.capture());
-
- final List<PathArgument> capturedArgs = pathArgumentCaptor.getAllValues();
- assertEquals(2, capturedArgs.size());
- assertThat(capturedArgs,
- hasItems(new NodeIdentifier(TestModel.ID_QNAME), new NodeIdentifier(TestModel.INNER_LIST_QNAME)));
-
- final List<NormalizedNode<?, ?>> capturedValues = nodeCaptor.getAllValues();
- assertEquals(2, capturedValues.size());
- assertThat(capturedValues,
- hasItems(ImmutableNodes.leafNode(TestModel.ID_QNAME, 1), createInnerMapNode(1)));
-
- txCursor.close();
- tx.commit().get();
-
- verify(commitCohort, times(2)).canCommit();
- verify(commitCohort, times(2)).preCommit();
- verify(commitCohort, times(2)).commit();
- }
-
- private static MapNode createInnerMapNode(final int id) {
- final MapEntryNode listEntry = ImmutableNodes
- .mapEntryBuilder(TestModel.INNER_LIST_QNAME, TestModel.NAME_QNAME, "name-" + id)
- .withChild(ImmutableNodes.leafNode(TestModel.NAME_QNAME, "name-" + id))
- .withChild(ImmutableNodes.leafNode(TestModel.VALUE_QNAME, "value-" + id))
- .build();
-
- return ImmutableNodes.mapNodeBuilder(TestModel.INNER_LIST_QNAME).withChild(listEntry).build();
- }
-
- private static ContainerNode createCrossShardContainer() {
-
- final MapEntryNode outerListEntry1 =
- ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1)
- .withChild(createInnerMapNode(1))
- .build();
- final MapEntryNode outerListEntry2 =
- ImmutableNodes.mapEntryBuilder(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2)
- .withChild(createInnerMapNode(2))
- .build();
-
- final MapNode outerList = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME)
- .withChild(outerListEntry1)
- .withChild(outerListEntry2)
- .build();
-
- final ContainerNode testContainer = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
- .withChild(outerList)
- .build();
-
- return testContainer;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
-import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.AddressFromURIString;
-import akka.cluster.Cluster;
-import akka.testkit.javadsl.TestKit;
-import com.google.common.collect.Lists;
-import com.typesafe.config.ConfigFactory;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.opendaylight.controller.cluster.ActorSystemProvider;
-import org.opendaylight.controller.cluster.datastore.AbstractTest;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
-import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(forRemoval = true)
-public class DistributedShardedDOMDataTreeRemotingTest extends AbstractTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
-
- private static final Address MEMBER_1_ADDRESS =
- AddressFromURIString.parse("akka://cluster-test@127.0.0.1:2558");
-
- private static final DOMDataTreeIdentifier TEST_ID =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
-
- private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
-
- private ActorSystem leaderSystem;
- private ActorSystem followerSystem;
-
-
- private final Builder leaderDatastoreContextBuilder =
- DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
-
- private final DatastoreContext.Builder followerDatastoreContextBuilder =
- DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
-
- private DistributedDataStore leaderConfigDatastore;
- private DistributedDataStore leaderOperDatastore;
-
- private DistributedDataStore followerConfigDatastore;
- private DistributedDataStore followerOperDatastore;
-
-
- private IntegrationTestKit followerTestKit;
- private IntegrationTestKit leaderTestKit;
- private DistributedShardedDOMDataTree leaderShardFactory;
-
- private DistributedShardedDOMDataTree followerShardFactory;
- private ActorSystemProvider leaderSystemProvider;
- private ActorSystemProvider followerSystemProvider;
-
- @Before
- public void setUp() {
- InMemoryJournal.clear();
- InMemorySnapshotStore.clear();
-
- leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
- Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
-
- followerSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
- Cluster.get(followerSystem).join(MEMBER_1_ADDRESS);
-
- leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
- doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
-
- followerSystemProvider = Mockito.mock(ActorSystemProvider.class);
- doReturn(followerSystem).when(followerSystemProvider).getActorSystem();
-
- }
-
- @After
- public void tearDown() {
- if (leaderConfigDatastore != null) {
- leaderConfigDatastore.close();
- }
- if (leaderOperDatastore != null) {
- leaderOperDatastore.close();
- }
-
- if (followerConfigDatastore != null) {
- followerConfigDatastore.close();
- }
- if (followerOperDatastore != null) {
- followerOperDatastore.close();
- }
-
- TestKit.shutdownActorSystem(leaderSystem, true);
- TestKit.shutdownActorSystem(followerSystem, true);
-
- InMemoryJournal.clear();
- InMemorySnapshotStore.clear();
- }
-
- private void initEmptyDatastores() throws Exception {
- initEmptyDatastores(MODULE_SHARDS_CONFIG);
- }
-
- private void initEmptyDatastores(final String moduleShardsConfig) throws Exception {
- leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
-
- leaderConfigDatastore = leaderTestKit.setupDistributedDataStore(
- "config", moduleShardsConfig, true,
- SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
- leaderOperDatastore = leaderTestKit.setupDistributedDataStore(
- "operational", moduleShardsConfig, true,
- SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
-
- leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
- leaderOperDatastore,
- leaderConfigDatastore);
-
- followerTestKit = new IntegrationTestKit(followerSystem, followerDatastoreContextBuilder);
-
- followerConfigDatastore = followerTestKit.setupDistributedDataStore(
- "config", moduleShardsConfig, true, SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
- followerOperDatastore = followerTestKit.setupDistributedDataStore(
- "operational", moduleShardsConfig, true,
- SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
-
- followerShardFactory = new DistributedShardedDOMDataTree(followerSystemProvider,
- followerOperDatastore,
- followerConfigDatastore);
-
- followerTestKit.waitForMembersUp("member-1");
-
- LOG.info("Initializing leader DistributedShardedDOMDataTree");
- leaderShardFactory.init();
-
- leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty()));
-
- leaderTestKit.waitUntilLeader(leaderOperDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(YangInstanceIdentifier.empty()));
-
- LOG.info("Initializing follower DistributedShardedDOMDataTree");
- followerShardFactory.init();
- }
-
- @Test
- public void testProducerRegistrations() throws Exception {
- LOG.info("testProducerRegistrations starting");
- initEmptyDatastores();
-
- leaderTestKit.waitForMembersUp("member-2");
-
- // TODO refactor shard creation and verification to own method
- final DistributedShardRegistration shardRegistration =
- waitOnAsyncTask(leaderShardFactory.createDistributedShard(
- TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
-
- final ActorRef leaderShardManager = leaderConfigDatastore.getActorUtils().getShardManager();
-
- assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
-
- assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier())));
-
- final Set<String> peers = new HashSet<>();
- IntegrationTestKit.verifyShardState(leaderConfigDatastore,
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
- peers.addAll(onDemandShardState.getPeerAddresses().values()));
- assertEquals(peers.size(), 1);
-
- final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
- try {
- followerShardFactory.createProducer(Collections.singleton(TEST_ID));
- fail("Producer should be already registered on the other node");
- } catch (final IllegalArgumentException e) {
- assertTrue(e.getMessage().contains("is attached to producer"));
- }
-
- producer.close();
-
- final DOMDataTreeProducer followerProducer =
- followerShardFactory.createProducer(Collections.singleton(TEST_ID));
- try {
- leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
- fail("Producer should be already registered on the other node");
- } catch (final IllegalArgumentException e) {
- assertTrue(e.getMessage().contains("is attached to producer"));
- }
-
- followerProducer.close();
- // try to create a shard on an already registered prefix on follower
- try {
- waitOnAsyncTask(followerShardFactory.createDistributedShard(
- TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- fail("This prefix already should have a shard registration that was forwarded from the other node");
- } catch (final DOMDataTreeShardingConflictException e) {
- assertTrue(e.getMessage().contains("is already occupied by another shard"));
- }
-
- shardRegistration.close().toCompletableFuture().get();
-
- LOG.info("testProducerRegistrations ending");
- }
-
- @Test
- public void testWriteIntoMultipleShards() throws Exception {
- LOG.info("testWriteIntoMultipleShards starting");
- initEmptyDatastores();
-
- leaderTestKit.waitForMembersUp("member-2");
-
- LOG.debug("registering first shard");
- final DistributedShardRegistration shardRegistration =
- waitOnAsyncTask(leaderShardFactory.createDistributedShard(
- TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
-
- leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
- findLocalShard(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
-
- final Set<String> peers = new HashSet<>();
- IntegrationTestKit.verifyShardState(leaderConfigDatastore,
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
- peers.addAll(onDemandShardState.getPeerAddresses().values()));
- assertEquals(peers.size(), 1);
-
- LOG.debug("Got after waiting for nonleader");
- final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
-
- final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
- final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
- Assert.assertNotNull(cursor);
- final YangInstanceIdentifier nameId =
- YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
- cursor.write(nameId.getLastPathArgument(),
- ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
- new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build());
-
- cursor.close();
- LOG.warn("Got to pre submit");
-
- tx.commit().get();
-
- shardRegistration.close().toCompletableFuture().get();
-
- LOG.info("testWriteIntoMultipleShards ending");
- }
-
- @Test
- public void testMultipleShardRegistrations() throws Exception {
- LOG.info("testMultipleShardRegistrations starting");
- initEmptyDatastores();
-
- final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
- TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- final DistributedShardRegistration reg2 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.OUTER_CONTAINER_PATH),
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- final DistributedShardRegistration reg3 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.INNER_LIST_PATH),
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- final DistributedShardRegistration reg4 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.JUNK_PATH),
- Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
- leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
- leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
-
- // check leader has local shards
- assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
-
- assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
-
- assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
-
- assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
-
- // check follower has local shards
- assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
-
- assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH)));
-
- assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH)));
-
- assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.JUNK_PATH)));
-
- LOG.debug("Closing registrations");
-
- reg1.close().toCompletableFuture().get();
- reg2.close().toCompletableFuture().get();
- reg3.close().toCompletableFuture().get();
- reg4.close().toCompletableFuture().get();
-
- waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-
- waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
-
- waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
-
- waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
-
- LOG.debug("All leader shards gone");
-
- waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-
- waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.OUTER_CONTAINER_PATH));
-
- waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.INNER_LIST_PATH));
-
- waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.JUNK_PATH));
-
- LOG.debug("All follower shards gone");
- LOG.info("testMultipleShardRegistrations ending");
- }
-
- @Test
- public void testMultipleRegistrationsAtOnePrefix() throws Exception {
- LOG.info("testMultipleRegistrationsAtOnePrefix starting");
- initEmptyDatastores();
-
- for (int i = 0; i < 5; i++) {
- LOG.info("Round {}", i);
- final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
- TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME, AbstractTest.MEMBER_2_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- leaderTestKit.waitUntilLeader(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-
- assertNotNull(findLocalShard(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
-
- assertNotNull(findLocalShard(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
-
-
- final Set<String> peers = new HashSet<>();
- IntegrationTestKit.verifyShardState(leaderConfigDatastore,
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()), onDemandShardState ->
- peers.addAll(onDemandShardState.getPeerAddresses().values()));
- assertEquals(peers.size(), 1);
-
- waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- waitUntilShardIsDown(leaderConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-
- waitUntilShardIsDown(followerConfigDatastore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- }
-
- LOG.info("testMultipleRegistrationsAtOnePrefix ending");
- }
-
- @Test
- public void testInitialBootstrappingWithNoModuleShards() throws Exception {
- LOG.info("testInitialBootstrappingWithNoModuleShards starting");
- initEmptyDatastores("module-shards-default-member-1.conf");
-
- // We just verify the DistributedShardedDOMDataTree initialized without error.
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.cluster.sharding;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyCollection;
-import static org.mockito.ArgumentMatchers.anyMap;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.findLocalShard;
-import static org.opendaylight.controller.cluster.datastore.IntegrationTestKit.waitUntilShardIsDown;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.AddressFromURIString;
-import akka.actor.Props;
-import akka.cluster.Cluster;
-import akka.testkit.javadsl.TestKit;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.typesafe.config.ConfigFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.TimeUnit;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-import org.opendaylight.controller.cluster.ActorSystemProvider;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
-import org.opendaylight.controller.cluster.datastore.AbstractTest;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
-import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
-import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
-import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
-import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
-import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
-import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableLeafNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableMapNodeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(forRemoval = true)
-public class DistributedShardedDOMDataTreeTest extends AbstractTest {
-
- private static final Logger LOG = LoggerFactory.getLogger(DistributedShardedDOMDataTreeRemotingTest.class);
-
- private static final Address MEMBER_1_ADDRESS =
- AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
-
- private static final DOMDataTreeIdentifier TEST_ID =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, TestModel.TEST_PATH);
-
- private static final DOMDataTreeIdentifier INNER_LIST_ID =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
- YangInstanceIdentifier.create(getOuterListIdFor(0).getPathArguments())
- .node(TestModel.INNER_LIST_QNAME));
- private static final Set<MemberName> SINGLE_MEMBER = Collections.singleton(AbstractTest.MEMBER_NAME);
-
- private static final String MODULE_SHARDS_CONFIG = "module-shards-default-member-1.conf";
-
- private ActorSystem leaderSystem;
-
- private final Builder leaderDatastoreContextBuilder =
- DatastoreContext.newBuilder()
- .shardHeartbeatIntervalInMillis(100)
- .shardElectionTimeoutFactor(2)
- .logicalStoreType(LogicalDatastoreType.CONFIGURATION);
-
- private DistributedDataStore leaderDistributedDataStore;
- private DistributedDataStore operDistributedDatastore;
- private IntegrationTestKit leaderTestKit;
-
- private DistributedShardedDOMDataTree leaderShardFactory;
-
- @Captor
- private ArgumentCaptor<Collection<DataTreeCandidate>> captorForChanges;
- @Captor
- private ArgumentCaptor<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> captorForSubtrees;
-
- private ActorSystemProvider leaderSystemProvider;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- InMemoryJournal.clear();
- InMemorySnapshotStore.clear();
-
- leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
- Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
-
- leaderSystemProvider = Mockito.mock(ActorSystemProvider.class);
- doReturn(leaderSystem).when(leaderSystemProvider).getActorSystem();
- }
-
- @After
- public void tearDown() {
- if (leaderDistributedDataStore != null) {
- leaderDistributedDataStore.close();
- }
-
- if (operDistributedDatastore != null) {
- operDistributedDatastore.close();
- }
-
- TestKit.shutdownActorSystem(leaderSystem);
-
- InMemoryJournal.clear();
- InMemorySnapshotStore.clear();
- }
-
- private void initEmptyDatastores() throws Exception {
- leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
-
- leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(
- "config", MODULE_SHARDS_CONFIG, "empty-modules.conf", true,
- SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
-
- operDistributedDatastore = leaderTestKit.setupDistributedDataStore(
- "operational", MODULE_SHARDS_CONFIG, "empty-modules.conf",true,
- SchemaContextHelper.distributedShardedDOMDataTreeSchemaContext());
-
- leaderShardFactory = new DistributedShardedDOMDataTree(leaderSystemProvider,
- operDistributedDatastore,
- leaderDistributedDataStore);
-
- leaderShardFactory.init();
- }
-
-
- @Test
- public void testWritesIntoDefaultShard() throws Exception {
- initEmptyDatastores();
-
- final DOMDataTreeIdentifier configRoot =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty());
-
- final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot));
-
- final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
- final DOMDataTreeWriteCursor cursor =
- tx.createCursor(new DOMDataTreeIdentifier(
- LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()));
- Assert.assertNotNull(cursor);
-
- final ContainerNode test =
- ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)).build();
-
- cursor.write(test.getIdentifier(), test);
- cursor.close();
-
- tx.commit().get();
- }
-
- @Test
- public void testSingleNodeWritesAndRead() throws Exception {
- initEmptyDatastores();
-
- final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
- leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(),
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
-
- final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(TEST_ID));
-
- final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(true);
- final DOMDataTreeWriteCursor cursor = tx.createCursor(TEST_ID);
- Assert.assertNotNull(cursor);
- final YangInstanceIdentifier nameId =
- YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.NAME_QNAME).build();
- final LeafNode<String> valueToCheck = ImmutableLeafNodeBuilder.<String>create().withNodeIdentifier(
- new NodeIdentifier(TestModel.NAME_QNAME)).withValue("Test Value").build();
- LOG.debug("Writing data {} at {}, cursor {}", nameId.getLastPathArgument(), valueToCheck, cursor);
- cursor.write(nameId.getLastPathArgument(),
- valueToCheck);
-
- cursor.close();
- LOG.debug("Got to pre submit");
-
- tx.commit().get();
-
- final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class);
- doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap());
-
- leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID),
- true, Collections.emptyList());
-
- verify(mockedDataTreeListener, timeout(1000).times(1)).onDataTreeChanged(captorForChanges.capture(),
- captorForSubtrees.capture());
- final List<Collection<DataTreeCandidate>> capturedValue = captorForChanges.getAllValues();
-
- final Optional<NormalizedNode<?, ?>> dataAfter =
- capturedValue.get(0).iterator().next().getRootNode().getDataAfter();
-
- final NormalizedNode<?,?> expected = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME)).withChild(valueToCheck).build();
- assertEquals(expected, dataAfter.get());
-
- verifyNoMoreInteractions(mockedDataTreeListener);
-
- final String shardName = ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier());
- LOG.debug("Creating distributed datastore client for shard {}", shardName);
-
- final ActorUtils actorUtils = leaderDistributedDataStore.getActorUtils();
- final Props distributedDataStoreClientProps =
- SimpleDataStoreClientActor.props(actorUtils.getCurrentMemberName(), "Shard-" + shardName, actorUtils,
- shardName);
-
- final ActorRef clientActor = leaderSystem.actorOf(distributedDataStoreClientProps);
- final DataStoreClient distributedDataStoreClient = SimpleDataStoreClientActor
- .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
-
- final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
- final ClientTransaction tx2 = localHistory.createTransaction();
- final FluentFuture<Optional<NormalizedNode<?, ?>>> read = tx2.read(YangInstanceIdentifier.empty());
-
- final Optional<NormalizedNode<?, ?>> optional = read.get();
- tx2.abort();
- localHistory.close();
-
- shardRegistration.close().toCompletableFuture().get();
-
- }
-
- @Test
- public void testMultipleWritesIntoSingleMapEntry() throws Exception {
- initEmptyDatastores();
-
- final DistributedShardRegistration shardRegistration = waitOnAsyncTask(
- leaderShardFactory.createDistributedShard(TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(),
- ClusterUtils.getCleanShardName(TEST_ID.getRootIdentifier()));
-
- LOG.warn("Got after waiting for nonleader");
- final ActorRef leaderShardManager = leaderDistributedDataStore.getActorUtils().getShardManager();
-
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-
- final YangInstanceIdentifier oid1 = getOuterListIdFor(0);
- final DOMDataTreeIdentifier outerListPath = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1);
-
- final DistributedShardRegistration outerListShardReg = waitOnAsyncTask(
- leaderShardFactory.createDistributedShard(outerListPath, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(),
- ClusterUtils.getCleanShardName(outerListPath.getRootIdentifier()));
-
- final DOMDataTreeProducer shardProducer = leaderShardFactory.createProducer(
- Collections.singletonList(outerListPath));
-
- final DOMDataTreeCursorAwareTransaction tx = shardProducer.createTransaction(false);
- final DOMDataTreeWriteCursor cursor =
- tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, oid1));
- assertNotNull(cursor);
-
- MapNode innerList = ImmutableMapNodeBuilder
- .create()
- .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME))
- .build();
-
- cursor.write(new NodeIdentifier(TestModel.INNER_LIST_QNAME), innerList);
- cursor.close();
- tx.commit().get();
-
- final ArrayList<ListenableFuture<?>> futures = new ArrayList<>();
- for (int i = 0; i < 1000; i++) {
- final Collection<MapEntryNode> innerListMapEntries = createInnerListMapEntries(1000, "run-" + i);
- for (final MapEntryNode innerListMapEntry : innerListMapEntries) {
- final DOMDataTreeCursorAwareTransaction tx1 = shardProducer.createTransaction(false);
- final DOMDataTreeWriteCursor cursor1 = tx1.createCursor(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
- oid1.node(new NodeIdentifier(TestModel.INNER_LIST_QNAME))));
- cursor1.write(innerListMapEntry.getIdentifier(), innerListMapEntry);
- cursor1.close();
- futures.add(tx1.commit());
- }
- }
-
- futures.get(futures.size() - 1).get();
-
- final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class);
- doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap());
-
- leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(INNER_LIST_ID),
- true, Collections.emptyList());
-
- verify(mockedDataTreeListener, timeout(1000).times(1)).onDataTreeChanged(captorForChanges.capture(),
- captorForSubtrees.capture());
- verifyNoMoreInteractions(mockedDataTreeListener);
- final List<Collection<DataTreeCandidate>> capturedValue = captorForChanges.getAllValues();
-
- final NormalizedNode<?,?> expected =
- ImmutableMapNodeBuilder
- .create()
- .withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME))
- // only the values from the last run should be present
- .withValue(createInnerListMapEntries(1000, "run-999"))
- .build();
-
- assertEquals("List values dont match the expected values from the last run",
- expected, capturedValue.get(0).iterator().next().getRootNode().getDataAfter().get());
-
- }
-
- // top level shard at TEST element, with subshards on each outer-list map entry
- @Test
- @Ignore
- public void testMultipleShardLevels() throws Exception {
- initEmptyDatastores();
-
- final DistributedShardRegistration testShardReg = waitOnAsyncTask(
- leaderShardFactory.createDistributedShard(TEST_ID, SINGLE_MEMBER),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- final ArrayList<DistributedShardRegistration> registrations = new ArrayList<>();
- final int listSize = 5;
- for (int i = 0; i < listSize; i++) {
- final YangInstanceIdentifier entryYID = getOuterListIdFor(i);
- final CompletionStage<DistributedShardRegistration> future = leaderShardFactory.createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, entryYID), SINGLE_MEMBER);
-
- registrations.add(waitOnAsyncTask(future, DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION));
- }
-
- final DOMDataTreeIdentifier rootId =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty());
- final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singletonList(
- rootId));
-
- DOMDataTreeCursorAwareTransaction transaction = producer.createTransaction(false);
-
- DOMDataTreeWriteCursor cursor = transaction.createCursor(rootId);
- assertNotNull(cursor);
-
- final MapNode outerList =
- ImmutableMapNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(TestModel.OUTER_LIST_QNAME)).build();
-
- final ContainerNode testNode =
- ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
- .withChild(outerList)
- .build();
-
- cursor.write(testNode.getIdentifier(), testNode);
-
- cursor.close();
- transaction.commit().get();
-
- final DOMDataTreeListener mockedDataTreeListener = mock(DOMDataTreeListener.class);
- doNothing().when(mockedDataTreeListener).onDataTreeChanged(anyCollection(), anyMap());
-
- final MapNode wholeList = ImmutableMapNodeBuilder.create(outerList)
- .withValue(createOuterEntries(listSize, "testing-values")).build();
-
- transaction = producer.createTransaction(false);
- cursor = transaction.createCursor(TEST_ID);
- assertNotNull(cursor);
-
- cursor.write(wholeList.getIdentifier(), wholeList);
- cursor.close();
-
- transaction.commit().get();
-
- leaderShardFactory.registerListener(mockedDataTreeListener, Collections.singletonList(TEST_ID),
- true, Collections.emptyList());
-
- verify(mockedDataTreeListener, timeout(35000).atLeast(2)).onDataTreeChanged(captorForChanges.capture(),
- captorForSubtrees.capture());
- verifyNoMoreInteractions(mockedDataTreeListener);
- final List<Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>>> allSubtrees = captorForSubtrees.getAllValues();
-
- final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> lastSubtree = allSubtrees.get(allSubtrees.size() - 1);
-
- final NormalizedNode<?, ?> actual = lastSubtree.get(TEST_ID);
- assertNotNull(actual);
-
- final NormalizedNode<?, ?> expected =
- ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
- .withChild(ImmutableMapNodeBuilder.create(outerList)
- .withValue(createOuterEntries(listSize, "testing-values")).build())
- .build();
-
-
- for (final DistributedShardRegistration registration : registrations) {
- waitOnAsyncTask(registration.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- }
-
- waitOnAsyncTask(testShardReg.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testMultipleRegistrationsAtOnePrefix() throws Exception {
- initEmptyDatastores();
-
- for (int i = 0; i < 10; i++) {
- LOG.debug("Round {}", i);
- final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
- TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
-
- waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- waitUntilShardIsDown(leaderDistributedDataStore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
- }
- }
-
- @Test
- public void testCDSDataTreeProducer() throws Exception {
- initEmptyDatastores();
-
- final DistributedShardRegistration reg1 = waitOnAsyncTask(leaderShardFactory.createDistributedShard(
- TEST_ID, Lists.newArrayList(AbstractTest.MEMBER_NAME)),
- DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
-
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH));
-
- assertNotNull(findLocalShard(leaderDistributedDataStore.getActorUtils(),
- ClusterUtils.getCleanShardName(TestModel.TEST_PATH)));
-
-
- final DOMDataTreeIdentifier configRoot =
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty());
- final DOMDataTreeProducer producer = leaderShardFactory.createProducer(Collections.singleton(configRoot));
-
- assertTrue(producer instanceof CDSDataTreeProducer);
-
- final CDSDataTreeProducer cdsProducer = (CDSDataTreeProducer) producer;
- CDSShardAccess shardAccess = cdsProducer.getShardAccess(TEST_ID);
- assertEquals(shardAccess.getShardIdentifier(), TEST_ID);
-
- shardAccess = cdsProducer.getShardAccess(INNER_LIST_ID);
- assertEquals(TEST_ID, shardAccess.getShardIdentifier());
-
- shardAccess = cdsProducer.getShardAccess(configRoot);
- assertEquals(configRoot, shardAccess.getShardIdentifier());
-
- waitOnAsyncTask(reg1.close(), DistributedShardedDOMDataTree.SHARD_FUTURE_TIMEOUT_DURATION);
- }
-
- private static Collection<MapEntryNode> createOuterEntries(final int amount, final String valuePrefix) {
- final Collection<MapEntryNode> ret = new ArrayList<>();
- for (int i = 0; i < amount; i++) {
- ret.add(ImmutableNodes.mapEntryBuilder()
- .withNodeIdentifier(NodeIdentifierWithPredicates.of(TestModel.OUTER_LIST_QNAME,
- QName.create(TestModel.OUTER_LIST_QNAME, "id"), i))
- .withChild(ImmutableNodes
- .leafNode(QName.create(TestModel.OUTER_LIST_QNAME, "id"), i))
- .withChild(createWholeInnerList(amount, "outer id: " + i + " " + valuePrefix))
- .build());
- }
-
- return ret;
- }
-
- private static MapNode createWholeInnerList(final int amount, final String valuePrefix) {
- return ImmutableMapNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(TestModel.INNER_LIST_QNAME))
- .withValue(createInnerListMapEntries(amount, valuePrefix)).build();
- }
-
- private static Collection<MapEntryNode> createInnerListMapEntries(final int amount, final String valuePrefix) {
- final Collection<MapEntryNode> ret = new ArrayList<>();
- for (int i = 0; i < amount; i++) {
- ret.add(ImmutableNodes.mapEntryBuilder()
- .withNodeIdentifier(NodeIdentifierWithPredicates.of(TestModel.INNER_LIST_QNAME,
- QName.create(TestModel.INNER_LIST_QNAME, "name"), Integer.toString(i)))
- .withChild(ImmutableNodes
- .leafNode(QName.create(TestModel.INNER_LIST_QNAME, "value"), valuePrefix + "-" + i))
- .build());
- }
-
- return ret;
- }
-
- private static YangInstanceIdentifier getOuterListIdFor(final int id) {
- return TestModel.OUTER_LIST_PATH.node(NodeIdentifierWithPredicates.of(
- TestModel.OUTER_LIST_QNAME, QName.create(TestModel.OUTER_LIST_QNAME, "id"), id));
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.sharding;
-
-import static akka.actor.ActorRef.noSender;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-
-import akka.actor.ActorRef;
-import akka.actor.Props;
-import akka.testkit.javadsl.TestKit;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocation;
-import org.opendaylight.controller.cluster.dom.api.LeaderLocationListener;
-import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
-import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
-
-@Deprecated(forRemoval = true)
-public class RoleChangeListenerActorTest extends AbstractActorTest {
-
- @Test
- public void testRegisterRoleChangeListenerOnStart() {
- final TestKit testKit = new TestKit(getSystem());
- final LeaderLocationListener listener = mock(LeaderLocationListener.class);
- final Props props = RoleChangeListenerActor.props(testKit.getRef(), listener);
-
- getSystem().actorOf(props, "testRegisterRoleChangeListenerOnStart");
- testKit.expectMsgClass(RegisterRoleChangeListener.class);
- }
-
- @Test
- public void testOnDataTreeChanged() {
- final LeaderLocationListener listener = mock(LeaderLocationListener.class);
- doNothing().when(listener).onLeaderLocationChanged(any());
- final Props props = RoleChangeListenerActor.props(getSystem().deadLetters(), listener);
-
- final ActorRef subject = getSystem().actorOf(props, "testDataTreeChangedChanged");
-
- subject.tell(new LeaderStateChanged("member-1", null, (short) 0), noSender());
- verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.UNKNOWN));
-
- subject.tell(new LeaderStateChanged("member-1", "member-1", (short) 0), noSender());
- verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.LOCAL));
-
- subject.tell(new LeaderStateChanged("member-1", "member-2", (short) 0), noSender());
- verify(listener, timeout(5000)).onLeaderLocationChanged(eq(LeaderLocation.REMOTE));
- }
-}
\ No newline at end of file
*/
package org.opendaylight.controller.clustering.it.provider;
-import static akka.actor.ActorRef.noSender;
-
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
import akka.dispatch.OnComplete;
import akka.pattern.Patterns;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.opendaylight.controller.cluster.ActorSystemProvider;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.databroker.actors.dds.SimpleDataStoreClientActor;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.datastore.utils.ClusterUtils;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
-import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
import org.opendaylight.controller.clustering.it.provider.impl.FlappingSingletonService;
import org.opendaylight.controller.clustering.it.provider.impl.GetConstantService;
-import org.opendaylight.controller.clustering.it.provider.impl.IdIntsDOMDataTreeLIstener;
import org.opendaylight.controller.clustering.it.provider.impl.IdIntsListener;
-import org.opendaylight.controller.clustering.it.provider.impl.PrefixLeaderHandler;
-import org.opendaylight.controller.clustering.it.provider.impl.PrefixShardHandler;
-import org.opendaylight.controller.clustering.it.provider.impl.ProduceTransactionsHandler;
import org.opendaylight.controller.clustering.it.provider.impl.PublishNotificationsTask;
import org.opendaylight.controller.clustering.it.provider.impl.RoutedGetConstantService;
import org.opendaylight.controller.clustering.it.provider.impl.SingletonGetConstantService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeService;
import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeLoopException;
import org.opendaylight.mdsal.dom.api.DOMDataTreeReadTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
import org.opendaylight.mdsal.dom.api.DOMRpcImplementationRegistration;
import org.opendaylight.mdsal.dom.api.DOMRpcProviderService;
import org.opendaylight.mdsal.dom.api.DOMSchemaService;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.StartPublishNotificationsOutputBuilder;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDdtlOutputBuilder;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.SubscribeDtclOutputBuilder;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnregisterSingletonConstantOutputBuilder;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDdtlOutputBuilder;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclInput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutput;
import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.UnsubscribeDtclOutputBuilder;
private final RpcProviderService rpcRegistry;
private final ObjectRegistration<OdlMdsalLowlevelControlService> registration;
- private final DistributedShardFactory distributedShardFactory;
private final DistributedDataStoreInterface configDataStore;
- private final DOMDataTreeService domDataTreeService;
private final BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
private final DOMDataBroker domDataBroker;
private final NotificationPublishService notificationPublishService;
private final DOMSchemaService schemaService;
private final ClusterSingletonServiceProvider singletonService;
private final DOMRpcProviderService domRpcService;
- private final PrefixLeaderHandler prefixLeaderHandler;
- private final PrefixShardHandler prefixShardHandler;
private final DOMDataTreeChangeService domDataTreeChangeService;
private final ActorSystem actorSystem;
private ListenerRegistration<DOMDataTreeChangeListener> dtclReg;
private IdIntsListener idIntsListener;
private final Map<String, PublishNotificationsTask> publishNotificationsTasks = new HashMap<>();
- private ListenerRegistration<IdIntsDOMDataTreeLIstener> ddtlReg;
- private IdIntsDOMDataTreeLIstener idIntsDdtl;
-
-
public MdsalLowLevelTestProvider(final RpcProviderService rpcRegistry,
final DOMRpcProviderService domRpcService,
final NotificationPublishService notificationPublishService,
final NotificationService notificationService,
final DOMDataBroker domDataBroker,
- final DOMDataTreeService domDataTreeService,
- final DistributedShardFactory distributedShardFactory,
final DistributedDataStoreInterface configDataStore,
final ActorSystemProvider actorSystemProvider) {
this.rpcRegistry = rpcRegistry;
this.notificationPublishService = notificationPublishService;
this.notificationService = notificationService;
this.domDataBroker = domDataBroker;
- this.domDataTreeService = domDataTreeService;
- this.distributedShardFactory = distributedShardFactory;
this.configDataStore = configDataStore;
this.actorSystem = actorSystemProvider.getActorSystem();
- this.prefixLeaderHandler = new PrefixLeaderHandler(domDataTreeService, bindingNormalizedNodeSerializer);
domDataTreeChangeService = domDataBroker.getExtensions().getInstance(DOMDataTreeChangeService.class);
registration = rpcRegistry.registerRpcImplementation(OdlMdsalLowlevelControlService.class, this);
-
- prefixShardHandler = new PrefixShardHandler(distributedShardFactory, domDataTreeService,
- bindingNormalizedNodeSerializer);
}
@Override
@Override
public ListenableFuture<RpcResult<RemovePrefixShardOutput>> removePrefixShard(final RemovePrefixShardInput input) {
- LOG.info("In removePrefixShard - input: {}", input);
-
- return prefixShardHandler.onRemovePrefixShard(input);
+ throw new UnsupportedOperationException();
}
@Override
public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> becomePrefixLeader(
final BecomePrefixLeaderInput input) {
- LOG.info("n becomePrefixLeader - input: {}", input);
-
- return prefixLeaderHandler.makeLeaderLocal(input);
+ throw new UnsupportedOperationException();
}
@Override
@Override
public ListenableFuture<RpcResult<AddShardReplicaOutput>> addShardReplica(final AddShardReplicaInput input) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
public ListenableFuture<RpcResult<SubscribeDdtlOutput>> subscribeDdtl(final SubscribeDdtlInput input) {
- LOG.info("In subscribeDdtl");
-
- if (ddtlReg != null) {
- return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(ErrorType.RPC,
- "data-exists", "There is already a listener registered for id-ints").buildFuture();
- }
-
- idIntsDdtl = new IdIntsDOMDataTreeLIstener();
-
- try {
- ddtlReg = domDataTreeService.registerListener(idIntsDdtl,
- Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION,
- ProduceTransactionsHandler.ID_INT_YID)),
- true, Collections.emptyList());
- } catch (DOMDataTreeLoopException e) {
- LOG.error("Failed to register DOMDataTreeListener", e);
- return RpcResultBuilder.<SubscribeDdtlOutput>failed().withError(
- ErrorType.APPLICATION, "Failed to register DOMDataTreeListener", e).buildFuture();
- }
-
- return RpcResultBuilder.success(new SubscribeDdtlOutputBuilder().build()).buildFuture();
+ throw new UnsupportedOperationException();
}
@Override
@Override
public ListenableFuture<RpcResult<CreatePrefixShardOutput>> createPrefixShard(final CreatePrefixShardInput input) {
- LOG.info("In createPrefixShard - input: {}", input);
-
- return prefixShardHandler.onCreatePrefixShard(input);
+ throw new UnsupportedOperationException();
}
@Override
@Override
public ListenableFuture<RpcResult<ProduceTransactionsOutput>> produceTransactions(
final ProduceTransactionsInput input) {
- LOG.info("In produceTransactions - input: {}", input);
- return ProduceTransactionsHandler.start(domDataTreeService, input);
+ throw new UnsupportedOperationException();
}
@Override
@Override
public ListenableFuture<RpcResult<UnregisterDefaultConstantOutput>> unregisterDefaultConstant(
final UnregisterDefaultConstantInput input) {
- return null;
+ throw new UnsupportedOperationException();
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public ListenableFuture<RpcResult<UnsubscribeDdtlOutput>> unsubscribeDdtl(final UnsubscribeDdtlInput input) {
- LOG.info("In unsubscribeDdtl");
-
- if (idIntsDdtl == null || ddtlReg == null) {
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(
- ErrorType.RPC, "data-missing", "No prior listener was registered").buildFuture();
- }
-
- long timeout = 120L;
- try {
- idIntsDdtl.tryFinishProcessing().get(timeout, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.error("Unable to finish notification processing", e);
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
- "Unable to finish notification processing in " + timeout + " seconds", e).buildFuture();
- }
-
- ddtlReg.close();
- ddtlReg = null;
-
- if (!idIntsDdtl.hasTriggered()) {
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
- "No notification received.", "id-ints listener has not received any notifications").buildFuture();
- }
-
- final String shardName = ClusterUtils.getCleanShardName(ProduceTransactionsHandler.ID_INTS_YID);
- LOG.debug("Creating distributed datastore client for shard {}", shardName);
-
- final ActorUtils actorUtils = configDataStore.getActorUtils();
- final Props distributedDataStoreClientProps =
- SimpleDataStoreClientActor.props(actorUtils.getCurrentMemberName(),
- "Shard-" + shardName, actorUtils, shardName);
-
- final ActorRef clientActor = actorSystem.actorOf(distributedDataStoreClientProps);
- final DataStoreClient distributedDataStoreClient;
- try {
- distributedDataStoreClient = SimpleDataStoreClientActor
- .getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
- } catch (RuntimeException e) {
- LOG.error("Failed to get actor for {}", distributedDataStoreClientProps, e);
- clientActor.tell(PoisonPill.getInstance(), noSender());
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withError(ErrorType.APPLICATION, "Unable to create DataStoreClient for read", e).buildFuture();
- }
-
- final ClientLocalHistory localHistory = distributedDataStoreClient.createLocalHistory();
- final ClientTransaction tx = localHistory.createTransaction();
- final ListenableFuture<java.util.Optional<NormalizedNode<?, ?>>> read =
- tx.read(YangInstanceIdentifier.of(ProduceTransactionsHandler.ID_INT));
-
- tx.abort();
- localHistory.close();
- try {
- final java.util.Optional<NormalizedNode<?, ?>> optional = read.get();
- if (!optional.isPresent()) {
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed().withError(ErrorType.APPLICATION,
- "data-missing", "Final read from id-ints is empty").buildFuture();
- }
-
- return RpcResultBuilder.success(new UnsubscribeDdtlOutputBuilder().setCopyMatches(
- idIntsDdtl.checkEqual(optional.get()))).buildFuture();
-
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("Unable to read data to verify ddtl data", e);
- return RpcResultBuilder.<UnsubscribeDdtlOutput>failed()
- .withError(ErrorType.APPLICATION, "Final read from id-ints failed", e).buildFuture();
- } finally {
- distributedDataStoreClient.close();
- clientActor.tell(PoisonPill.getInstance(), noSender());
- }
+ throw new UnsupportedOperationException();
}
}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.clustering.it.provider.impl;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListener;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeListeningException;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(forRemoval = true)
-public class IdIntsDOMDataTreeLIstener implements DOMDataTreeListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(IdIntsDOMDataTreeLIstener.class);
- private static final long SECOND_AS_NANO = 1000000000;
-
- private NormalizedNode<?, ?> localCopy = null;
- private final AtomicLong lastNotifTimestamp = new AtomicLong(0);
- private ScheduledFuture<?> scheduledFuture;
- private ScheduledExecutorService executorService;
-
- @Override
- public void onDataTreeChanged(final Collection<DataTreeCandidate> changes,
- final Map<DOMDataTreeIdentifier, NormalizedNode<?, ?>> subtrees) {
-
- // There should only be one candidate reported
- Preconditions.checkState(changes.size() == 1);
-
- lastNotifTimestamp.set(System.nanoTime());
-
- // do not log the change into debug, only use trace since it will lead to OOM on default heap settings
- LOG.debug("Received data tree changed");
-
- changes.forEach(change -> {
- if (change.getRootNode().getDataAfter().isPresent()) {
- LOG.trace("Received change, data before: {}, data after: {}",
- change.getRootNode().getDataBefore().isPresent()
- ? change.getRootNode().getDataBefore().get() : "",
- change.getRootNode().getDataAfter().get());
-
- if (localCopy == null || checkEqual(change.getRootNode().getDataBefore().get())) {
- localCopy = change.getRootNode().getDataAfter().get();
- } else {
- LOG.warn("Ignoring notification.");
- LOG.trace("Ignored notification content: {}", change);
- }
- } else {
- LOG.warn("getDataAfter() is missing from notification. change: {}", change);
- }
- });
- }
-
- @Override
- public void onDataTreeFailed(final Collection<DOMDataTreeListeningException> causes) {
-
- }
-
- public boolean hasTriggered() {
- return localCopy != null;
- }
-
- public Future<Void> tryFinishProcessing() {
- executorService = Executors.newSingleThreadScheduledExecutor();
- final SettableFuture<Void> settableFuture = SettableFuture.create();
-
- scheduledFuture = executorService.scheduleAtFixedRate(new CheckFinishedTask(settableFuture),
- 0, 1, TimeUnit.SECONDS);
- return settableFuture;
- }
-
- public boolean checkEqual(final NormalizedNode<?, ?> expected) {
- return localCopy.equals(expected);
- }
-
- private class CheckFinishedTask implements Runnable {
-
- private final SettableFuture<Void> future;
-
- CheckFinishedTask(final SettableFuture<Void> future) {
- this.future = future;
- }
-
- @Override
- public void run() {
- if (System.nanoTime() - lastNotifTimestamp.get() > SECOND_AS_NANO * 4) {
- scheduledFuture.cancel(false);
- future.set(null);
-
- executorService.shutdown();
- }
- }
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.clustering.it.provider.impl;
-
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.Collections;
-import java.util.concurrent.CompletionStage;
-import org.opendaylight.controller.cluster.datastore.exceptions.TimeoutException;
-import org.opendaylight.controller.cluster.dom.api.CDSDataTreeProducer;
-import org.opendaylight.controller.cluster.dom.api.CDSShardAccess;
-import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.BecomePrefixLeaderOutputBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(forRemoval = true)
-public class PrefixLeaderHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(PrefixLeaderHandler.class);
-
- private final DOMDataTreeService domDataTreeService;
- private final BindingNormalizedNodeSerializer serializer;
-
- public PrefixLeaderHandler(final DOMDataTreeService domDataTreeService,
- final BindingNormalizedNodeSerializer serializer) {
- this.domDataTreeService = domDataTreeService;
- this.serializer = serializer;
- }
-
- public ListenableFuture<RpcResult<BecomePrefixLeaderOutput>> makeLeaderLocal(final BecomePrefixLeaderInput input) {
-
- final YangInstanceIdentifier yid = serializer.toYangInstanceIdentifier(input.getPrefix());
- final DOMDataTreeIdentifier prefix = new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, yid);
-
- try (CDSDataTreeProducer producer =
- (CDSDataTreeProducer) domDataTreeService.createProducer(Collections.singleton(prefix))) {
-
- final CDSShardAccess shardAccess = producer.getShardAccess(prefix);
-
- final CompletionStage<Void> completionStage = shardAccess.makeLeaderLocal();
-
- completionStage.exceptionally(throwable -> {
- LOG.error("Leader movement failed.", throwable);
- return null;
- });
- } catch (final DOMDataTreeProducerException e) {
- LOG.warn("Error while closing producer", e);
- } catch (final TimeoutException e) {
- LOG.warn("Timeout while on producer operation", e);
- Futures.immediateFuture(RpcResultBuilder.failed().withError(RpcError.ErrorType.RPC,
- "resource-denied-transport", "Timeout while opening producer please retry.", "clustering-it",
- "clustering-it", e));
- }
-
- return Futures.immediateFuture(RpcResultBuilder.success(new BecomePrefixLeaderOutputBuilder().build()).build());
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.clustering.it.provider.impl;
-
-import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID;
-import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INT;
-import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ID_INTS;
-import static org.opendaylight.controller.clustering.it.provider.impl.AbstractTransactionHandler.ITEM;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletionStage;
-import java.util.stream.Collectors;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.sharding.DistributedShardFactory;
-import org.opendaylight.controller.cluster.sharding.DistributedShardRegistration;
-import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeShardingConflictException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.CreatePrefixShardOutputBuilder;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.RemovePrefixShardOutputBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.api.CollectionNodeBuilder;
-import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(forRemoval = true)
-public class PrefixShardHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(PrefixShardHandler.class);
- private static final int MAX_PREFIX = 4;
- private static final String PREFIX_TEMPLATE = "prefix-";
-
- private final DistributedShardFactory shardFactory;
- private final DOMDataTreeService domDataTreeService;
- private final BindingNormalizedNodeSerializer serializer;
-
- private final Map<YangInstanceIdentifier, DistributedShardRegistration> registrations =
- Collections.synchronizedMap(new HashMap<>());
-
- public PrefixShardHandler(final DistributedShardFactory shardFactory,
- final DOMDataTreeService domDataTreeService,
- final BindingNormalizedNodeSerializer serializer) {
-
- this.shardFactory = shardFactory;
- this.domDataTreeService = domDataTreeService;
- this.serializer = serializer;
- }
-
- public ListenableFuture<RpcResult<CreatePrefixShardOutput>> onCreatePrefixShard(
- final CreatePrefixShardInput input) {
-
- final SettableFuture<RpcResult<CreatePrefixShardOutput>> future = SettableFuture.create();
-
- final CompletionStage<DistributedShardRegistration> completionStage;
- final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix());
-
- try {
- completionStage = shardFactory.createDistributedShard(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, identifier),
- input.getReplicas().stream().map(MemberName::forName).collect(Collectors.toList()));
-
- completionStage.thenAccept(registration -> {
- LOG.debug("Shard[{}] created successfully.", identifier);
- registrations.put(identifier, registration);
-
- final ListenableFuture<?> ensureFuture = ensureListExists();
- Futures.addCallback(ensureFuture, new FutureCallback<Object>() {
- @Override
- public void onSuccess(final Object result) {
- LOG.debug("Initial list write successful.");
- future.set(RpcResultBuilder.success(new CreatePrefixShardOutputBuilder().build()).build());
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- LOG.warn("Shard[{}] creation failed:", identifier, throwable);
-
- final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION,
- "create-shard-failed", "Shard creation failed", "cluster-test-app", "", throwable);
- future.set(RpcResultBuilder.<CreatePrefixShardOutput>failed().withRpcError(error).build());
- }
- }, MoreExecutors.directExecutor());
- });
- completionStage.exceptionally(throwable -> {
- LOG.warn("Shard[{}] creation failed:", identifier, throwable);
-
- final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed",
- "Shard creation failed", "cluster-test-app", "", throwable);
- future.set(RpcResultBuilder.<CreatePrefixShardOutput>failed().withRpcError(error).build());
- return null;
- });
- } catch (final DOMDataTreeShardingConflictException e) {
- LOG.warn("Unable to register shard for: {}.", identifier);
-
- final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "create-shard-failed",
- "Sharding conflict", "cluster-test-app", "", e);
- future.set(RpcResultBuilder.<CreatePrefixShardOutput>failed().withRpcError(error).build());
- }
-
- return future;
- }
-
- public ListenableFuture<RpcResult<RemovePrefixShardOutput>> onRemovePrefixShard(
- final RemovePrefixShardInput input) {
-
- final YangInstanceIdentifier identifier = serializer.toYangInstanceIdentifier(input.getPrefix());
- final DistributedShardRegistration registration = registrations.get(identifier);
-
- if (registration == null) {
- final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "registration-missing",
- "No shard registered at this prefix.");
- return Futures.immediateFuture(RpcResultBuilder.<RemovePrefixShardOutput>failed().withRpcError(error)
- .build());
- }
-
- final SettableFuture<RpcResult<RemovePrefixShardOutput>> future = SettableFuture.create();
-
- final CompletionStage<Void> close = registration.close();
- close.thenRun(() -> future.set(RpcResultBuilder.success(new RemovePrefixShardOutputBuilder().build()).build()));
- close.exceptionally(throwable -> {
- LOG.warn("Shard[{}] removal failed:", identifier, throwable);
-
- final RpcError error = RpcResultBuilder.newError(RpcError.ErrorType.APPLICATION, "remove-shard-failed",
- "Shard removal failed", "cluster-test-app", "", throwable);
- future.set(RpcResultBuilder.<RemovePrefixShardOutput>failed().withRpcError(error).build());
- return null;
- });
-
- return future;
- }
-
- private ListenableFuture<?> ensureListExists() {
-
- final CollectionNodeBuilder<MapEntryNode, MapNode> mapBuilder = ImmutableNodes.mapNodeBuilder(ID_INT);
-
- // hardcoded initial list population for parallel produce-transactions testing on multiple nodes
- for (int i = 1; i < MAX_PREFIX; i++) {
- mapBuilder.withChild(
- ImmutableNodes.mapEntryBuilder(ID_INT, ID, PREFIX_TEMPLATE + i)
- .withChild(ImmutableNodes.mapNodeBuilder(ITEM).build())
- .build());
- }
- final MapNode mapNode = mapBuilder.build();
-
- final ContainerNode containerNode = ImmutableContainerNodeBuilder.create()
- .withNodeIdentifier(new YangInstanceIdentifier.NodeIdentifier(ID_INTS))
- .withChild(mapNode)
- .build();
-
- final DOMDataTreeProducer producer = domDataTreeService.createProducer(Collections.singleton(
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty())));
-
- final DOMDataTreeCursorAwareTransaction tx = producer.createTransaction(false);
-
- final DOMDataTreeWriteCursor cursor =
- tx.createCursor(new DOMDataTreeIdentifier(
- LogicalDatastoreType.CONFIGURATION, YangInstanceIdentifier.empty()));
-
- cursor.merge(containerNode.getIdentifier(), containerNode);
- cursor.close();
-
- final ListenableFuture<?> future = tx.commit();
- Futures.addCallback(future, new FutureCallback<Object>() {
- @Override
- public void onSuccess(final Object result) {
- try {
- LOG.debug("Closing producer for initial list.");
- producer.close();
- } catch (DOMDataTreeProducerException e) {
- LOG.warn("Error while closing producer.", e);
- }
- }
-
- @Override
- public void onFailure(final Throwable throwable) {
- //NOOP handled by the caller of this method.
- }
- }, MoreExecutors.directExecutor());
- return future;
- }
-}
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.clustering.it.provider.impl;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.util.concurrent.FluentFuture;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.SplittableRandom;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.mdsal.common.api.CommitInfo;
-import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCursorAwareTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducer;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeProducerException;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeService;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeWriteCursor;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsInput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutput;
-import org.opendaylight.yang.gen.v1.tag.opendaylight.org._2017.controller.yang.lowlevel.control.rev170215.ProduceTransactionsOutputBuilder;
-import org.opendaylight.yangtools.yang.common.RpcError;
-import org.opendaylight.yangtools.yang.common.RpcResult;
-import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
-import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
-import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated(forRemoval = true)
-public final class ProduceTransactionsHandler extends AbstractTransactionHandler {
- private static final Logger LOG = LoggerFactory.getLogger(ProduceTransactionsHandler.class);
-
- private final SettableFuture<RpcResult<ProduceTransactionsOutput>> future = SettableFuture.create();
- private final SplittableRandom random = new SplittableRandom();
- private final Set<Integer> usedValues = new HashSet<>();
- private final DOMDataTreeIdentifier idListItem;
- private final DOMDataTreeProducer itemProducer;
-
- private long insertTx = 0;
- private long deleteTx = 0;
-
- private ProduceTransactionsHandler(final DOMDataTreeProducer producer, final DOMDataTreeIdentifier idListItem,
- final ProduceTransactionsInput input) {
- super(input);
- this.itemProducer = requireNonNull(producer);
- this.idListItem = requireNonNull(idListItem);
- }
-
- public static ListenableFuture<RpcResult<ProduceTransactionsOutput>> start(
- final DOMDataTreeService domDataTreeService, final ProduceTransactionsInput input) {
- final String id = input.getId();
- LOG.debug("Filling the item list {} with initial values.", id);
-
- final YangInstanceIdentifier idListWithKey = ID_INT_YID.node(NodeIdentifierWithPredicates.of(ID_INT, ID, id));
-
- final DOMDataTreeProducer itemProducer = domDataTreeService.createProducer(
- Collections.singleton(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey)));
-
- final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
- final DOMDataTreeWriteCursor cursor =
- tx.createCursor(new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey));
-
- final MapNode list = ImmutableNodes.mapNodeBuilder(ITEM).build();
- cursor.write(list.getIdentifier(), list);
- cursor.close();
-
- try {
- tx.commit().get(INIT_TX_TIMEOUT_SECONDS, TimeUnit.SECONDS);
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- LOG.warn("Unable to fill the initial item list.", e);
- closeProducer(itemProducer);
-
- return Futures.immediateFuture(RpcResultBuilder.<ProduceTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Unexpected-exception", e).build());
- }
-
- final ProduceTransactionsHandler handler = new ProduceTransactionsHandler(itemProducer,
- new DOMDataTreeIdentifier(LogicalDatastoreType.CONFIGURATION, idListWithKey.node(list.getIdentifier())
- .toOptimized()), input);
- // It is handler's responsibility to close itemProducer when the work is finished.
- handler.doStart();
- return handler.future;
- }
-
- private static void closeProducer(final DOMDataTreeProducer producer) {
- try {
- producer.close();
- } catch (final DOMDataTreeProducerException exception) {
- LOG.warn("Failure while closing producer.", exception);
- }
- }
-
- @Override
- FluentFuture<? extends @NonNull CommitInfo> execWrite(final long txId) {
- final int i = random.nextInt(MAX_ITEM + 1);
- final DOMDataTreeCursorAwareTransaction tx = itemProducer.createTransaction(false);
- final DOMDataTreeWriteCursor cursor = tx.createCursor(idListItem);
-
- final NodeIdentifierWithPredicates entryId = NodeIdentifierWithPredicates.of(ITEM, NUMBER, i);
- if (usedValues.contains(i)) {
- LOG.debug("Deleting item: {}", i);
- deleteTx++;
- cursor.delete(entryId);
- usedValues.remove(i);
-
- } else {
- LOG.debug("Inserting item: {}", i);
- insertTx++;
-
- final MapEntryNode entry = ImmutableNodes.mapEntryBuilder().withNodeIdentifier(entryId)
- .withChild(ImmutableNodes.leafNode(NUMBER, i)).build();
- cursor.write(entryId, entry);
- usedValues.add(i);
- }
-
- cursor.close();
-
- return tx.commit();
- }
-
- @Override
- void runFailed(final Throwable cause, final long txId) {
- closeProducer(itemProducer);
- future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, "Commit failed for tx # " + txId, cause).build());
- }
-
- @Override
- void runSuccessful(final long allTx) {
- closeProducer(itemProducer);
- final ProduceTransactionsOutput output = new ProduceTransactionsOutputBuilder()
- .setAllTx(allTx)
- .setInsertTx(insertTx)
- .setDeleteTx(deleteTx)
- .build();
- future.set(RpcResultBuilder.<ProduceTransactionsOutput>success()
- .withResult(output).build());
- }
-
- @Override
- void runTimedOut(final String cause) {
- closeProducer(itemProducer);
- future.set(RpcResultBuilder.<ProduceTransactionsOutput>failed()
- .withError(RpcError.ErrorType.APPLICATION, cause).build());
- }
-}
<reference id="normalizedNodeSerializer" interface="org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer"/>
<reference id="notificationPublishService" interface="org.opendaylight.mdsal.binding.api.NotificationPublishService" />
<reference id="notificationListenerService" interface="org.opendaylight.mdsal.binding.api.NotificationService" />
- <reference id="domDataTreeService" interface="org.opendaylight.mdsal.dom.api.DOMDataTreeService"/>
- <reference id="distributedShardFactory" interface="org.opendaylight.controller.cluster.sharding.DistributedShardFactory"/>
<reference id="configDatastore" interface="org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface"
odl:type="distributed-config"/>
<reference id="actorSystemProvider" interface="org.opendaylight.controller.cluster.ActorSystemProvider"/>
<argument ref="notificationPublishService"/>
<argument ref="notificationListenerService"/>
<argument ref="domDataBroker"/>
- <argument ref="domDataTreeService"/>
- <argument ref="distributedShardFactory"/>
<argument ref="configDatastore"/>
<argument ref="actorSystemProvider"/>
</bean>