// 2 ServerConfigurationPayload entries, the transaction payload entry plus a purge payload.
RaftStateVerifier verifier = raftState -> {
- assertEquals("Commit index", 3, raftState.getCommitIndex());
- assertEquals("Last applied index", 3, raftState.getLastApplied());
+ assertEquals("Commit index", 5, raftState.getCommitIndex());
+ assertEquals("Last applied index", 5, raftState.getLastApplied());
};
verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
final FrontendIdentifier frontendId = clientId.getFrontendId();
final FrontendClientMetadataBuilder client = clients.get(frontendId);
if (client == null) {
- LOG.debug("{}: disableTracking {} does not match any client, ignoring", shardName, clientId);
+ // When we havent seen the client before, we still need to disable tracking for him since this only gets
+ // triggered once.
+ LOG.debug("{}: disableTracking {} does not match any client, pre-disabling client.", shardName, clientId);
+ clients.put(frontendId, new FrontendClientMetadataBuilder.Disabled(shardName, clientId));
return;
}
if (!clientId.equals(client.getIdentifier())) {
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.messaging.MessageSlicer;
import org.opendaylight.controller.cluster.messaging.SliceOptions;
} else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
store.processCohortRegistryCommand(getSender(),
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
- } else if (message instanceof PersistAbortTransactionPayload) {
- final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
- persistPayload(txId, AbortTransactionPayload.create(txId,
- datastoreContext.getInitialPayloadSerializedBufferCapacity()), true);
- persistPayload(txId, PurgeTransactionPayload.create(txId,
- datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
} else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
}
} else {
LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+ knownFrontends.put(frontendId, new LeaderFrontendState.Disabled(persistenceId(), clientId,
+ getDataStore()));
}
}
}
// Called on leader only
private void askProtocolEncountered(final ClientIdentifier clientId) {
- final LeaderFrontendState state = knownFrontends.get(clientId.getFrontendId());
- if (state instanceof LeaderFrontendState.Enabled) {
+ final FrontendIdentifier frontend = clientId.getFrontendId();
+ final LeaderFrontendState state = knownFrontends.get(frontend);
+ if (!(state instanceof LeaderFrontendState.Disabled)) {
LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
+ if (knownFrontends.isEmpty()) {
+ knownFrontends = new HashMap<>();
+ }
+ knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore()));
+
persistPayload(clientId, DisableTrackingPayload.create(clientId,
datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
}
paused = true;
// Tell-based protocol can replay transaction state, so it is safe to blow it up when we are paused.
- knownFrontends.values().forEach(LeaderFrontendState::retire);
- knownFrontends = ImmutableMap.of();
+ if (datastoreContext.isUseTellBasedProtocol()) {
+ knownFrontends.values().forEach(LeaderFrontendState::retire);
+ knownFrontends = ImmutableMap.of();
+ }
store.setRunOnPendingTransactionsComplete(operation);
}
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.mdsal.common.api.ReadFailedException;
private void closeTransaction(final boolean sendReply) {
getDOMStoreTransaction().abortFromTransactionActor();
- shardActor.tell(new PersistAbortTransactionPayload(transactionId), ActorRef.noSender());
if (sendReply && returnCloseTransactionReply()) {
getSender().tell(new CloseTransactionReply(), getSelf());
+++ /dev/null
-/*
- * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore.messages;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-
-/**
- * A request sent from {@link org.opendaylight.controller.cluster.datastore.ShardTransaction} to
- * {@link org.opendaylight.controller.cluster.datastore.Shard} to persist an
- * {@link org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload} after the transaction has
- * been closed by the frontend and internal backend state has been updated.
- *
- * <p>
- * Since the two are actors, we cannot perform a direct upcall, as that breaks actor containment and wreaks havoc into
- * Akka itself. This class does not need to be serializable, as both actors are guaranteed to be co-located.
- *
- * @author Robert Varga
- */
-public final class PersistAbortTransactionPayload {
- private final TransactionIdentifier txId;
-
- public PersistAbortTransactionPayload(final TransactionIdentifier txId) {
- this.txId = Preconditions.checkNotNull(txId);
- }
-
- public TransactionIdentifier getTransactionId() {
- return txId;
- }
-}
// Wait for the commit to be replicated to the follower.
MemberNode.verifyRaftState(followerDistributedDataStore, "cars",
- raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
+ raftState -> assertEquals("getLastApplied", 2, raftState.getLastApplied()));
MemberNode.verifyRaftState(followerDistributedDataStore, "people",
- raftState -> assertEquals("getLastApplied", 1, raftState.getLastApplied()));
+ raftState -> assertEquals("getLastApplied", 2, raftState.getLastApplied()));
// Prepare, ready and canCommit a WO tx that writes to 2 shards. This will become the current tx in
// the leader shard.
verifyOuterListEntry(shard, 1);
- verifyLastApplied(shard, 5);
+ verifyLastApplied(shard, 6);
}
@Test
// Committed transaction count should increase as usual
assertEquals(1, shardStats.getCommittedTransactionsCount());
- // Commit index should advance as we do not have an empty
- // modification
- assertEquals(1, shardStats.getCommitIndex());
+ // Commit index should advance 2 to account for disabling metadata
+ assertEquals(2, shardStats.getCommitIndex());
}
@Test