import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.VotingState;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.CheckConsensusReached;
import org.opendaylight.controller.cluster.raft.base.messages.Replicate;
import org.opendaylight.controller.cluster.raft.base.messages.SendHeartBeat;
import org.opendaylight.controller.cluster.raft.messages.UnInitializedFollowerSnapshotReply;
import org.opendaylight.controller.cluster.raft.persisted.ServerConfigurationPayload;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import scala.concurrent.duration.FiniteDuration;
/**
super.performSnapshotWithoutCapture(minReplicatedToAllIndex);
}
- @Override
- protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
+ /**
+ * Removes and returns the ClientRequestTracker for the specified log index.
+ * @param logIndex the log index
+ * @return the ClientRequestTracker or null if none available
+ */
+ private ClientRequestTracker removeClientRequestTracker(final long logIndex) {
final Iterator<ClientRequestTracker> it = trackers.iterator();
while (it.hasNext()) {
final ClientRequestTracker t = it.next();
return null;
}
+ @Override
+ final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+ // first check whether a ClientRequestTracker exists for this entry.
+ // If it does that means the leader wasn't dropped before the transaction applied.
+ // That means that this transaction can be safely applied as a local transaction since we
+ // have the ClientRequestTracker.
+ final ClientRequestTracker tracker = removeClientRequestTracker(entry.getIndex());
+ if (tracker != null) {
+ return new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), entry);
+ }
+
+ // Tracker is missing, this means that we switched behaviours between replicate and applystate
+ // and became the leader again,. We still want to apply this as a local modification because
+ // we have resumed leadership with that log entry having been committed.
+ final Payload payload = entry.getData();
+ if (payload instanceof IdentifiablePayload) {
+ return new ApplyState(null, ((IdentifiablePayload<?>) payload).getIdentifier(), entry);
+ }
+
+ return new ApplyState(null, null, entry);
+ }
+
@Override
protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
return this;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.opendaylight.controller.cluster.raft.ClientRequestTracker;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
return context.getReplicatedLog().lastIndex();
}
- /**
- * Removes and returns the ClientRequestTracker for the specified log index.
- * @param logIndex the log index
- * @return the ClientRequestTracker or null if none available
- */
- protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
- return null;
- }
-
/**
* Returns the actual index of the entry in replicated log for the given index or -1 if not found.
*
// Send a local message to the local RaftActor (it's derived class to be
// specific to apply the log to it's index)
- final ApplyState applyState;
- final ClientRequestTracker tracker = removeClientRequestTracker(i);
- if (tracker != null) {
- applyState = new ApplyState(tracker.getClientActor(), tracker.getIdentifier(), replicatedLogEntry);
- } else {
- applyState = new ApplyState(null, null, replicatedLogEntry);
- }
+ final ApplyState applyState = getApplyStateFor(replicatedLogEntry);
log.debug("{}: Setting last applied to {}", logName(), i);
actor().tell(new ApplyJournalEntries(context.getLastApplied()), actor());
}
+ /**
+ * Create an ApplyState message for a particular log entry so we can determine how to apply this entry.
+ *
+ * @param entry the log entry
+ * @return ApplyState for this entry
+ */
+ abstract ApplyState getApplyStateFor(ReplicatedLogEntry entry);
+
@Override
public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
if (message instanceof AppendEntries) {
import org.opendaylight.controller.cluster.raft.PeerInfo;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.opendaylight.controller.cluster.raft.RaftState;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
private final Collection<String> votingPeers = new ArrayList<>();
- public Candidate(RaftActorContext context) {
+ public Candidate(final RaftActorContext context) {
super(context, RaftState.Candidate);
for (PeerInfo peer: context.getPeers()) {
}
@Override
- protected RaftActorBehavior handleAppendEntries(ActorRef sender, AppendEntries appendEntries) {
+ protected RaftActorBehavior handleAppendEntries(final ActorRef sender, final AppendEntries appendEntries) {
log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+ protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+ final AppendEntriesReply appendEntriesReply) {
return this;
}
@Override
- protected RaftActorBehavior handleRequestVoteReply(ActorRef sender, RequestVoteReply requestVoteReply) {
+ protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender, final RequestVoteReply requestVoteReply) {
log.debug("{}: handleRequestVoteReply: {}, current voteCount: {}", logName(), requestVoteReply, voteCount);
if (requestVoteReply.isVoteGranted()) {
return super.electionDuration().$div(context.getConfigParams().getCandidateElectionTimeoutDivisor());
}
+
+ @Override
+ final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+ throw new IllegalStateException("A candidate should never attempt to apply " + entry);
+ }
+
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
if (message instanceof ElectionTimeout) {
log.debug("{}: Received ElectionTimeout", logName());
return super.handleMessage(sender, message);
}
-
private void startNewTerm() {
-
-
// set voteCount back to 1 (that is voting for self)
voteCount = 1;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
+import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
return this;
}
+ @Override
+ final ApplyState getApplyStateFor(final ReplicatedLogEntry entry) {
+ return new ApplyState(null, null, entry);
+ }
+
@Override
public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
if (message instanceof ElectionTimeout || message instanceof TimeoutNow) {
--- /dev/null
+/*
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.raft.protobuff.client.messages;
+
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+public abstract class IdentifiablePayload<T extends Identifier> extends Payload implements Identifiable<T> {
+}
/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2020 PANTHEON.tech, s.r.o. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
@SuppressWarnings("checkstyle:IllegalCatch")
private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
final DataTreeModification unwrapped = dataTree.takeSnapshot().newModification();
final PruningDataTreeModification mod = createPruningModification(unwrapped,
NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().getVersion()) > 0);
private void applyReplicatedCandidate(final CommitTransactionPayload payload)
throws DataValidationFailedException, IOException {
- final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.getCandidate();
+ final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
final TransactionIdentifier identifier = entry.getKey();
LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
applyReplicatedCandidate((CommitTransactionPayload) payload);
} else {
verify(identifier instanceof TransactionIdentifier);
- payloadReplicationComplete((TransactionIdentifier) identifier);
+ // if we did not track this transaction before, it means that it came from another leader and we are in
+ // the process of commiting it while in PreLeader state. That means that it hasnt yet been committed to
+ // the local DataTree and would be lost if it was only applied via payloadReplicationComplete().
+ if (!payloadReplicationComplete((TransactionIdentifier) identifier)) {
+ applyReplicatedCandidate((CommitTransactionPayload) payload);
+ }
}
} else if (payload instanceof AbortTransactionPayload) {
if (identifier != null) {
}
}
- private void payloadReplicationComplete(final TransactionIdentifier txId) {
+ private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
final CommitEntry current = pendingFinishCommits.peek();
if (current == null) {
LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
allMetadataCommittedTransaction(txId);
- return;
+ return false;
}
if (!current.cohort.getIdentifier().equals(txId)) {
LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
current.cohort.getIdentifier(), txId);
allMetadataCommittedTransaction(txId);
- return;
+ return false;
}
finishCommit(current.cohort);
+ return true;
}
private void allMetadataAbortedTransaction(final TransactionIdentifier txId) {
import java.io.ObjectOutput;
import java.io.Serializable;
import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.concepts.Identifier;
*
* @author Robert Varga
*/
-public abstract class AbstractIdentifiablePayload<T extends Identifier>
- extends Payload implements Identifiable<T>, Serializable {
+public abstract class AbstractIdentifiablePayload<T extends Identifier> extends IdentifiablePayload<T>
+ implements Serializable {
protected abstract static class AbstractProxy<T extends Identifier> implements Externalizable {
private static final long serialVersionUID = 1L;
private byte[] serialized;
import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.IdentifiablePayload;
import org.opendaylight.yangtools.concepts.Variant;
import org.opendaylight.yangtools.yang.data.api.schema.stream.ReusableStreamReceiver;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
* @author Robert Varga
*/
@Beta
-public abstract class CommitTransactionPayload extends Payload implements Serializable {
+public abstract class CommitTransactionPayload extends IdentifiablePayload<TransactionIdentifier>
+ implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(CommitTransactionPayload.class);
private static final long serialVersionUID = 1L;
+ private volatile Entry<TransactionIdentifier, DataTreeCandidateWithVersion> candidate = null;
+
CommitTransactionPayload() {
}
}
public @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate() throws IOException {
- return getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
+ Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = candidate;
+ if (localCandidate == null) {
+ synchronized (this) {
+ localCandidate = candidate;
+ if (localCandidate == null) {
+ candidate = localCandidate = getCandidate(ReusableImmutableNormalizedNodeStreamWriter.create());
+ }
+ }
+ }
+ return localCandidate;
}
public final @NonNull Entry<TransactionIdentifier, DataTreeCandidateWithVersion> getCandidate(
DataTreeCandidateInputOutput.readDataTreeCandidate(in, receiver));
}
+ @Override
+ public TransactionIdentifier getIdentifier() {
+ try {
+ return getCandidate().getKey();
+ } catch (IOException e) {
+ throw new IllegalStateException("Candidate deserialization failed.", e);
+ }
+ }
+
+ /**
+ * The cached candidate needs to be cleared after it is done applying to the DataTree, otherwise it would be keeping
+ * deserialized in memory which are not needed anymore leading to wasted memory. This lets the payload know that
+ * this was the last time the candidate was needed ant it is safe to be cleared.
+ */
+ public Entry<TransactionIdentifier, DataTreeCandidateWithVersion> acquireCandidate() throws IOException {
+ final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> localCandidate = getCandidate();
+ candidate = null;
+ return localCandidate;
+ }
+
abstract void writeBytes(ObjectOutput out) throws IOException;
abstract DataInput newDataInput();
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-final class ShardInformation {
+@VisibleForTesting
+public final class ShardInformation {
private static final Logger LOG = LoggerFactory.getLogger(ShardInformation.class);
private final Set<OnShardInitialized> onShardInitializedSet = new HashSet<>();
return shardName;
}
- @Nullable ActorRef getActor() {
+ @VisibleForTesting
+ @Nullable public ActorRef getActor() {
return actor;
}
configUpdateHandler.initListener(dataStore, datastoreType);
}
- private void onShutDown() {
+ void onShutDown() {
List<Future<Boolean>> stopFutures = new ArrayList<>(localShards.size());
for (ShardInformation info : localShards.values()) {
if (info.getActor() != null) {
import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.LocalShardStore;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.shardmanager.AbstractShardManagerCreator;
import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShards;
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-public class TestClientBackedDataStore extends ClientBackedDataStore {
+public class TestClientBackedDataStore extends ClientBackedDataStore implements LocalShardStore {
+
public TestClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
final Configuration configuration,
final DatastoreContextFactory datastoreContextFactory,
protected AbstractShardManagerCreator<?> getShardManagerCreator() {
return new TestShardManager.TestShardManagerCreator();
}
+
+ @Override
+ public GetLocalShardsReply getLocalShards() {
+ final ActorUtils utils = getActorUtils();
+ return (GetLocalShardsReply) utils.executeOperation(utils.getShardManager(), GetLocalShards.INSTANCE);
+ }
}
package org.opendaylight.controller.cluster.datastore;
import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.junit.After;
import org.opendaylight.controller.cluster.databroker.TestClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
+import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
+import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.exceptions.ShardLeaderNotRespondingException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.Shutdown;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
import org.opendaylight.controller.cluster.raft.messages.RequestVote;
import org.opendaylight.controller.cluster.raft.persisted.ApplyJournalEntries;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
verifyNode(rwTx, CarsModel.BASE_PATH, carsNode);
}
+ @SuppressWarnings("IllegalCatch")
+ @Test
+ public void testRaftCallbackDuringLeadershipDrop() throws Exception {
+ final String testName = "testRaftCallbackDuringLeadershipDrop";
+ initDatastores(testName, MODULE_SHARDS_CARS_1_2_3, CARS);
+
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ final IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System,
+ DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build()).operationTimeoutInMillis(500)
+ .shardLeaderElectionTimeoutInSeconds(3600),
+ commitTimeout);
+
+ final DOMStoreWriteTransaction initialWriteTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ initialWriteTx.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
+ leaderTestKit.doCommit(initialWriteTx.ready());
+
+ try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupAbstractDataStore(
+ testParameter, testName, MODULE_SHARDS_CARS_1_2_3, false)) {
+
+ final ActorRef member3Cars = ((LocalShardStore) follower2DistributedDataStore).getLocalShards()
+ .getLocalShards().get("cars").getActor();
+ final ActorRef member2Cars = ((LocalShardStore)followerDistributedDataStore).getLocalShards()
+ .getLocalShards().get("cars").getActor();
+ member2Cars.tell(new StartDropMessages(AppendEntries.class), null);
+ member3Cars.tell(new StartDropMessages(AppendEntries.class), null);
+
+ final DOMStoreWriteTransaction newTx = leaderDistributedDataStore.newWriteOnlyTransaction();
+ newTx.write(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
+ final AtomicBoolean submitDone = new AtomicBoolean(false);
+ executor.submit(() -> {
+ try {
+ leaderTestKit.doCommit(newTx.ready());
+ submitDone.set(true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ final ActorRef leaderCars = ((LocalShardStore) leaderDistributedDataStore).getLocalShards()
+ .getLocalShards().get("cars").getActor();
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> ((OnDemandRaftState) leaderDistributedDataStore.getActorUtils()
+ .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE)).getLastIndex() >= 1);
+
+ final OnDemandRaftState raftState = (OnDemandRaftState)leaderDistributedDataStore.getActorUtils()
+ .executeOperation(leaderCars, GetOnDemandRaftState.INSTANCE);
+
+ // Simulate a follower not receiving heartbeats but still being able to send messages ie RequestVote with
+ // new term(switching to candidate after election timeout)
+ leaderCars.tell(new RequestVote(raftState.getCurrentTerm() + 1,
+ "member-3-shard-cars-testRaftCallbackDuringLeadershipDrop", -1,
+ -1), member3Cars);
+
+ member2Cars.tell(new StopDropMessages(AppendEntries.class), null);
+ member3Cars.tell(new StopDropMessages(AppendEntries.class), null);
+
+ await("Is tx stuck in COMMIT_PENDING")
+ .atMost(10, TimeUnit.SECONDS).untilAtomic(submitDone, equalTo(true));
+
+ }
+
+ executor.shutdownNow();
+ }
+
private static void verifySnapshot(final Snapshot actual, final Snapshot expected,
final NormalizedNode<?, ?> expRoot) {
assertEquals("Snapshot getLastAppliedTerm", expected.getLastAppliedTerm(), actual.getLastAppliedTerm());
--- /dev/null
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager.GetLocalShardsReply;
+
+public interface LocalShardStore {
+
+ GetLocalShardsReply getLocalShards();
+}
import org.opendaylight.controller.cluster.datastore.shardmanager.TestShardManager;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
-public class TestDistributedDataStore extends DistributedDataStore {
+public class TestDistributedDataStore extends DistributedDataStore implements LocalShardStore {
public TestDistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
final Configuration configuration,
protected AbstractShardManagerCreator<?> getShardManagerCreator() {
return new TestShardManager.TestShardManagerCreator();
}
+
+ @Override
+ public TestShardManager.GetLocalShardsReply getLocalShards() {
+ TestShardManager.GetLocalShardsReply reply =
+ (TestShardManager.GetLocalShardsReply) getActorUtils()
+ .executeOperation(getActorUtils().getShardManager(), TestShardManager.GetLocalShards.INSTANCE);
+
+ return reply;
+ }
}
*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Predicate;
import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
public class TestShard extends Shard {
+ public static class Builder extends Shard.Builder {
+ Builder() {
+ super(TestShard.class);
+ }
+ }
+
// Message to request FrontendMetadata
public static final class RequestFrontendMetadata {
}
+ private abstract static class DropMessages<T> {
+ private final Class<T> msgClass;
+
+ DropMessages(final Class<T> msgClass) {
+ this.msgClass = requireNonNull(msgClass);
+ }
+
+ final Class<T> getMsgClass() {
+ return msgClass;
+ }
+ }
+
+ public static class StartDropMessages<T> extends DropMessages<T> {
+ public StartDropMessages(final Class<T> msgClass) {
+ super(msgClass);
+ }
+ }
+
+ public static class StopDropMessages<T> extends DropMessages<T> {
+ public StopDropMessages(final Class<T> msgClass) {
+ super(msgClass);
+ }
+ }
+
+ private final Map<Class<?>, Predicate<?>> dropMessages = new ConcurrentHashMap<>();
+
protected TestShard(AbstractBuilder<?, ?> builder) {
super(builder);
}
}
}
- public static Shard.Builder builder() {
- return new TestShard.Builder();
+ @Override
+ protected void handleCommand(Object message) {
+ if (message instanceof StartDropMessages) {
+ startDropMessages(((StartDropMessages<?>) message).getMsgClass());
+ } else if (message instanceof StopDropMessages) {
+ stopDropMessages(((StopDropMessages<?>) message).getMsgClass());
+ } else {
+ dropOrHandle(message);
+ }
}
- public static class Builder extends Shard.Builder {
- Builder() {
- super(TestShard.class);
+ private <T> void dropOrHandle(T message) {
+ Predicate<T> drop = (Predicate<T>) dropMessages.get(message.getClass());
+ if (drop == null || !drop.test(message)) {
+ super.handleCommand(message);
}
}
-}
+ private void startDropMessages(final Class<?> msgClass) {
+ dropMessages.put(msgClass, msg -> true);
+ }
+
+ <T> void startDropMessages(final Class<T> msgClass, final Predicate<T> filter) {
+ dropMessages.put(msgClass, filter);
+ }
+
+ public void stopDropMessages(final Class<?> msgClass) {
+ dropMessages.remove(msgClass);
+ }
+
+ public static TestShard.Builder builder() {
+ return new TestShard.Builder();
+ }
+}
super(builder);
}
+ @Override
+ public void handleCommand(Object message) throws Exception {
+ if (GetLocalShards.INSTANCE.equals(message)) {
+ sender().tell(new GetLocalShardsReply(localShards), null);
+ } else {
+ super.handleCommand(message);
+ }
+ }
+
/**
* Plug into shard actor creation to replace info with our testing one.
* @param info shard info.
*/
@Override
protected ActorRef newShardActor(ShardInformation info) {
+ Map<String, String> peerAddresses = getPeerAddresses(info.getShardName());
ShardInformation newInfo = new ShardInformation(info.getShardName(),
- info.getShardId(), getPeerAddresses(info.getShardName()),
+ info.getShardId(), peerAddresses,
info.getDatastoreContext(),
- TestShard.builder().restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
+ TestShard.builder()
+ .restoreFromSnapshot(info.getBuilder().getRestoreFromSnapshot()),
peerAddressResolver);
newInfo.setSchemaContext(info.getSchemaContext());
newInfo.setActiveMember(info.isActiveMember());
return Props.create(TestShardManager.class, this);
}
}
+
+ public static final class GetLocalShards {
+ public static final GetLocalShards INSTANCE = new GetLocalShards();
+
+ private GetLocalShards() {
+
+ }
+ }
+
+ public static class GetLocalShardsReply {
+
+ private final Map<String, ShardInformation> localShards;
+
+ public GetLocalShardsReply(Map<String, ShardInformation> localShards) {
+ this.localShards = localShards;
+ }
+
+ public Map<String, ShardInformation> getLocalShards() {
+ return localShards;
+ }
+ }
}